People say that every front-end developer must implement their own JS framework, and every EPAM developer must implement a scheduler.
Let's assume we're developing a streaming platform (like Netflix) and our business team has asked us to implement two repeated tasks:
- Charge our paid users for monthly subscriptions.
- Upload partner-submitted videos to the cloud storage in preparation for processing.
Note: Some examples later on will be in C#, but knowledge of the language is not required. Most of the popular languages/frameworks look very similar, and the code is applicable to any tech stack.
Many of you have faced challenges like these in your own work, and the most typical and familiar solutions include:
- Using a timer (thread) and writing a custom implementation (for EPAM developers).
- Using Hangfire or Quartz (for experienced developers working on small projects).
- Using external, out-of-process schedulers, like k8s CronJobs (for experienced developers working on bigger projects).
Each strategy has its own unique benefits and drawbacks, which we will compare a bit later.
First, let's define some of the common vocabulary.
- Scheduler: a system that manages periodic runs. Jobs are scheduled to run when a given trigger occurs, and triggers support a wide variety of scheduling options (cron expressions, external events, etc.).
- Job: an execution of what you want to have done. Common examples include distributed calculations of independent processes, video batch conversions, etc.
- Business operation: the domain action itself (converting a video, charging a fee).
It is important to note that a Job is NOT always an Operation, and therefore a Job completion does NOT always equal an operational result. Sometimes one job contains exactly one operation, but this is not always the case. To see why, consider the job of charging a fee. Suppose you have successfully charged the fee (the operation) and then the network goes down. The scheduler (for example, Quartz) never receives the signal confirming the operational result, so it marks the job as not completed (failed, or of unknown status) and attempts to run it again. The operation was performed, but the job is not complete, because the operational result was never recorded.
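To make this concrete, here is a minimal sketch (all names and fields here are hypothetical, not part of the service shown later) of an idempotency guard that keeps a retried job from charging the same period twice: the attempt is recorded before the provider is called, so a re-run can detect it and reconcile instead of charging again.

// Hypothetical sketch: protect a retried job from double-charging.
var attempt = await dbContext.PaymentAttempts.FirstOrDefaultAsync(
    a => a.SubscriptionId == subscriptionId && a.RequestedAt >= periodStart,
    cancellationToken);

if (attempt is not null)
{
    // The charge may have succeeded even though the job run "failed":
    // ask the provider for the real outcome instead of charging again.
    await ReconcileWithProviderAsync(attempt, cancellationToken);
}
else
{
    // Record the attempt BEFORE calling the provider, so a crash
    // between these two steps is visible to the next run.
    dbContext.PaymentAttempts.Add(new PaymentAttempt { /* ... */ });
    await dbContext.SaveChangesAsync(cancellationToken);
    await paymentGateway.ChargeAsync(subscriptionId, cancellationToken);
}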
It is also important to note that a scheduler is not the only way to run a job. Jobs can be run locally or on demand without any scheduler, which is useful for debugging, unit testing, and incident management.
Okay, back to the question of scheduler implementation. As at any other interview, we need to break the task at hand into smaller parts and clarify the functional and non-functional requirements:
- Will it involve a deep dive into the code, or only the high-level architecture?
- What resources are available, and what resources are being used?
- Does the system have an existing payment gateway?
- Is it necessary to notify other systems about uploads?
- Are the intended operations long-running or short-running?
- Are they resource-consuming operations?
Payments
Any system that touches a user’s money must be reliable and consistent. Latency, on the other hand, is not critical (for example, being charged 5 minutes late does little harm). In this task, our goal is to implement the job and model the core classes.
First, let’s start with some parameters:
- 150k paid users in total
- Up to 5k daily charges.
- Payment gateway speed = One payment in 500ms or less.
Doing the math, this means that the entire workload usually takes about 40 minutes a day (5k × 500 ms ≈ 42 minutes) to complete. What happens if our company suddenly gets ten times more paying customers? Even then, the run takes roughly 7 hours, which still fits comfortably within a day. This means that we can perform the job using just one thread (i.e., all payments are processed sequentially until they are done) instead of having to coordinate multiple threads in real time.
*** Note: It is essential for efficiency and productivity that we keep our programming simple. Less code means faster development and fewer mistakes, easier operation and less debugging, and ultimately a higher profit margin for the company.
It is important to consider some common errors and things that could go wrong:
- A user charge was not initiated and revenue was not collected.
- A user was charged twice and a refund is required. This can happen when two parallel jobs process the same subscription.
In our project, we already have a so-called payment service (or subscription service) set up. We want to reuse its existing code, models, etc. in order to streamline the process. This lets us build on tested and verified code and models, tailored to the needs of our specific application.
/* The job simply looks for ALL subscriptions that need to be
charged, and then tries to charge a fee. We don't yet know how,
when, or how often we will run the job, but we don't need those
parameters in order to implement the job itself. Also, since we
want to process the payments sequentially but don't know how many
there will be, we simply charge in a loop until the job is
complete. The method below assumes _serviceScopeFactory (an
injected IServiceScopeFactory) and _logger (an injected ILogger). */
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    while (true)
    {
        /* Every payment runs within its own scope (database context
           and shared memory) to avoid any potential mistake where
           one iteration affects another. */
        using var scope = _serviceScopeFactory.CreateScope();
        var paymentsService = scope.ServiceProvider.GetRequiredService<IPaymentsService>();
        try
        {
            /* The business operation can be split into two parts.
               First we look for a subscription. We skip the
               implementation of this method, but suppose the query
               inside looks for a subscription that is still active,
               whose last successful charge happened more than a
               month ago, and which has no payment attempts today. */
            var subscriptionId = await paymentsService.GetOldestOverdueSubscriptionIdAsync(cancellationToken);

            // If no subscription was found, we are done: the job is complete.
            if (subscriptionId == default)
                break;

            /* If a subscription is found, the next step is to charge
               the money. We look up the payment details, check that
               no other payment is being processed simultaneously for
               this subscription, lock the row, and create a "payment
               attempt" entity. The command is then sent to the
               payment provider. The result of the charge arrives via
               a callback, which performs all necessary status
               updates, but that is out of the scope of our example
               job. */
            await paymentsService.ChargeForNextPeriodAsync(subscriptionId, cancellationToken);
        }
        catch (Exception ex)
        {
            /* Catch and log all errors so that one failed charge
               does not stop the loop; rethrowing here would fail the
               whole job. In our case, the job completes only when
               all pending subscriptions have been processed. */
            _logger.LogError(ex, "Charge attempt failed");
        }
    }
}
public class PaymentAttempt
{
    public int PaymentAttemptId { get; set; }
    public decimal Amount { get; set; }
    public string CurrencyId { get; set; }

    /* Good practice is to save time zone information. If your
       company has more than one developer, or if you collaborate
       with other teams (or companies) in different time zones,
       accounting for differences in timestamps is a critical
       component of error-proof coding. Never assume that everyone
       knows your conventions (such as using UTC, or adding a Utc
       suffix to the name). This will save you from countless
       mistakes and headaches. */
    public DateTimeOffset RequestedAt { get; set; }
    public DateTimeOffset? AcknowledgedAt { get; set; }
    public State StateId { get; set; }

    // Remember to add a row version (concurrency token) to make sure no data is overwritten by mistake.
    [Timestamp]
    public byte[] RowVersion { get; set; }

    …
}
public class Subscription
{
    public int SubscriptionId { get; set; }
    public int CustomerId { get; set; }

    …

    /* In this example we update these properties
       when a fee has been successfully charged. */
    public DateTimeOffset PaidUntilDate { get; set; }
    public int PaymentsLeftCount { get; set; }

    [Timestamp]
    public byte[] RowVersion { get; set; }

    public int PaymentPlanId { get; set; }

    [ForeignKey(nameof(PaymentPlanId))]
    public PaymentPlan PaymentPlan { get; set; }

    public List<PaymentAttempt> PaymentAttempts { get; set; }
}
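The [Timestamp] attributes above suggest EF Core. Assuming that, here is a minimal sketch of what the row version buys us: when two jobs race on the same row, the loser’s save fails with a DbUpdateConcurrencyException instead of silently overwriting data (the variable names are illustrative).

try
{
    await dbContext.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateConcurrencyException)
{
    // Another job updated the same row since we loaded it (its
    // RowVersion no longer matches). Treat the subscription as
    // already being processed and move on to the next one instead
    // of overwriting the other job's changes.
}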
That’s it! The benefit of this approach is that the job does not depend on a particular framework or set of tools: it can run as a console application or be used in unit tests. The most important thing to note is that concurrent jobs can affect the same subscriptions, so before sending a request to the payment gateway, always make sure you have locked the resource. Every modern database supports distributed locks, and there are plenty of frameworks built on top of them; see for example https://github.com/madelson/DistributedLock. It is not a complicated task, but it is one that absolutely needs to be done.
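For illustration, here is roughly what such a lock looks like with the DistributedLock library linked above (the lock name and connection string are placeholders, and a SQL Server backend is assumed):

using Medallion.Threading.SqlServer;

// One lock per subscription: two concurrent jobs cannot charge the same one.
var subscriptionLock = new SqlDistributedLock($"charge-subscription-{subscriptionId}", connectionString);
await using (await subscriptionLock.AcquireAsync(cancellationToken: cancellationToken))
{
    // Only one holder gets here at a time; safe to call the payment gateway.
    await paymentsService.ChargeForNextPeriodAsync(subscriptionId, cancellationToken);
}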
Uploading videos
Around 50 videos per hour arrive on an SFTP server. The time at which a video becomes available is very important: time is money, and sooner is always better. As soon as a video is uploaded, other systems must be notified. Just imagine having to wait for your favorite TV show because your streaming provider doesn’t understand this concept.
For this task we can’t process videos sequentially in one thread like we did with payments. We also have to pay close attention to data consistency between SFTP, the database, and published events in order to prevent disruptions to our operational results.
Instead of writing the code for this case, this time we will focus on an exploration of the high-level components.
Successful job completion will require 3 simple steps:
- At job commencement, look for the first file that is not currently locked by another job and lock it (for example, open it with exclusive write access: even though we only need to read it, this keeps other workers out). Then upload the file to cloud storage if it is not there yet.
- Create a record in the database (SQL) if it doesn’t exist yet.
- Delete the file from the SFTP server.
Aside from that, we have Kafka Connect with a SQL connector, which watches for inserts into the table and sinks them to other systems in the company. Sounds simple, right? Remember: we don’t always have to write code to solve problems. Keep this in mind.
If something fails and the job stops at any step, another process (job) will repeat the sequence of operations, and no data or files will be lost or duplicated. A sketch of this loop follows below.
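Even though we are staying at the component level, a minimal sketch (all helper names are hypothetical) shows how the steps compose into a worker loop. Every step is idempotent, which is exactly what makes the retry described above safe:

// Hypothetical sketch of one worker; several copies can run in
// parallel, because the per-file lock in step 0 keeps them apart.
while (!cancellationToken.IsCancellationRequested)
{
    // Step 0: take the first file nobody else is working on (exclusive lock).
    var file = await _sftp.TryLockNextFileAsync(cancellationToken);
    if (file is null)
        break; // nothing left to process: the job is complete

    // Step 1: upload to cloud storage, skipping the upload if a
    // previous (interrupted) run already completed it.
    await _cloudStorage.UploadIfMissingAsync(file, cancellationToken);

    // Step 2: create the database record if it doesn't exist yet;
    // this insert is what Kafka Connect picks up and fans out.
    await _db.EnsureVideoRecordAsync(file.Name, cancellationToken);

    // Step 3: only after both durable copies exist do we delete the source.
    await _sftp.DeleteFileAsync(file, cancellationToken);
}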
The benefits of using this approach are:
- Files will be uploaded only once.
- When a file is uploaded, we will have the record in the database.
- Using Change Data Capture (CDC), the file-upload event will be dispatched to other subsystems through Kafka.
Scheduler
Let's now talk a bit about how to run those jobs.
A self-made scheduler is almost never a better option than the existing in-app schedulers: scheduling is domain-agnostic, so you are unlikely to bring anything new that the existing frameworks don’t already cover.
So, in-app scheduler or out-of-process? Which do you choose? It mostly depends on your team and the tools you already use.
In-app frameworks (Quartz, Hangfire)
- They poll the database, lock tables, and so on. This puts extra load on the database (not critical, but it makes the system less responsive).
- If you have many domain services, you’ll need to install a scheduler into each of them, which means many duplicated non-domain tables in each database.
- They store serialized information about jobs, which is less flexible since you have to think about versioning.
- A code change is often required to add or modify a job, either by creating additional endpoints in your API or by re-registering jobs at program startup.
- They can guarantee concurrency at the job level, using the database for distributed locks, but you still need to think about it in your business operations.
- They require external storage.
- Your code depends on the framework’s interfaces, attributes, and so on. The frameworks also often use reflection to run jobs, which can cause additional problems with DI containers.
- They are language-specific. If you want to use other languages, you’ll need another scheduler, which can severely complicate company operations.
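For reference, wiring the payment job into Quartz.NET looks roughly like this (the class name and cron expression are illustrative):

using Quartz;
using Quartz.Impl;

// Illustrative: run the charge job at the top of every hour.
var scheduler = await StdSchedulerFactory.GetDefaultScheduler();
await scheduler.Start();

var job = JobBuilder.Create<ChargeSubscriptionsJob>()
    .WithIdentity("charge-subscriptions", "payments")
    .Build();

var trigger = TriggerBuilder.Create()
    .WithCronSchedule("0 0 * * * ?")   // Quartz cron: the seconds field comes first
    .Build();

await scheduler.ScheduleJob(job, trigger);

// The job itself has to implement the framework's interface:
[DisallowConcurrentExecution]   // Quartz-level guarantee of at most one running instance
public class ChargeSubscriptionsJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        // ... the while-loop from the Payments section goes here ...
        return Task.CompletedTask;
    }
}

Note how the code now depends on the framework’s interface and attributes, which is exactly the coupling listed above.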
External schedulers (k8s CronJobs)
- They depend heavily on infrastructure, experience with job configuration, and the tech maturity of your team.
- Built-in analytics and visualization tools are poor.
- They cannot guarantee concurrency on their own.
Let’s talk for a minute about what I mean by “team tech maturity”.
Imagine we decide to run the payment job every hour with max concurrency set to 1, and the video job every second with max concurrency set to 50 (which is impossible with a single app instance and an in-app scheduler: you simply may not have enough RAM). You set the activeDeadlineSeconds property to 1 hour (supposing your job always finishes in under an hour).
What could go wrong?
Well, to start with, a CronJob won’t run again after being suspended long enough unless you set the startingDeadlineSeconds property correctly. This means that if the job takes more than 1 hour to complete for some reason, and this happens several times, the CronJob controller may stop scheduling the job entirely under the default settings (it gives up after too many missed start times). So you must be very careful to avoid such mistakes.
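To make this concrete, here is roughly what the payment CronJob manifest could look like (names, image, and values are illustrative, not from the article):

apiVersion: batch/v1
kind: CronJob
metadata:
  name: charge-subscriptions
spec:
  schedule: "0 * * * *"            # every hour, at minute 0
  concurrencyPolicy: Forbid        # max concurrency 1: skip a run if the previous one is still going
  startingDeadlineSeconds: 600     # without this, too many missed start times stop the CronJob entirely
  jobTemplate:
    spec:
      activeDeadlineSeconds: 3600  # kill the job if it runs longer than 1 hour
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: charge-job
              image: registry.example.com/payments/charge-job:latest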
Conclusion
If you have an old monolithic .NET application, if the core of your team consists of junior-to-middle developers, or if you don’t have mature infrastructure (e.g. you mostly use Netlify or Firebase), then an in-app scheduler (Quartz or Hangfire) is acceptable; in some cases it may even be preferable. But given a modern application, tech maturity, and developed infrastructure, I’d go for external schedulers (k8s CronJobs) every time.
As you can see, when we talk about scheduling, we are talking about much more than just setting up a simple timer. Keep the whole picture in view: think from the top down (from the infrastructure all the way to concurrency tokens in the database) and back up again. Do this, and your implementation will keep working even when the environment changes.