Seryl Lns
Building a Rails Engine #7 — The Orchestrator: Coordinating the Workflow

The Orchestrator: Coordinating the Import Workflow

How to coordinate parsing, validation, and persistence through a single class that manages state transitions, handles errors per-record, and integrates with ActiveJob.

Context

This is part 7 of the series where we build DataPorter, a mountable Rails engine for data import workflows. In part 6, we built the DataImport model to track import state and the Source layer to parse CSV files into mapped hashes.

We now have all the individual pieces: targets describe imports, sources parse files, the RecordValidator checks column constraints, and DataImport tracks state. But nothing ties them together. In this article, we build the Orchestrator -- the class that coordinates the full parse-then-import workflow.

The problem

Without an orchestration layer, the import workflow ends up as 80 lines of procedural code in a controller action:

def import
  import_record.update!(status: :parsing)
  source = DataPorter::Sources::Csv.new(import_record, content: file.read)
  rows = source.fetch
  records = []

  rows.each_with_index do |row, i|
    record = build_import_record(row, i)
    record = target.transform(record)
    target.validate(record)
    validate_columns(record, target.columns)
    records << record
  end

  import_record.update!(records: records, status: :previewing)
  # ... 40 more lines for the import phase, error handling, reporting
end

Parse the file. Loop over the rows. Validate each one. Handle errors somehow. Update the status. Build a report. Send a notification. Pray nothing crashes between step 3 and step 7.

If this lives in a controller, it is untestable and impossible to run in the background. If it lives in the model, DataImport becomes a god object. We need a dedicated coordination layer that knows the order of operations but delegates the details to the objects that own them.

How it fits together

The Orchestrator coordinates two phases, each following the same pattern: transition the status, delegate work to the right objects, handle failures.

parse!                                import!
──────                                ───────
status → parsing                      status → importing
    │                                     │
    ▼                                     ▼
Source.fetch                          for each importable record
    │                                     │
    ▼                                     ├─ target.persist(record)
for each row                              │   ├─ success → created++
    ├─ extract_data                       │   └─ error → record.add_error
    ├─ target.transform                   │              target.on_error
    ├─ target.validate                    │
    └─ validator.validate                 ▼
    │                                 target.after_import(results)
    ▼                                     │
status → previewing                       ▼
report generated                      status → completed
                                      report updated

The user reviews parsed records between the two phases. This preview checkpoint is what makes the import workflow safe -- data never touches the database until the user confirms.
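The checkpoint relies on a small status lifecycle. Here is a dependency-free sketch of the transitions the diagram above implies -- the real enum lives on the DataImport model from part 6, and this transition table is an assumption, not the engine's actual code:

```ruby
# Hypothetical sketch of the status lifecycle the Orchestrator assumes.
# Every in-flight status can also fall through to :failed.
STATUSES = %i[pending parsing previewing importing completed failed].freeze

TRANSITIONS = {
  pending:    %i[parsing failed],
  parsing:    %i[previewing failed],
  previewing: %i[importing failed],
  importing:  %i[completed failed]
}.freeze

def transition_allowed?(from, to)
  TRANSITIONS.fetch(from, []).include?(to)
end
```

The key property: there is no edge from previewing to completed. The only way to reach completed is through importing, which the user must explicitly trigger.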

What we're building

Here is the Orchestrator in action -- two method calls that drive the entire workflow:

# In a controller or job
orchestrator = DataPorter::Orchestrator.new(data_import, content: csv_string)

# Step 1: Parse the file, validate records, generate a preview
orchestrator.parse!
data_import.status   # => "previewing"
data_import.records  # => [ImportRecord, ImportRecord, ...]

# Step 2: After user reviews the preview, persist the records
orchestrator.import!
data_import.status   # => "completed"
data_import.report.imported_count  # => 42

Two methods, two phases. parse! turns raw data into validated records and stops at previewing so the user can review. import! takes the importable records and persists them through the target's persist method. If anything goes catastrophically wrong, the import transitions to failed with an error report.

Implementation

Step 1 -- The Orchestrator skeleton and parse phase

The Orchestrator is a plain Ruby object. It receives a DataImport and optional content (for testing), then delegates to the pieces we already built:

# lib/data_porter/orchestrator.rb
class Orchestrator
  def initialize(data_import, content: nil)
    @data_import = data_import
    @target = data_import.target_class.new
    @source_options = { content: content }.compact
  end

  def parse!
    @data_import.parsing!
    records = build_records
    @data_import.update!(records: records, status: :previewing)
    build_report
  rescue StandardError => e
    handle_failure(e)
  end
end

The constructor instantiates the target (so we can call its hooks) and compacts the source options (so content: nil does not override ActiveStorage downloads). The parse! method follows a strict sequence: transition to parsing, build and validate records, save them with a previewing status, then generate a summary report. The rescue at the method level catches any failure -- a malformed CSV, a missing file, an unexpected source error -- and transitions the import to failed with the error message preserved in the report.

Notice that parsing! is called before the work starts. This is intentional: if the job crashes between the status transition and the update!, the import is left in parsing rather than pending, signaling to the user that something went wrong mid-process rather than silently sitting idle.

Step 2 -- Building and validating records

The build_records method is where the Source, Target, and RecordValidator converge:

# lib/data_porter/orchestrator.rb
def build_records
  source = Sources.resolve(@data_import.source_type)
                  .new(@data_import, **@source_options)
  raw_rows = source.fetch
  columns = @target.class._columns || []
  validator = RecordValidator.new(columns)

  raw_rows.each_with_index.map do |row, index|
    build_record(row, index, columns, validator)
  end
end

def build_record(row, index, columns, validator)
  record = StoreModels::ImportRecord.new(
    line_number: index + 1,
    data: extract_data(row, columns)
  )
  record = @target.transform(record)
  @target.validate(record)
  validator.validate(record)
  record.determine_status!
  record
end

def extract_data(row, columns)
  columns.each_with_object({}) do |col, hash|
    hash[col.name] = row[col.name]
  end
end

Each row goes through a four-step pipeline: extract_data picks only the values matching declared columns (ignoring any extra data in the row), transform lets the target normalize values (e.g., formatting phone numbers), validate runs target-specific business rules, and the RecordValidator checks structural constraints (required fields, type checks). Finally, determine_status! sets each record to complete, partial, or missing based on whether errors were added.
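The article does not show determine_status!, so here is one plausible reading of the three states it describes, as a self-contained sketch (ImportRecordSketch and the blank-value heuristic for missing vs partial are assumptions, not the engine's actual logic):

```ruby
# Hypothetical stand-in for StoreModels::ImportRecord, illustrating one way
# determine_status! could map errors to :complete / :partial / :missing.
class ImportRecordSketch
  attr_reader :data, :errors
  attr_accessor :status

  def initialize(data)
    @data = data
    @errors = []
  end

  def add_error(message)
    @errors << message
  end

  def determine_status!
    self.status =
      if @errors.empty?
        :complete                                    # every check passed
      elsif @data.values.any? { |v| v.to_s.strip.empty? }
        :missing                                     # a declared value is blank
      else
        :partial                                     # values present but invalid
      end
  end
end
```

Whatever the exact rules, the important part is that status is derived from the errors accumulated during the pipeline, so the preview can group rows by severity.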

The RecordValidator handles the generic constraints we defined in the column DSL:

# lib/data_porter/record_validator.rb
class RecordValidator
  def initialize(columns)
    @columns = columns
  end

  def validate(record)
    @columns.each do |col|
      value = record.data[col.name]
      validate_required(record, col, value)
      validate_type(record, col, value)
    end
  end
end

This separation matters: the target owns business-rule validations ("email must be unique in the system"), while the RecordValidator owns structural validations ("email must look like an email"). Neither knows about the other.
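To make the division concrete, here is a minimal sketch of the two layers side by side -- ColumnSketch, RecordValidatorSketch, and check_email_unique are invented for illustration and only mirror the shape of the engine's classes:

```ruby
# Structural layer: reads column metadata, knows nothing about the domain.
ColumnSketch = Struct.new(:name, :required)

class RecordValidatorSketch
  def initialize(columns)
    @columns = columns
  end

  def validate(record)
    @columns.each do |col|
      value = record[:data][col.name]
      record[:errors] << "#{col.name} is required" if col.required && value.to_s.empty?
    end
  end
end

# Business layer: a target-style rule that knows the domain, not the column DSL.
def check_email_unique(record, existing_emails)
  record[:errors] << "email already taken" if existing_emails.include?(record[:data][:email])
end
```

Both layers write into the same error list on the record, which is why determine_status! can treat them uniformly afterwards.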

Step 3 -- The import phase and per-record error handling

Once the user reviews the preview and confirms, import! persists the records:

# lib/data_porter/orchestrator.rb
def import!
  @data_import.importing!
  results = import_records
  update_import_report(results)
  @target.after_import(results, context: build_context)
rescue StandardError => e
  handle_failure(e)
end

def persist_record(record, context, results)
  @target.persist(record, context: context)
  results[:created] += 1
rescue StandardError => e
  record.add_error(e.message)
  @target.on_error(record, e, context: context)
  results[:errored] += 1
end

The critical design decision here is the error boundary. Each record is persisted individually, and if persist raises -- a uniqueness violation, a foreign key constraint, a custom validation from the host app -- the error is captured on that record and the import continues. The import does not wrap everything in a single transaction. This means a 10,000-row file with 3 bad records will successfully import 9,997 records rather than rolling back the entire batch.
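The arithmetic of that boundary is easy to demonstrate without the engine. In this dependency-free sketch, the third row raises, the error lands on that row, and the loop keeps counting:

```ruby
# Per-record error boundary in miniature: one bad row out of four.
results = { created: 0, errored: 0 }
rows = [{ ok: true }, { ok: true }, { ok: false }, { ok: true }]

rows.each do |row|
  begin
    raise "constraint violation" unless row[:ok]   # stands in for target.persist
    results[:created] += 1
  rescue StandardError => e
    row[:error] = e.message                        # error captured on the record
    results[:errored] += 1                         # ...and the loop continues
  end
end
# results ends up as { created: 3, errored: 1 }
```

Wrap the whole loop in a single transaction instead, and that one bad row would roll created back to zero.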

The on_error hook lets the target react to failures (logging, notifying, skipping related records), while after_import runs once after all records are processed, receiving the results hash for summary work like sending a confirmation email.
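A hypothetical host-app target makes the three hooks concrete. GuestTargetSketch is invented for illustration -- the store and log arguments replace the database and Rails logger so the example runs standalone, but the hook signatures mirror the ones the Orchestrator calls above:

```ruby
# Hypothetical target: persist raises on a domain constraint, on_error logs,
# after_import summarizes. The real engine's Target base class is not shown here.
class GuestTargetSketch
  def initialize(store, log)
    @store = store
    @log = log
  end

  def persist(record, context:)
    raise "duplicate email" if @store.any? { |g| g[:email] == record[:email] }
    @store << record
  end

  def on_error(record, error, context:)
    @log << "line #{record[:line]}: #{error.message}"
  end

  def after_import(results, context:)
    @log << "imported #{results[:created]}, errored #{results[:errored]}"
  end
end
```

Note that persist raises rather than returning a failure value: the Orchestrator's rescue is the contract, so host-app targets stay free of flow-control bookkeeping.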

The remaining private methods handle the plumbing:

# lib/data_porter/orchestrator.rb (private methods)
def import_records
  context = build_context
  results = { created: 0, errored: 0 }

  @data_import.importable_records.each do |record|
    persist_record(record, context, results)
  end

  @data_import.save!
  results
end

def build_context
  { user: @data_import.user, import: @data_import }
end

def handle_failure(error)
  @data_import.update!(
    status: :failed,
    report: @data_import.report.tap { |r| r.error = error.message }
  )
end

def update_import_report(results)
  @data_import.report.assign_attributes(
    imported_count: results[:created],
    errored_count: results[:errored]
  )
  @data_import.update!(status: :completed)
end

build_context provides the host app context that targets receive in their hooks -- the current user and the import record itself. handle_failure is the safety net: any unrecoverable error (source cannot be read, target cannot be resolved) transitions the import to failed with the error message preserved. update_import_report writes the final counts and marks the import as completed.

Step 4 -- ActiveJob integration

The Orchestrator is designed to be called from anywhere, but its primary consumer is a pair of ActiveJob classes:

# app/jobs/data_porter/parse_job.rb
class ParseJob < ActiveJob::Base
  queue_as { DataPorter.configuration.queue_name }

  def perform(import_id)
    data_import = DataImport.find(import_id)
    Orchestrator.new(data_import).parse!
  end
end

# app/jobs/data_porter/import_job.rb
class ImportJob < ActiveJob::Base
  queue_as { DataPorter.configuration.queue_name }

  def perform(import_id)
    data_import = DataImport.find(import_id)
    Orchestrator.new(data_import).import!
  end
end

Each job is a one-liner: find the import, delegate to the Orchestrator. The queue name comes from the engine's configuration, so the host app controls which queue processes imports. Because the Orchestrator already handles failures internally (transitioning to failed and recording the error), the jobs do not need their own error handling -- a crash at the ActiveJob level means something truly unexpected happened, and the adapter's retry mechanism takes over.

Decisions & tradeoffs

  • Coordination layer: a dedicated Orchestrator class over controller-level logic or model callbacks -- keeps controllers thin, models focused on data, and the workflow independently testable.
  • Transaction boundaries: per-record persist (no wrapping transaction) over a single transaction around all records -- a failed record should not roll back thousands of successful ones; partial success is more useful than total failure.
  • Error recovery: capture the error on the record and continue importing, rather than halting on the first error -- users expect to see which rows failed and why, not just "import failed"; the report becomes actionable.
  • Two-phase workflow: separate parse! and import! methods instead of a single run! method -- the preview step between parse and import lets users catch problems before data hits the database.
  • Job design: thin jobs delegating to the Orchestrator, not logic inside the job classes -- the Orchestrator is testable without ActiveJob; jobs are just the async trigger.

Testing it

The Orchestrator specs exercise both phases end-to-end using an anonymous target class and injected CSV content:

# spec/data_porter/orchestrator_spec.rb
let(:csv_content) { "First Name,Last Name,Email\nAlice,Smith,alice@example.com\n" }

describe "#parse!" do
  it "transitions to previewing" do
    orchestrator = described_class.new(data_import, content: csv_content)

    orchestrator.parse!

    expect(data_import.reload.status).to eq("previewing")
  end

  it "validates required fields" do
    csv = "First Name,Last Name,Email\n,Smith,alice@example.com\n"
    orchestrator = described_class.new(data_import, content: csv)

    orchestrator.parse!

    record = data_import.reload.records.first
    expect(record.status).to eq("missing")
  end
end

describe "#import!" do
  let(:failing_target_class) do
    Class.new(DataPorter::Target) do
      label "Failing"
      model_name "Guest"
      columns do
        column :name, type: :string
      end

      def persist(_record, context:)
        raise "DB constraint violation"
      end
    end
  end

  it "handles per-record errors without failing the import" do
    data_import.update!(records: [
      DataPorter::StoreModels::ImportRecord.new(
        line_number: 1, data: { name: "Alice" }, status: :valid
      )
    ])
    allow(DataPorter::Registry).to receive(:find).and_return(failing_target_class)

    orchestrator = described_class.new(data_import)
    orchestrator.import!

    expect(data_import.reload.status).to eq("completed")
    expect(data_import.report.errored_count).to eq(1)
  end
end

The key pattern: even when every persist call raises, the import still reaches completed -- not failed. The failed status is reserved for catastrophic errors (the source cannot be read, the target cannot be resolved). Per-record errors are expected operational noise, tracked in the report.
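That classification -- expected noise inside the loop, catastrophe outside it -- falls out of where the rescues sit. A dependency-free sketch of the control flow (run_phase is invented for illustration; it compresses import! and handle_failure into one method):

```ruby
# Two failure modes: a bad row is absorbed per-record and the phase still
# completes; a pre-loop failure (unreadable source) escapes to the outer
# rescue and flips the whole phase to :failed.
def run_phase(rows, source_ok:)
  raise "source unreadable" unless source_ok   # catastrophic, before the loop
  results = { created: 0, errored: 0 }
  rows.each do |row|
    begin
      raise "bad row" if row[:bad]             # per-record, expected noise
      results[:created] += 1
    rescue StandardError
      results[:errored] += 1
    end
  end
  [:completed, results]
rescue StandardError => e
  [:failed, { error: e.message }]              # mirrors handle_failure
end
```

The inner rescue shadows the outer one for anything raised inside the loop, so only errors from outside the per-record boundary can ever produce a failed import.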

Recap

  • The Orchestrator is a plain Ruby class that coordinates the parse-validate-persist workflow, keeping controllers thin and models focused.
  • The two-phase design (parse! then import!) creates a natural preview checkpoint where users can review data before it touches the database.
  • Per-record error handling means a single bad row never takes down the entire import; errors are captured on individual records and surfaced in the report.
  • ActiveJob integration is a thin wrapper: two one-liner jobs that delegate to the Orchestrator, using the engine's configured queue name.

Next up

The import now runs in the background, but the user has no way to know what is happening. They click "Import" and stare at a static page. In part 8, we build a real-time progress system using ActionCable and Stimulus -- a Broadcaster service that pushes status updates and record counts to the browser as the Orchestrator processes each row. No more refreshing to check if it is done.


This is part 7 of the series "Building DataPorter - A Data Import Engine for Rails". Previous: Parsing CSV Data with Sources | Next: Real-time Progress with ActionCable & Stimulus


GitHub: SerylLns/data_porter | RubyGems: data_porter
