Hi!
Usecase and terms
Sometimes you want to split a heavy job into multiple pieces to run in parallel, but you cannot predict the number of such smaller jobs.
So we need a function that tells us when to stop spawning new jobs.
Also, we stop spawning new jobs as soon as we get any error (jobs that are already running continue until they finish).
- a job is something that you want to execute in parallel (one of the smaller pieces)
- a task is any data that is passed as an argument to a job
As an example, imagine that you need to import some huge amount of (unlinked) time-based data. We can run, say, 20 parallel processes at once. Each process will take a time window for a day - that day's start timestamp is a task for those processes (jobs). We run those jobs until we import all the data or encounter any errors.
Code
We will use an OTP actor, which I named ParaWorker (parallel worker).
Here are the types in `types.gleam`:
import gleam/option.{ type Option }
import gleam/erlang/process
/// Messages handled by the ParaWorker actor.
/// `err` is the error type returned by the user's `handle_task` function.
pub type ParaWorkerMsg(err) {
/// Request to spawn a job for the pending task, if there is one.
TaskEnqueued
/// A job finished and its `handle_task` call returned `Ok`.
TaskCompleted
/// A job finished and its `handle_task` call returned `Error(error)`.
TaskFailed(error: err)
}
/// State carried by the ParaWorker actor between messages.
/// `task` is the data handed to each job, `err` is the error type a job may return.
pub type ParaWorkerState(task, err) {
ParaWorkerState(
// Name the actor is registered under; spawned jobs use it to report their outcome.
worker_name: process.Name(ParaWorkerMsg(err)),
// Number of spawned jobs that have not reported back yet.
running_jobs: Int,
// Task the next spawned job will receive; `None` once `new_task` has returned `None`.
next_task: Option(task),
// Derives the following task from the one that is about to be handled; `None` means "no more tasks".
new_task: fn(task) -> Option(task),
// The job body: runs in a spawned process for every task.
handle_task: fn(task) -> Result(Nil, err),
// Errors returned by jobs so far, most recent first.
errors: List(err),
finish_subject: process.Subject(Result(Nil, List(err))), // Where ParaWorker should send the results of handling all tasks.
)
}
And here is the ParaWorker in `paraworker.gleam`:
import gleam/string
import gleam/io
import gleam/otp/actor
import gleam/erlang/process
import gleam/result.{ try }
import gleam/option.{ type Option, Some, None }
import types as t
/// Runs `handle_task` over a chain of tasks with at most `max_jobs` jobs
/// running at once, and blocks the caller until the worker reports the result.
///
/// - `first_task`: the task given to the first job.
/// - `note`: a label used only in the log lines printed by this function.
/// - `max_jobs`: how many jobs may run in parallel; must be greater than 0.
/// - `new_task`: derives the next task from the current one; returning `None`
///   stops new jobs from being spawned.
/// - `handle_task`: the job body, executed in a spawned process per task.
///
/// Returns `Ok(Nil)` when no job returned an error, otherwise `Error` with
/// the collected errors.
///
/// Panics when `max_jobs` is not positive, or when the actor fails to start
/// (without a running actor nobody would ever send to `finish_subject`, so
/// waiting for the result could never succeed).
///
/// NOTE(review): `process.new_name` is called on every `start`; the
/// gleam_erlang docs appear to recommend creating names once at program
/// start - confirm this is acceptable for the expected call frequency.
/// NOTE(review): the worker may stop before all `max_jobs` enqueue messages
/// have been sent (e.g. when the first job fails immediately); sending to the
/// then-unregistered name presumably crashes the caller - confirm.
pub fn start(
  first_task: task,
  note: String,
  max_jobs: Int,
  new_task: fn(task) -> Option(task),
  handle_task: fn(task) -> Result(Nil, err),
) -> Result(Nil, List(err)) {
  case max_jobs > 0 {
    True -> Nil
    False ->
      panic as "To start ParaWorker, `max_jobs` should be greater than 0!"
  }

  let worker_name = process.new_name("parallel_worker")
  let finish_subject = process.new_subject()

  let started =
    t.ParaWorkerState(
      worker_name:,
      running_jobs: 0,
      next_task: Some(first_task),
      new_task:,
      handle_task:,
      errors: [],
      finish_subject:,
    )
    |> actor.new()
    |> actor.on_message(handle_msg)
    |> actor.named(worker_name)
    |> actor.start()

  // Fail fast instead of discarding the start result: continuing without a
  // registered actor can only crash later or block forever.
  case started {
    Ok(_) -> io.println("Started `" <> note <> "` ParaWorker.")
    Error(error) -> {
      io.println("Failed to start `" <> note <> "` ParaWorker.")
      panic as {
        "Failed to start `"
        <> note
        <> "` ParaWorker: "
        <> string.inspect(error)
      }
    }
  }

  start_handling(worker_name, max_jobs)

  // Waiting until all tasks are handled.
  let results = process.receive_forever(finish_subject)
  io.println(
    "Finished `"
    <> note
    <> "` ParaWorker with: `"
    <> string.inspect(results)
    <> "`.",
  )
  results
}
/// Kicks the worker off by posting `count` `TaskEnqueued` messages to the
/// actor registered under `actor_name`.
fn start_handling(
  actor_name: process.Name(t.ParaWorkerMsg(err)),
  count: Int,
) -> Nil {
  let worker_subject = process.named_subject(actor_name)
  enqueue__loop(worker_subject, count)
}
/// Sends `TaskEnqueued` to `subject` `count` times.
///
/// Any non-positive `count` sends nothing. Stopping on `count <= 0` rather
/// than only on `0` guarantees termination even if a negative value is ever
/// passed in.
fn enqueue__loop(
  subject: process.Subject(t.ParaWorkerMsg(err)),
  count: Int,
) -> Nil {
  case count > 0 {
    False -> Nil
    True -> {
      process.send(subject, t.TaskEnqueued)
      enqueue__loop(subject, count - 1)
    }
  }
}
/// Actor message handler: decides, for every incoming message, whether to
/// spawn another job, keep waiting for running jobs, or finish.
///
/// The decision is based on four values: the message itself, how many jobs
/// are still running, whether there is a next task, and whether any errors
/// have been collected so far. When the last running job reports back and
/// nothing more should be spawned, the final result is sent to
/// `finish_subject` and the actor stops.
fn handle_msg(
  actor_state: t.ParaWorkerState(task, err),
  msg: t.ParaWorkerMsg(err),
) -> actor.Next(t.ParaWorkerState(task, err), t.ParaWorkerMsg(err)) {
  let t.ParaWorkerState(running_jobs:, next_task:, errors:, finish_subject:, ..) =
    actor_state
  case msg, running_jobs, next_task, errors {
    // New task is added for handling (during worker starting) and nothing has
    // failed so far - spawn new job and continue handling.
    t.TaskEnqueued, _, _, [] -> {
      start_job(actor_state)
      |> actor.continue()
    }
    // A job already failed while the initial enqueue messages were still in
    // the mailbox - do not spawn anything new, just keep waiting for the
    // running jobs. The state is left untouched because no job finished here.
    t.TaskEnqueued, _, _, _ -> actor.continue(actor_state)
    // Finished the last job - stop and return errors if any.
    // `running_jobs` can't be 0 here, because `first_task` is always `Some()`.
    _, 1, None, _ // No more tasks.
    | t.TaskFailed(_), 1, _, _ // Got new error.
    | _, 1, _, [_, ..] -> { // Have any error.
      let results = case msg, errors {
        t.TaskFailed(error), errors -> Error([error, ..errors])
        _, [] -> Ok(Nil)
        _, errors -> Error(errors)
      }
      process.send(finish_subject, results)
      actor.stop()
    }
    // No errors found and have the next task - spawn new job and continue handling.
    t.TaskCompleted, _, Some(_), [] -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> start_job()
      |> actor.continue()
    }
    // A job returned error - remember it and wait till all running jobs finish.
    t.TaskFailed(error), running_jobs, _, errors -> {
      t.ParaWorkerState(
        ..actor_state,
        running_jobs: running_jobs - 1,
        errors: [error, ..errors],
      )
      |> actor.continue()
    }
    // No next task - wait till all running jobs finish.
    _, running_jobs, None, _ -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> actor.continue()
    }
    // Have some errors - wait till all running jobs finish.
    _, running_jobs, _, [_, ..] -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> actor.continue()
    }
  }
}
/// Spawns a job for the pending task, if there is one.
///
/// With a pending task: computes the following task via `new_task`, spawns a
/// process that runs `handle_task` and reports `TaskCompleted` / `TaskFailed`
/// to the named worker, and returns the state with `running_jobs` incremented
/// and `next_task` advanced. Without a pending task the state is returned
/// unchanged.
fn start_job(
  actor_state: t.ParaWorkerState(task, err),
) -> t.ParaWorkerState(task, err) {
  let t.ParaWorkerState(
    worker_name:,
    running_jobs:,
    next_task: pending,
    new_task:,
    handle_task:,
    ..,
  ) = actor_state

  case pending {
    None -> actor_state
    Some(current) -> {
      // The follow-up task is computed before the job is spawned.
      let upcoming = new_task(current)

      let run_job = fn() {
        let outcome = case handle_task(current) {
          Ok(_) -> t.TaskCompleted
          Error(error) -> t.TaskFailed(error)
        }
        process.send(process.named_subject(worker_name), outcome)
      }
      process.spawn(run_job)

      t.ParaWorkerState(
        ..actor_state,
        running_jobs: running_jobs + 1,
        next_task: upcoming,
      )
    }
  }
}
Usage example
Here we count from 1 up to 10: the first time without errors, the second and third times with one error. Note that jobs which are already running continue even after the error is raised, but no new jobs are started afterwards. The third run shows that no new jobs are started after the error when we only run one job at a time:
import gleam/erlang/process
import gleam/int
import gleam/io
import gleam/option.{None, Some}
import gleam/string
import paraworker
/// Demo entry point: counts from 1 up to 10 three times using ParaWorker.
///
/// 1. "demo-ok": 3 parallel jobs, no errors.
/// 2. "demo-error": 3 parallel jobs, the job for task 4 fails.
/// 3. "demo-error-single": 1 job at a time, the job for task 4 fails.
///
/// `Some` / `None` come from `gleam/option`, which must be imported by this
/// module for the example to compile.
pub fn main() {
  // Task chain: 1, 2, ..., 10, then stop.
  let new_task = fn(task: Int) {
    case task >= 10 {
      True -> None
      False -> Some(task + 1)
    }
  }

  // Shared part of both handlers: log the task and simulate 300 ms of work.
  let count_up = fn(task: Int) {
    io.println(timestamp() <> " Counted up to " <> int.to_string(task) <> ".")
    process.sleep(300)
  }

  let handle_task_ok = fn(task: Int) {
    count_up(task)
    Ok(Nil)
  }

  let handle_task_err = fn(task: Int) {
    count_up(task)
    case task {
      4 -> Error("1, 2, 3, 4... wait what... where is bleem?!")
      _ -> Ok(Nil)
    }
  }

  let _ = paraworker.start(1, "demo-ok", 3, new_task, handle_task_ok)
  io.println("")
  let _ = paraworker.start(1, "demo-error", 3, new_task, handle_task_err)
  io.println("")
  let _ = paraworker.start(1, "demo-error-single", 1, new_task, handle_task_err)
}
// Using erlang function for example's portability - replace by something like `birl` for convenience.
// `erlang:timestamp/0` returns `{MegaSecs, Secs, MicroSecs}`.
@external(erlang, "erlang", "timestamp")
fn erl_timestamp() -> #(Int, Int, Int)

/// Formats the current time as `<seconds since epoch>.<milliseconds>`.
///
/// `Secs` and `MicroSecs` are both zero-padded to 6 digits before use.
/// Without the padding, small values would be rendered at the wrong
/// magnitude (e.g. 5000 microseconds would become ".500" instead of ".005").
fn timestamp() -> String {
  let #(mega_secs, secs, micro_secs) = erl_timestamp()
  let secs = string.pad_start(int.to_string(secs), to: 6, with: "0")
  // Keep only the millisecond part of the (padded) microseconds.
  let millis =
    int.to_string(micro_secs)
    |> string.pad_start(to: 6, with: "0")
    |> string.slice(0, 3)
  int.to_string(mega_secs) <> secs <> "." <> millis
}
Bye!
Top comments (0)