DEV Community

Kir Axanov
Kir Axanov

Posted on

Code. Gleam. Parallel tasks from pool

Hi!

Usecase and terms

Sometimes you want to split a heavy job into multiple pieces to run in parallel, but you cannot predict the number of such smaller jobs.
So we need a function that tells us when to stop spawning new jobs.
Also, we will stop spawning new jobs if we get any error (already running jobs will continue until they finish).

  • job is something that you want to execute in parallel (one of the smaller pieces)
  • task is any data that is passed as an argument to a job

As an example, imagine that you need to import some huge amount of (unlinked) time-based data. We can run, say, 20 parallel processes at once. Each process will take a time window for a day - that day's start timestamp is a task for those processes (jobs). We run those jobs until we import all the data or encounter any errors.

Code

We will use an OTP actor, which I named ParaWorker (parallel worker).

Here are the types in types.gleam:

import gleam/option.{ type Option }
import gleam/erlang/process


/// Messages understood by the ParaWorker actor.
pub type ParaWorkerMsg(err) {
  /// Sent once per job slot when the worker is started; asks the worker to spawn a job for the next task.
  TaskEnqueued
  /// Sent by a job whose `handle_task` returned `Ok`.
  TaskCompleted
  /// Sent by a job whose `handle_task` returned `Error`; carries that error.
  TaskFailed(error: err)
}

/// State kept by the ParaWorker actor between messages.
pub type ParaWorkerState(task, err) {
  ParaWorkerState(
    // Registered name of the worker; jobs use it to report their outcome back.
    worker_name: process.Name(ParaWorkerMsg(err)),
    // Number of jobs that were spawned and have not reported back yet.
    running_jobs: Int,
    // Task for the next job to be spawned; `None` when there is nothing left to hand out.
    next_task: Option(task),
    // Given the task that is being started, returns the task after it (or `None` to stop).
    new_task: fn(task) -> Option(task),
    // The job body: handles a single task.
    handle_task: fn(task) -> Result(Nil, err),
    // Errors returned by failed jobs so far (most recent first).
    errors: List(err),
    // Where ParaWorker should send the results of handling all tasks.
    finish_subject: process.Subject(Result(Nil, List(err))),
  )
}
Enter fullscreen mode Exit fullscreen mode

And here is the ParaWorker in paraworker.gleam:

import gleam/string
import gleam/io
import gleam/otp/actor
import gleam/erlang/process
import gleam/result.{ try }
import gleam/option.{ type Option, Some, None }

import types as t


/// Handles a chain of tasks with up to `max_jobs` jobs running in parallel and
/// blocks the caller until every spawned job has reported back.
///
/// - `first_task` is the task given to the first job.
/// - `note` is only used to label the log lines printed by this function.
/// - `new_task` receives the task that is being started and returns the task
///   after it, or `None` when there is nothing more to do.
/// - `handle_task` is the job body; it runs in its own process for each task.
///
/// Returns `Ok(Nil)` when no job returned an error, otherwise `Error` with the
/// collected errors.
///
/// Panics when `max_jobs` is not positive or when the worker actor cannot be
/// started.
pub fn start(
  first_task: task,
  note: String,
  max_jobs: Int,
  new_task: fn(task) -> Option(task),
  handle_task: fn(task) -> Result(Nil, err),
) -> Result(Nil, List(err)) {
  case max_jobs {
    max_jobs if max_jobs <= 0 -> panic as "To start ParaWorker, `max_jobs` should be greater than 0!"
    _ -> Nil
  }

  let worker_name = process.new_name("parallel_worker")
  let finish_subject = process.new_subject()

  let started =
    t.ParaWorkerState(
      worker_name:,
      running_jobs: 0,
      next_task: Some(first_task),
      new_task:,
      handle_task:,
      errors: [],
      finish_subject:,
    )
    |> actor.new()
    |> actor.on_message(handle_msg)
    |> actor.named(worker_name)
    |> actor.start()

  case started {
    Ok(_) -> io.println("Started `" <> note <> "` ParaWorker.")
    Error(_) -> {
      io.println("Failed to start `" <> note <> "` ParaWorker.")
      // Without a running worker nobody would ever answer on `finish_subject`,
      // so fail loudly here instead of sending to an unregistered name or
      // waiting forever below.
      panic as "ParaWorker actor did not start, so its tasks cannot be handled!"
    }
  }

  // One `TaskEnqueued` per allowed parallel job.
  start_handling(worker_name, max_jobs)
  let results = process.receive_forever(finish_subject)  // Waiting until all tasks are handled.

  io.println("Finished `" <> note <> "` ParaWorker with: `" <> string.inspect(results) <> "`.")
  results
}

/// Resolves the worker's subject from its registered name and sends it
/// `count` `TaskEnqueued` messages.
fn start_handling(
  actor_name: process.Name(t.ParaWorkerMsg(err)),
  count: Int,
) -> Nil {
  let worker_subject = process.named_subject(actor_name)
  enqueue__loop(worker_subject, count)
}

/// Sends `TaskEnqueued` to `subject` `count` times.
///
/// A non-positive `count` sends nothing; counting down from a negative number
/// would otherwise never reach the stop condition.
fn enqueue__loop(
  subject: process.Subject(t.ParaWorkerMsg(err)),
  count: Int,
) -> Nil {
  case count > 0 {
    False -> Nil
    True -> {
      process.send(subject, t.TaskEnqueued)
      enqueue__loop(subject, count - 1)
    }
  }
}

/// Message handler of the ParaWorker actor.
///
/// Decides, from the incoming message together with the number of running
/// jobs, the pending task and the errors collected so far, whether to spawn
/// another job, to keep waiting for running jobs, or to stop and send the
/// overall result to `finish_subject`.
fn handle_msg(
  actor_state: t.ParaWorkerState(task, err),
  msg: t.ParaWorkerMsg(err),
) -> actor.Next(t.ParaWorkerState(task, err), t.ParaWorkerMsg(err)) {
  let t.ParaWorkerState(running_jobs:, next_task:, errors:, finish_subject:, ..) = actor_state

  case msg, running_jobs, next_task, errors {
    // New task is added for handling (during worker starting) and nothing has failed so far -
    // spawn new job (if there is a task for it) and continue handling.
    t.TaskEnqueued, _, _, [] -> {
      start_job(actor_state)
      |> actor.continue()
    }

    // A start-up slot that is handled after some job has already failed - do not spawn anything,
    // the running jobs will report back and finish the worker.
    // This clause must stay above the "last job" clauses: `TaskEnqueued` is not a job report.
    t.TaskEnqueued, _, _, _ -> actor.continue(actor_state)

    // Finished the last job - stop and return errors if any.
    // `running_jobs` can't be 0, cause `first_task` is always `Some()`.
    _, 1, None, _ |  // No more tasks.
    t.TaskFailed(_), 1, _, _ |  // Got new error.
    _, 1, _, [_h, .._t] -> {  // Have any error.
      let results = case msg, errors {
        t.TaskFailed(error), errors -> Error([error, ..errors])
        _, [] -> Ok(Nil)
        _, errors -> Error(errors)
      }

      process.send(finish_subject, results)
      actor.stop()
    }

    // No errors found and have the next task - spawn new job and continue handling.
    t.TaskCompleted, _, Some(_), [] -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> start_job()
      |> actor.continue()
    }

    // A job returned error - remember it and wait till all running jobs finish.
    t.TaskFailed(error), running_jobs, _, errors -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1, errors: [error, ..errors])
      |> actor.continue()
    }

    // No next task - wait till all running jobs finish.
    _, running_jobs, None, _ -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> actor.continue()
    }

    // Have some errors - wait till all running jobs finish.
    _, running_jobs, _, [_h, .._t] -> {
      t.ParaWorkerState(..actor_state, running_jobs: running_jobs - 1)
      |> actor.continue()
    }
  }
}

/// Spawns a job for the pending task, if there is one.
///
/// The follow-up task is computed before the job is spawned. The job runs
/// `handle_task` in its own process and reports `TaskCompleted` or
/// `TaskFailed` to the worker by its registered name. Returns the state with
/// one more running job and the follow-up task stored, or the state unchanged
/// when there is no pending task.
fn start_job(
  actor_state: t.ParaWorkerState(task, err),
) -> t.ParaWorkerState(task, err) {
  case actor_state.next_task {
    None -> actor_state

    Some(current) -> {
      let produce_next = actor_state.new_task
      let run = actor_state.handle_task
      let upcoming = produce_next(current)

      process.spawn(fn() {
        let outcome = case run(current) {
          Error(error) -> t.TaskFailed(error)
          Ok(_) -> t.TaskCompleted
        }

        process.send(process.named_subject(actor_state.worker_name), outcome)
      })

      t.ParaWorkerState(
        ..actor_state,
        running_jobs: actor_state.running_jobs + 1,
        next_task: upcoming,
      )
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Usage example

Here we count from 1 up to 10: the first run finishes without errors, while the second and third runs hit one error. Note that jobs that are already running continue after the error is returned, but no new jobs are started afterwards. The third run shows that, when only one job runs at a time, nothing new is started once the error occurs:

import gleam/erlang/process
import gleam/int
import gleam/io
import gleam/option.{None, Some}
import gleam/string

import paraworker


/// Demo: counts from 1 up to 10 three times - without errors, with an error
/// and three parallel jobs, and with an error and a single job.
pub fn main() {
  // Produces the task after `task`; counting stops once 10 has been handed out.
  // `Some`/`None` come from `gleam/option`.
  let new_task = fn(task: Int) {
    case task >= 10 {
      True -> None
      False -> Some(task + 1)
    }
  }

  // Shared "work" of both handlers: log the number and pretend to be busy.
  let count = fn(task: Int) {
    io.println(timestamp() <> " Counted up to " <> int.to_string(task) <> ".")
    process.sleep(300)
  }

  let handle_task_ok = fn(task: Int) {
    count(task)
    Ok(Nil)
  }

  // Same as above, but the job for task 4 fails.
  let handle_task_err = fn(task: Int) {
    count(task)
    case task {
      4 -> Error("1, 2, 3, 4... wait what... where is bleem?!")
      _ -> Ok(Nil)
    }
  }

  let _ = paraworker.start(1, "demo-ok", 3, new_task, handle_task_ok)
  io.println("")
  let _ = paraworker.start(1, "demo-error", 3, new_task, handle_task_err)
  io.println("")
  let _ = paraworker.start(1, "demo-error-single", 1, new_task, handle_task_err)
}

// Using erlang function for example's portability - replace by something like `birl` for convenience.
/// Binding to Erlang's `erlang:timestamp/0`.
/// Per the Erlang documentation the tuple is `{MegaSecs, Secs, MicroSecs}`,
/// where `Secs` and `MicroSecs` are each below 1_000_000.
@external(erlang, "erlang", "timestamp")
fn erl_timestamp() -> #(Int, Int, Int)

/// Formats the current time as `<seconds>.<milliseconds>`.
///
/// The seconds and microseconds parts of the Erlang timestamp are zero-padded
/// to six digits before use. Without the padding a value such as 5123
/// microseconds would be printed as `.512` instead of `.005`, and a small
/// seconds part would shorten the whole number.
fn timestamp() -> String {
  let #(megaseconds, seconds, microseconds) = erl_timestamp()
  let six_digits = fn(value: Int) { string.pad_start(int.to_string(value), 6, "0") }

  // Only the first three digits of the padded microseconds are kept (milliseconds).
  let milliseconds = string.slice(six_digits(microseconds), 0, 3)

  int.to_string(megaseconds) <> six_digits(seconds) <> "." <> milliseconds
}
Enter fullscreen mode Exit fullscreen mode

Bye!

Top comments (0)