Orchestrating Dependent Tasks with Elixir
Last time I gave a brief high-level overview of managing a job pipeline (DAG) with Elixir. Today I’m going to give a fully native Elixir example. I know I promised a Rails integration example last time, but this grew much longer than I had originally thought. The Rails + Sidekiq + Verk example will have to wait for next time.
The Pipeline #
First we need to define our pipeline. Let’s stick to the original simple DAG from the previous example, although I am happy to report that more complex graphs are now supported! This time let’s give the jobs more realistic names to help keep things straight between the prose and the diagrams.
DNS --------|
            |------------> S3 ------------> GEN
SSL --------|
Native Elixir #
Let’s write a few simple modules to handle building the Job records. To start, these modules will simply have a function that translates our arguments into an AssemblyLine.Job struct, which we can hand to an AssemblyLine.JobQueue.Server.
defmodule DNS do
  alias AssemblyLine.Job

  def new_job(task_name, record_type, value) do
    %Job{task: task_name, worker: DNS, args: [record_type, value]}
  end
end

defmodule SSL do
  alias AssemblyLine.Job

  def new_job(task_name, domain, sig_data) do
    %Job{task: task_name, worker: SSL, args: [domain, sig_data]}
  end
end

defmodule S3 do
  alias AssemblyLine.Job

  def new_job(task_name, bucket_name) do
    %Job{task: task_name, worker: S3, args: [bucket_name]}
  end
end

defmodule GEN do
  alias AssemblyLine.Job

  def new_job(task_name) do
    %Job{task: task_name, worker: GEN}
  end
end
Now that we are able to format our data in a way that AssemblyLine can understand, we need to look at how to build the actual job pipeline. Using the graph we defined in the pipeline section, we need one job of each type, and we have a pretty clear dependency structure.

We need a GEN job, but it can’t run until we’ve run the S3 job, which itself can’t run until the DNS and SSL jobs have run. As discussed in the previous post, we can represent our graph with the list [[DNS, SSL], S3, GEN], but remember that the Job Queue Server expects Job structs (or binaries). To build the appropriate list we would do the following:
# ssl_data is a placeholder for whatever signing data the cert purchase needs
job_list = [
  [
    DNS.new_job("add cname", :c_name, ["www.we.us", "we.us"]),
    SSL.new_job("purchase cert", "we.us", ssl_data)
  ],
  S3.new_job("build bucket", "demo_bucket"),
  GEN.new_job("create defaults")
]
You may have noticed that in the new_job functions we set the parent module as the worker value. We are going to use the Handler module that comes with AssemblyLine for the processing examples, which means we need to define a perform function for each module. The behavior of each will obviously be different; below is a quick example of a very simple perform for the DNS module.
defmodule DNS do
  ...

  def perform(record_type, value) do
    case modify_zone(record_type, value) do
      :ok ->
        {:ok, record_type}
      _ ->
        # exiting the process signals that this job failed
        exit(:normal)
    end
  end
end
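The Handler can call perform because the worker module and its args travel with each job; the dispatch presumably boils down to an apply/3 call, so the args list lines up positionally with perform’s parameters. A self-contained sketch of that contract (DispatchSketch and Echo are illustrative names, not part of AssemblyLine):

```elixir
defmodule DispatchSketch do
  # apply/3 calls worker.perform(arg1, arg2, ...), so each entry in the
  # job's args list becomes one positional parameter of perform.
  def run_job(%{worker: worker, args: args}) do
    apply(worker, :perform, args)
  end
end

defmodule Echo do
  def perform(record_type, value), do: {:ok, {record_type, value}}
end

DispatchSketch.run_job(%{worker: Echo, args: [:c_name, "we.us"]})
# → {:ok, {:c_name, "we.us"}}
```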
With the final piece in place we can create a queue and use the Handler module to process it.
alias AssemblyLine.JobQueue.Supervisor
alias AssemblyLine.JobQueue.Handler
Supervisor.start_queue("awesome.com create", job_list)
Handler.start_all("awesome.com create")
That’s it! The start_all function will either return :finished if all jobs succeed, or {:incomplete, [bad_jobs]} where bad_jobs is a list of the job or jobs that caused processing to halt.
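Since start_all/1 reports failure through its return value rather than raising, a case expression is the natural way to consume it. A sketch (the logging in each branch is just illustrative):

```elixir
alias AssemblyLine.JobQueue.Handler

case Handler.start_all("awesome.com create") do
  :finished ->
    IO.puts("pipeline complete")

  {:incomplete, bad_jobs} ->
    # bad_jobs holds the job or jobs that halted processing;
    # inspect them, fix the underlying issue, and re-run the queue.
    IO.inspect(bad_jobs, label: "failed jobs")
end
```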
Non-Trivial Graphs #
Up to this point we’ve covered the simplest graph structure, as that is really all AssemblyLine could optimize. However, I recently pushed changes that allow for optimal processing of more complex graph structures, with a bit of reduction work on your part.
Let’s look at a slightly more complex graph:
A ---------> B ---------|
                        |----------> D
C ----------------------|
If we were to use the previous technique, we could preserve the dependency requirements with the list [A, [B, C], D]; however, if A happens to be sufficiently slow, then C is blocked for no reason. You can now include a reference to another Server process in the work list, which means the above graph can be structured as follows:
alias AssemblyLine.JobQueue.Supervisor
Supervisor.start_queue("inner", [A, B])
Supervisor.start_queue("outer", [[C, "inner"], D])
When processing the pipeline you only need to interface with the "outer" Server process. It will delegate to the nested process for job retrieval and completion.

Note: when dealing with nested queues, be careful with the complete_current_set function, as it currently has the potential to pop an incomplete subgraph off the list.
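Processing a nested pipeline looks just like the flat case: hand the Handler the outer queue’s name and it drains the inner Server through the embedded reference (queue names here match the example above):

```elixir
alias AssemblyLine.JobQueue.Handler

# Only "outer" is processed directly; the "inner" queue is
# drained through the reference embedded in outer's work list.
Handler.start_all("outer")
```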