DEV Community

mgd43b for AgentEnsemble

Posted on • Originally published at agentensemble.net

Using AgentEnsemble as a Pure Workflow Engine: Orchestration Without AI

Most of the conversation around agent frameworks assumes that orchestration is synonymous with LLM calls. Pick a framework, wire up your model, define your agents. That framing is understandable, but it leaves something useful on the table.

The harder problem for many Java teams isn't choosing a model. It's building pipelines that are observable, composable, and consistent -- whether or not any given step involves an LLM. A pipeline that mixes REST API calls, data transformations, and a few AI-backed steps shouldn't need two separate orchestration systems.

AgentEnsemble is not exclusively an AI orchestrator. The same DAG execution engine, parallel phase runner, callbacks, and review gates that coordinate LLM-backed agents also work for purely deterministic Java functions. No ChatModel required.

This post covers the deterministic-only orchestration pattern: what the API looks like, how data flows between tasks, and when this pattern is worth reaching for.


The Static Factory for Deterministic Pipelines

For pipelines with no LLM steps, the entry point is a static factory method that takes no model parameter:

Ensemble.run(task1, task2, task3);

If any task passed to this method lacks a handler, the framework throws at startup. This keeps the contract explicit: deterministic tasks must always declare how they execute.

For more complex setups -- phases, callbacks, guardrails -- you use the builder:

Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .onTaskComplete(event -> log.info("Completed: {}", event.taskId()))
    .build()
    .run();

The builder path accepts a chatModel, but it isn't required. If all tasks have handlers, the ensemble runs without one.


Handlers and Task Output

A deterministic task declares its behavior via a handler lambda. The handler receives a TaskHandlerContext and returns a ToolResult:

Task extract = Task.builder()
    .description("extract-records")
    .handler(ctx -> ToolResult.success("42 records extracted"))
    .build();

ToolResult.success(String content) wraps the output. ToolResult.failure(String reason) triggers task failure handling. The string content becomes the task's raw output, accessible downstream.

For tasks that need to fail conditionally:

Task validate = Task.builder()
    .description("validate-input")
    // upstream output reaches the handler only for tasks declared via .context()
    .context(List.of(extract))
    .handler(ctx -> {
        String input = ctx.contextOutputs().get(0).getRaw();
        if (input.isBlank()) {
            return ToolResult.failure("input was empty");
        }
        return ToolResult.success("valid: " + input.length() + " chars");
    })
    .build();

Data Passing Between Tasks

Downstream tasks declare upstream dependencies via .context(). The framework resolves outputs in declaration order and delivers them through ctx.contextOutputs():

Task extract = Task.builder()
    .description("extract-records")
    .handler(ctx -> ToolResult.success("42 records"))
    .build();

Task transform = Task.builder()
    .description("transform-records")
    .context(List.of(extract))
    .handler(ctx -> {
        String raw = ctx.contextOutputs().get(0).getRaw();
        return ToolResult.success("transformed: " + raw);
    })
    .build();

Task load = Task.builder()
    .description("load-records")
    .context(List.of(transform))
    .handler(ctx -> {
        String data = ctx.contextOutputs().get(0).getRaw();
        // write to database
        return ToolResult.success("loaded: " + data);
    })
    .build();

Ensemble.run(extract, transform, load);

Multiple upstream outputs are accessible by index in the order they were declared:

Task merge = Task.builder()
    .description("merge-results")
    .context(List.of(fetchA, fetchB, fetchC))
    .handler(ctx -> {
        List<String> results = ctx.contextOutputs().stream()
            .map(TaskOutput::getRaw)
            .toList();
        return ToolResult.success(String.join(", ", results));
    })
    .build();

Parallel Execution

Independent tasks run concurrently on virtual threads with no extra configuration. Any tasks that declare no context() dependency on each other are run in parallel by the executor:

Task fetchA = Task.builder()
    .description("fetch-region-a")
    .handler(ctx -> ToolResult.success("region-a: 120 records"))
    .build();

Task fetchB = Task.builder()
    .description("fetch-region-b")
    .handler(ctx -> ToolResult.success("region-b: 98 records"))
    .build();

Task fetchC = Task.builder()
    .description("fetch-region-c")
    .handler(ctx -> ToolResult.success("region-c: 77 records"))
    .build();

Task merge = Task.builder()
    .description("merge-all-regions")
    .context(List.of(fetchA, fetchB, fetchC))
    .handler(ctx -> {
        List<String> outputs = ctx.contextOutputs().stream()
            .map(TaskOutput::getRaw)
            .toList();
        return ToolResult.success(String.join("; ", outputs));
    })
    .build();

Ensemble.run(fetchA, fetchB, fetchC, merge);

The DAG is inferred from context() declarations. merge waits for all three fetch tasks. The three fetches run concurrently. No explicit thread management required.

You can also be explicit via .workflow(Workflow.PARALLEL) on individual tasks, but for most cases the DAG inference is sufficient.
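
To make the inference concrete, here is a minimal plain-JDK sketch of how a DAG could be derived from context-style declarations and run concurrently. It is not AgentEnsemble's implementation: DagSketch, Step, and run are hypothetical names used only for illustration. The sketch uses CompletableFuture on a cached thread pool so that it runs on JDK 16+; on Java 21+, Executors.newVirtualThreadPerTaskExecutor() could be swapped in. It also assumes steps are listed after the steps they depend on.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class DagSketch {

    // Hypothetical stand-in for a task: a name, the upstream steps it reads from,
    // and a handler mapping upstream outputs (in declaration order) to its own output.
    record Step(String name, List<Step> context, Function<List<String>, String> handler) {}

    // Starts each step as soon as all of its declared upstream steps have completed.
    // Steps with no dependency on each other therefore run concurrently.
    static Map<String, String> run(List<Step> steps, ExecutorService executor) {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (Step step : steps) {
            List<CompletableFuture<String>> upstream = new ArrayList<>();
            for (Step dependency : step.context()) {
                CompletableFuture<String> future = futures.get(dependency.name());
                if (future == null) {
                    throw new IllegalArgumentException(
                        step.name() + " is listed before its dependency " + dependency.name());
                }
                upstream.add(future);
            }
            CompletableFuture<String> result = CompletableFuture
                .allOf(upstream.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(ignored -> step.handler().apply(
                    upstream.stream().map(CompletableFuture::join).toList()), executor);
            futures.put(step.name(), result);
        }
        // Block until every step has finished and collect outputs by step name.
        Map<String, String> outputs = new LinkedHashMap<>();
        futures.forEach((name, future) -> outputs.put(name, future.join()));
        return outputs;
    }

    // Convenience overload that owns the executor for the duration of one run.
    static Map<String, String> run(List<Step> steps) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            return run(steps, executor);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Step fetchA = new Step("fetch-a", List.of(), inputs -> "region-a: 120 records");
        Step fetchB = new Step("fetch-b", List.of(), inputs -> "region-b: 98 records");
        Step merge = new Step("merge", List.of(fetchA, fetchB),
            inputs -> String.join("; ", inputs));
        // prints "region-a: 120 records; region-b: 98 records"
        System.out.println(run(List.of(fetchA, fetchB, merge)).get("merge"));
    }
}
```

The point of the sketch is that the dependency list is the only scheduling input: the two fetch steps are submitted immediately, and the merge step is chained onto their completion rather than onto its position in the argument list.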


Named Phases

For larger pipelines with distinct stages, phases provide explicit sequencing and named output access:

Task ingest = Task.builder()
    .description("ingest-raw-data")
    .handler(ctx -> ToolResult.success("1000 rows ingested"))
    .build();

Task schemaCheck = Task.builder()
    .description("validate-schema")
    .handler(ctx -> ToolResult.success("schema valid"))
    .build();

Phase ingestion = Phase.of("ingestion", ingest, schemaCheck);

Task normalize = Task.builder()
    .description("normalize-records")
    .handler(ctx -> ToolResult.success("normalized"))
    .build();

Task enrich = Task.builder()
    .description("enrich-with-reference-data")
    .handler(ctx -> ToolResult.success("enriched"))
    .build();

Phase processing = Phase.builder()
    .name("processing")
    .tasks(List.of(normalize, enrich))
    .after(ingestion)
    .build();

Task writeOutput = Task.builder()
    .description("write-to-data-warehouse")
    .handler(ctx -> ToolResult.success("written"))
    .build();

Phase output = Phase.builder()
    .name("output")
    .tasks(List.of(writeOutput))
    .after(processing)
    .build();

EnsembleOutput result = Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .build()
    .run();

Phase outputs are accessible by name:

PhaseOutput ingestionOut = result.getPhaseOutputs().get("ingestion");
String ingestedData = ingestionOut.taskOutputs().get(0).getRaw();

Within a phase, task parallelism follows the same DAG rules. normalize and enrich run concurrently if they don't depend on each other.
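
Phase ordering declared with after(...) is only executable if the declarations form an acyclic graph. The following is a rough sketch of how such an order could be resolved, using Kahn's algorithm over phase names. PhaseOrderSketch and executionOrder are hypothetical names, and whether AgentEnsemble resolves or validates phases this way is an assumption, not something the post confirms.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PhaseOrderSketch {

    // afterByPhase maps each phase name to the names of the phases it must run after.
    // Returns one valid execution order, or throws if the declarations contain a cycle.
    static List<String> executionOrder(Map<String, List<String>> afterByPhase) {
        Map<String, Integer> pending = new LinkedHashMap<>();          // unmet predecessors
        Map<String, List<String>> dependents = new LinkedHashMap<>();  // reverse edges
        for (String phase : afterByPhase.keySet()) {
            pending.put(phase, 0);
            dependents.put(phase, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : afterByPhase.entrySet()) {
            for (String predecessor : entry.getValue()) {
                if (!dependents.containsKey(predecessor)) {
                    throw new IllegalArgumentException("unknown phase: " + predecessor);
                }
                dependents.get(predecessor).add(entry.getKey());
                pending.merge(entry.getKey(), 1, Integer::sum);
            }
        }
        // Phases with no unmet predecessors are ready to run.
        Deque<String> ready = new ArrayDeque<>();
        pending.forEach((phase, count) -> {
            if (count == 0) {
                ready.add(phase);
            }
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String phase = ready.poll();
            order.add(phase);
            for (String next : dependents.get(phase)) {
                if (pending.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != afterByPhase.size()) {
            throw new IllegalStateException("cycle detected among phases");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> after = new LinkedHashMap<>();
        after.put("output", List.of("processing"));
        after.put("processing", List.of("ingestion"));
        after.put("ingestion", List.of());
        System.out.println(executionOrder(after)); // prints [ingestion, processing, output]
    }
}
```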


Callbacks and Observability

The same onTaskStart, onTaskComplete, and onTaskFailed callbacks work for deterministic tasks:

Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .onTaskStart(e -> log.info("[START] {}", e.taskId()))
    .onTaskComplete(e -> log.info("[DONE]  {} -> {}", e.taskId(), e.output().getRaw()))
    .onTaskFailed(e -> log.error("[FAIL]  {} -> {}", e.taskId(), e.error().getMessage()))
    .build()
    .run();

Metrics via Micrometer and OpenTelemetry tracing are equally available. Deterministic tasks participate in the same span hierarchy as AI tasks. If your ensemble mixes both, traces show the full picture.
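
When a full metrics stack is more than a pipeline needs, the same callbacks can feed a small in-process timer. The sketch below is a standalone helper with no AgentEnsemble dependency. TaskTimer and its methods are hypothetical names, and the wiring shown in the comments assumes that taskId() yields a stable identifier for a task across its start and completion events.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Records per-task wall-clock durations from start/finish notifications.
// Thread-safe, since callbacks for parallel tasks may arrive on different threads.
final class TaskTimer {
    private final Map<String, Long> startedAt = new ConcurrentHashMap<>();
    private final Map<String, Long> durations = new ConcurrentHashMap<>();
    private final LongSupplier nanoClock;

    TaskTimer() {
        this(System::nanoTime);
    }

    // The clock is injectable so the timer can be tested deterministically.
    TaskTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    void started(String taskId) {
        startedAt.put(taskId, nanoClock.getAsLong());
    }

    // Call from both the completion and the failure callback.
    void finished(String taskId) {
        Long start = startedAt.remove(taskId);
        if (start != null) {
            durations.put(taskId, nanoClock.getAsLong() - start);
        }
    }

    Map<String, Long> durationsNanos() {
        return Map.copyOf(durations);
    }

    public static void main(String[] args) {
        TaskTimer timer = new TaskTimer();
        // Hypothetical wiring against the builder shown above:
        //   .onTaskStart(e -> timer.started(String.valueOf(e.taskId())))
        //   .onTaskComplete(e -> timer.finished(String.valueOf(e.taskId())))
        //   .onTaskFailed(e -> timer.finished(String.valueOf(e.taskId())))
        timer.started("extract-records");
        timer.finished("extract-records");
        System.out.println(timer.durationsNanos().keySet()); // prints [extract-records]
    }
}
```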


Mixing Deterministic and AI Tasks

Deterministic and AI-backed tasks compose freely within the same pipeline:

Task fetchData = Task.builder()
    .description("fetch-customer-records")
    .handler(ctx -> ToolResult.success(fetchFromDatabase()))
    .build();

Task analyzeData = Task.builder()
    .description("identify key themes and concerns from these customer records")
    .chatModel(model)
    .context(List.of(fetchData))
    .build();

Task writeReport = Task.builder()
    .description("format-as-json-report")
    .context(List.of(analyzeData))
    .handler(ctx -> {
        String analysis = ctx.contextOutputs().get(0).getRaw();
        return ToolResult.success(toJson(analysis));
    })
    .build();

Ensemble.builder()
    .tasks(List.of(fetchData, analyzeData, writeReport))
    .chatModel(model)
    .build()
    .run();

The chatModel on the ensemble is only used for tasks that don't have a handler. Deterministic tasks bypass the LLM entirely.


When to Use This

Deterministic-only orchestration in AgentEnsemble is useful when you need:

  • Multi-step pipelines with DAG dependencies -- sequential or parallel, inferred from context declarations
  • Observable step-level execution -- callbacks, metrics, and tracing without external tooling
  • Mixed pipelines -- some steps are AI-backed, some are not; one orchestration model covers both
  • Phase-level sequencing -- named stages with explicit ordering and structured output access
  • Review gates on deterministic steps -- the PhaseReview API works regardless of whether the tasks inside are AI or handler-based

It is not a replacement for a dedicated workflow engine like Temporal if you need durable execution, workflows that run for days or weeks, or replay across process restarts. For those requirements, Temporal or a similar system is the right choice.

For pipelines that live within a single process, run in seconds or minutes, and need consistent observability without adding infrastructure, this pattern removes a lot of plumbing.


Tradeoffs

No persistence: if the process dies mid-run, the pipeline restarts from the beginning. There is no checkpoint mechanism.

No scheduling: the orchestration layer has no built-in cron or trigger. External schedulers (Quartz, Spring Scheduler, Kubernetes CronJobs) are needed if the pipeline runs on a schedule.

No distributed execution: all tasks run in-process on virtual threads. Cross-JVM task distribution is not supported.

These are acceptable tradeoffs for many internal pipelines. They rule this pattern out for workflows that span hours or need durable state across process restarts.
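
For the scheduling gap, the lightest external trigger is the JDK's own ScheduledExecutorService, used here in place of the Quartz, Spring, or Kubernetes options named above. This is a sketch under assumptions: ScheduledPipeline, guarded, and start are hypothetical names, and the Runnable stands in for whatever invokes the pipeline (for example a call to Ensemble.run). The guard matters because a periodic task that throws has all of its later executions suppressed by the scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledPipeline {

    // Wraps a pipeline run so that a failed run is reported
    // instead of silently cancelling every subsequent run.
    static Runnable guarded(Runnable pipelineRun) {
        return () -> {
            try {
                pipelineRun.run();
            } catch (RuntimeException e) {
                System.err.println("pipeline run failed: " + e.getMessage());
            }
        };
    }

    // Runs the pipeline immediately, then again after each run finishes plus the delay.
    // Fixed-delay scheduling means a slow run pushes the next one back rather than piling up.
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    Runnable pipelineRun, long delay, TimeUnit unit) {
        return scheduler.scheduleWithFixedDelay(guarded(pipelineRun), 0, delay, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Placeholder body; a real pipeline invocation would go here.
        start(scheduler, () -> System.out.println("pipeline run"), 200, TimeUnit.MILLISECONDS);
        Thread.sleep(500);
        scheduler.shutdownNow();
    }
}
```

This keeps everything in one process, so it inherits the same limits as the pipeline itself: if the JVM stops, so does the schedule.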


The guide at agentensemble.net/guides/deterministic-orchestration/ covers the full API including guardrails and review gate integration. The example source is runnable directly from the repository.

AgentEnsemble is open-source under the MIT license.
