The purpose of this learning project is to distribute tasks to workers grouped by category, or to put them in a queue if all workers are busy. It also load-balances the queues and saves the tasks in Mnesia.
The goal is to keep the logic of request/response work (such as requesting a web page) separate from requests that don't need a response (such as sending an e-mail).
Note: As Elixir runs everything by allocating the same CPU availability to each process, this tool is not as necessary as it can be in languages such as Ruby (which needs Sidekiq or a similar tool).
This project was done in March 2020 as part of a private project. It was extracted and put in this Phoenix app and on GitHub to give you access to it.
Only minor changes (some documentation, the README), which I felt were needed to understand how the JobScheduler works, were added in the initial commit.
As the Application launches Mnesia and the WorkerBalancer (which depends on Mnesia) at the same time, it may happen that, when launching the server, Mnesia is called last, the WorkerBalancer doesn't get the data it needs, and the system shuts down.
But the logic behind all of that (which is, for now, what interests us the most) is still relevant.
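One possible way to avoid this kind of race, assuming both processes are children of the same supervisor, is to rely on the fact that a `Supervisor` starts its children one after the other, in list order, and waits for each `init/1` to return. The sketch below is not the project's code: `StartupSketch.Store` and `StartupSketch.Balancer` are hypothetical stand-ins for the Mnesia server and the WorkerBalancer.

```elixir
defmodule StartupSketch.Store do
  # Hypothetical stand-in for the Mnesia-backed server: it only holds a list of jobs.
  use GenServer

  def start_link(jobs), do: GenServer.start_link(__MODULE__, jobs, name: __MODULE__)
  def fetch_all, do: GenServer.call(__MODULE__, :fetch_all)

  @impl true
  def init(jobs), do: {:ok, jobs}

  @impl true
  def handle_call(:fetch_all, _from, jobs), do: {:reply, jobs, jobs}
end

defmodule StartupSketch.Balancer do
  # Hypothetical stand-in for the WorkerBalancer: it needs the store's data in init/1.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def loaded, do: GenServer.call(__MODULE__, :loaded)

  @impl true
  def init(:ok), do: {:ok, StartupSketch.Store.fetch_all()}

  @impl true
  def handle_call(:loaded, _from, jobs), do: {:reply, jobs, jobs}
end

# The store is listed first, so it is fully started before the balancer's init/1 runs.
# :rest_for_one restarts the balancer too if the store crashes.
children = [
  {StartupSketch.Store, [:job_a, :job_b]},
  StartupSketch.Balancer
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :rest_for_one)
```

With Mnesia itself, the tables may still be loading after the application has started, so a real fix would probably also need to wait for them (for instance with `:mnesia.wait_for_tables/2`) before the balancer reads anything.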
- Before continuing
- Current problem
- Summary
- How does it work?
- Technical decision
- Install
- Testing it
- TODO
- Note for later
- Application: Main supervisor, common to most Elixir projects.
- Dispatcher: Entry point of the JobScheduler, it's a simple module which redirects the task to the correct worker.
- Waiter: Simple GenServer made to be called after a certain time. Once the time is over, it calls the Dispatcher.
- WorkerBalancer: GenServer which reevaluates the number of workers needed for each category and sends messages to allocate them.
- CategorySupervisor: Generates the associated FifoAgent, WorkerAgent and WorkerSupervisor.
- FifoAgent: Agent storing an Erlang :queue, acting as FIFO storage.
- WorkerAgent: Agent storing a map of worker PIDs and interacting directly with them and the WorkerSupervisor of the same category.
- WorkerSupervisor: Starts and terminates workers.
- Worker: GenServer handling the logic. It is idle by default and waits for a new task if the FifoAgent doesn't have anything available.
- Table: Behaviour module defining the base functions a new struct file needs in order to work.
- Server: GenServer connected to :mnesia, handling all calls.
- Table.ServiceWorker: Module with a structure representing a generic task and the associated action a Worker will have to execute.
- Start the application and associated processes, including
  - Mnesia server
  - Waiter
  - Every CategorySupervisor (1 per category)
    - Starts the category's FifoAgent
      - The FifoAgent calls Mnesia to get and store all existing unfinished tasks of its category
    - Starts the category's WorkerAgent
    - Starts the category's WorkerSupervisor
  - WorkerBalancer
    - At the end of the WorkerBalancer initialization, the first worker count per category is calculated.
      - It sends the result to the WorkerAgent
      - The WorkerAgent calls the WorkerSupervisor to spawn the workers, and saves the PIDs
      - Workers at spawn take an element from the FifoAgent if there is any, otherwise they are set to :free
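The per-category part of this startup sequence can be sketched as follows. This is only an illustration under assumptions, not the project's code: the `InitSketch.CategorySupervisor` module and its naming scheme are hypothetical, plain `Agent` processes stand in for the FifoAgent and WorkerAgent, a `DynamicSupervisor` stands in for the WorkerSupervisor, and the queue starts empty instead of being loaded from Mnesia.

```elixir
defmodule InitSketch.CategorySupervisor do
  use Supervisor

  def start_link(category) do
    Supervisor.start_link(__MODULE__, category, name: name(category, :supervisor))
  end

  # Hypothetical naming scheme so each category gets its own registered processes.
  def name(category, role), do: :"init_sketch_#{category}_#{role}"

  @impl true
  def init(category) do
    children = [
      # FIFO storage: an Agent holding an Erlang :queue (empty here, no Mnesia call)
      %{
        id: :fifo_agent,
        start: {Agent, :start_link, [fn -> :queue.new() end, [name: name(category, :fifo_agent)]]}
      },
      # Worker registry: an Agent holding a map of worker PIDs
      %{
        id: :worker_agent,
        start: {Agent, :start_link, [fn -> %{} end, [name: name(category, :worker_agent)]]}
      },
      # Worker supervisor: workers are added later, once a worker count is known
      {DynamicSupervisor, name: name(category, :worker_supervisor), strategy: :one_for_one}
    ]

    # Children are started in list order
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Calling `InitSketch.CategorySupervisor.start_link(:email)` would start the three processes for a hypothetical `:email` category. Building atoms from category names is acceptable here because the set of categories is small and fixed.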
- Call Dispatcher.dispatch()
  - If there is a timer, send a message to Waiter, which will recall the Dispatcher.dispatch() function at the end of the timer
- The Dispatcher checks in which category a task should be executed and sends the task to the associated WorkerAgent
- The WorkerAgent checks if any worker is :free
  - If there is no :free worker, it sends the task to the FifoAgent
- If/when the worker is/becomes :free, the worker takes the task and executes it, changing its state to :busy
  - If there is no task, the worker calls the WorkerAgent to change its status to :free
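The :free / :busy decision described above can be sketched with plain data, leaving the processes aside. This is a simplified model under assumptions, not the project's implementation: `DispatchSketch` and its functions are hypothetical names, and the state of one category is reduced to a map of worker statuses plus an Erlang `:queue`.

```elixir
defmodule DispatchSketch do
  # Builds the state of one category: every worker starts :free, the queue is empty.
  def new(worker_ids) do
    %{workers: Map.new(worker_ids, fn id -> {id, :free} end), queue: :queue.new()}
  end

  # A new task goes to a :free worker if there is one, otherwise to the queue.
  def dispatch(state, task) do
    case Enum.find(state.workers, fn {_id, status} -> status == :free end) do
      {id, :free} ->
        {{:assigned, id}, %{state | workers: Map.put(state.workers, id, :busy)}}

      nil ->
        {:queued, %{state | queue: :queue.in(task, state.queue)}}
    end
  end

  # A worker that finished takes the oldest queued task, or goes back to :free.
  def finished(state, id) do
    case :queue.out(state.queue) do
      {{:value, task}, rest} ->
        {{:next, task}, %{state | queue: rest, workers: Map.put(state.workers, id, :busy)}}

      {:empty, _queue} ->
        {:idle, %{state | workers: Map.put(state.workers, id, :free)}}
    end
  end
end

# Usage with a single worker: the second task has to wait in the queue.
state = DispatchSketch.new([:w1])
{{:assigned, :w1}, state} = DispatchSketch.dispatch(state, :task_a)
{:queued, state} = DispatchSketch.dispatch(state, :task_b)
{{:next, :task_b}, state} = DispatchSketch.finished(state, :w1)
{:idle, _state} = DispatchSketch.finished(state, :w1)
```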
The WorkerBalancer doesn't only calculate the workers at the end of the initialization, but also every N seconds after it. The calculation is the following:
- It takes the number of tasks in the category and multiplies it by the number associated with the category's priority.
- It takes the number of available workers and multiplies it by the previous result.
- It divides the previous result by the total number of weighted tasks.
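As a worked example, the three steps above can be written as one small function. This is a sketch under assumptions: `BalanceSketch.allocate/2` is a hypothetical name, the input shape (a map of category to `{task_count, priority_weight}`) is invented for the illustration, and rounding down with `div/2` is a choice made here, since the rounding rule is not stated.

```elixir
defmodule BalanceSketch do
  # categories: %{category => {pending_task_count, priority_weight}}
  # Returns %{category => worker_count}.
  def allocate(categories, available_workers) do
    # Step 1: weighted tasks per category (tasks * priority weight)
    weighted = Map.new(categories, fn {category, {tasks, weight}} -> {category, tasks * weight} end)
    total = weighted |> Map.values() |> Enum.sum()

    Map.new(weighted, fn {category, weighted_tasks} ->
      # Steps 2 and 3: workers * weighted tasks / total weighted tasks, rounded down.
      # With no pending task at all, no worker is allocated.
      count = if total == 0, do: 0, else: div(available_workers * weighted_tasks, total)
      {category, count}
    end)
  end
end

# 10 tasks of weight 1 and 5 tasks of weight 4 give weighted totals of 10 and 20.
BalanceSketch.allocate(%{email: {10, 1}, reports: {5, 4}}, 10)
# => %{email: 3, reports: 6}
```

Because of the rounding down, one of the ten workers stays unassigned in this example; how leftovers are handled is left open here.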
There is also a notion of a minimum number of workers available at all times, which isn't used yet.
Note: the dispatcher isn't perfected yet and may not work for the moment.
Some decisions were taken for this project which may be discussed. I don't know if they represent the best way of doing things, but they were made with some ideas in mind.
The Phoenix server is here so that, later, a web interface can be used to verify in a more visual way that everything works.
For now it doesn't add anything but the /dashboard, which is still something.
Mnesia is a DB directly integrated into Erlang. It is set up to persist the data on disc should the server go down and the cache be erased.
I was curious about it and wanted to learn to use it.
First In First Out, separated by category. Each category has its own FIFO queue, which allows it to get the next jobs associated with its purpose.
There is no defined way to pick the category. It can be named after the jobs it has to execute (heavy calculation, email sending...) or based on the priority.
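A per-category FIFO of this kind can be sketched with an `Agent` wrapping an Erlang `:queue`. This is a minimal illustration, not the project's FifoAgent: the `FifoSketch` module, its function names and its process naming scheme are assumptions, and nothing is persisted to Mnesia.

```elixir
defmodule FifoSketch do
  # Hypothetical Agent holding one Erlang :queue per category.
  use Agent

  def start_link(category) do
    Agent.start_link(fn -> :queue.new() end, name: name(category))
  end

  # Adds a task at the back of the category's queue.
  def push(category, task) do
    Agent.update(name(category), fn queue -> :queue.in(task, queue) end)
  end

  # Removes and returns the oldest task as {:ok, task}, or :empty.
  def pop(category) do
    Agent.get_and_update(name(category), fn queue ->
      case :queue.out(queue) do
        {{:value, task}, rest} -> {{:ok, task}, rest}
        {:empty, rest} -> {:empty, rest}
      end
    end)
  end

  # One registered name per category (assumes a small, fixed set of categories).
  defp name(category), do: :"fifo_sketch_#{category}"
end
```

After `FifoSketch.start_link(:email)`, tasks pushed with `FifoSketch.push(:email, task)` come back from `FifoSketch.pop(:email)` in the order they were added.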
Though Elixir processes are concurrent (and thread parallel) and the computing power is evenly shared, having too many processes could still impact the performance of the main application.
By limiting the number of workers, and keeping the heavy tasks in the workers (combined with memoization), you ensure a more stable state for your server, at the cost of sometimes waiting a few seconds before a task is executed.
It was decided to include a load balancer at the level of the WorkerSupervisor to adapt to the current load of each category depending on its importance level.
Though it is not a necessity, it is still a nice thing to have under peak conditions (such as sending an email to all your users).
mix deps.get
As most of the elements are GenServers, and as I didn't find anything at the time to test them, there are no tests available.
I recently found a resource about testing GenServers with Erlang trace, but it wasn't published at the time of creating the project (March vs August).
iex -S mix
# Once you're in the shell of the console
JobScheduler.Tester.generate_jobs(1000)
- Stabilise the launch at the start of the application
- Validate the dispatcher
- Automate the tests through a temporary staging env, and apply chaos engineering (check this video?)
- Handle the LATER elements
- Add a web interface to generate and visualize worker dispatch and usage
- Add time-consuming code to improve the tests
- In the app/worker/config/dev, switch the timer (uncomment one and comment the other)
- Add or remove workers depending on your needs (see the Not yet section)
- Call Process.send(:worker_queue_worker_balancer, :work, [])