Building Fail-Safes for Incomplete LLM Responses in Laravel Echo

Broadcasting LLM token streams through Laravel Echo feels elegant right up until the moment it silently dies halfway through a response. No error. No terminal event. Just a client sitting there, waiting for tokens that will never arrive.

We hit this in production during a multi-user document generation feature. The Pusher connection degraded during a particularly long Anthropic response. The queue job had no idea the client had disconnected, and the user stared at a spinner for three minutes before refreshing. The partial response was gone. No retry surface. No recovery path. The incident report had three action items and none of them were obvious beforehand.

That experience shaped the *Laravel LLM streaming fail-safe* architecture this article covers. Every pattern below is oriented toward the specific failure modes that broadcasting-based LLM streams introduce, and in several cases those failure modes are different from what you encounter with SSE.

One framing note before we start. Laravel Echo is a client-side JavaScript library for subscribing to broadcast channels; it is not an SSE wrapper. The architecture here is: a queued job calls the LLM API, iterates the stream, and broadcasts each token as a private channel event. Echo subscribes on the client. This pattern earns its weight when you need multiple subscribers on the same stream (team collaboration, agent monitoring, admin oversight), or when you are already running Reverb or Pusher in your stack. The tradeoffs between Livewire, SSE, and WebSockets for AI streaming are worth understanding before committing to this path. If you only have one client per stream, SSE is simpler and has materially lower infrastructure overhead.
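
For concreteness, here is a minimal sketch of the entry point. The route shape, validation limits, and 202 status are assumptions; the flow (create a record, dispatch the job, return the stream_id for Echo) is what the rest of the article builds on.

<?php

// routes/api.php: hypothetical entry point. Creates the stream record,
// dispatches the streaming job, and returns the stream_id the client
// subscribes with. The job and model are defined later in this article.

use App\Jobs\StreamLlmResponseJob;
use App\Models\LlmStream;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\Str;

Route::middleware('auth:sanctum')->post('/streams', function (Request $request) {
    $validated = $request->validate([
        'prompt' => ['required', 'string', 'max:8000'],
    ]);

    $streamId = (string) Str::uuid();

    LlmStream::create([
        'stream_id' => $streamId,
        'user_id'   => $request->user()->id,
        'prompt'    => $validated['prompt'],
        'status'    => 'pending',
    ]);

    StreamLlmResponseJob::dispatch($streamId, $validated['prompt'], $request->user()->id);

    return response()->json(['stream_id' => $streamId], 202);
});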

Why Incomplete Streams Are More Dangerous in Echo

SSE gives you a persistent, unidirectional HTTP connection with browser-native reconnect semantics. When the connection drops, the browser reconnects and sends Last-Event-ID. You have backpressure. You know whether the connection is alive.

Echo over WebSockets gives you none of that for free. The WebSocket channel stays open even when the underlying LLM stream on the server has failed. The client shows “connected” while the queue job is dead or retrying. Pusher and Reverb will accept broadcast events and deliver them on arrival, but they will not tell your client that no more events are coming.

That gap is exactly where incomplete responses live. Four categories to prepare for.

Token limit truncation. The LLM hits max_tokens and stops mid-sentence. The stop reason comes back as max_tokens, not end_turn. If your client only listens for a generic “done” signal without inspecting the stop reason, a truncated response renders as complete. Users won’t know. You won’t know until they complain.

Queue job failure. The job throws after token 47. Laravel retries it. The client has already rendered tokens 1 through 47 and will now receive them again from the retry run, duplicated, in the same Echo channel.

Silent connection drop. The WebSocket drops between the server and Pusher, or between Pusher and the client. Events broadcast during the gap are gone. The job may complete successfully on the server and broadcast a terminal event that the client never receives.

Orphaned streams. The queue worker is killed mid-flight by an OOM event or a forced deployment restart. The stream record in your database stays in streaming status indefinitely, because nothing transitions it out.

Each one needs a different fix. Build the layers in order.

The Architecture Baseline

The core pattern is a queued job that streams from the LLM API and broadcasts each token as a private channel event scoped to a unique stream_id. Every event carries a monotonically increasing sequence number. That sequence number is the foundation of every fail-safe that follows.

Start with the broadcast event. The Laravel Broadcasting documentation covers private channel authorization in full, but the event structure below is what drives the client-side recovery logic.

<?php

namespace App\Events;

use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;

// ShouldBroadcastNow broadcasts synchronously from the producing job.
// The queued ShouldBroadcast contract would add a queue round trip per
// token and, with multiple workers, risk out-of-order delivery.
class LlmTokenReceived implements ShouldBroadcastNow
{
    public function __construct(
        public readonly string  $streamId,
        public readonly string  $token,
        public readonly int     $sequence,
        public readonly string  $status,       // 'streaming' | 'complete' | 'truncated' | 'error' | 'dead'
        public readonly ?string $finishReason = null,
    ) {}

    public function broadcastOn(): PrivateChannel
    {
        return new PrivateChannel("stream.{$this->streamId}");
    }

    public function broadcastAs(): string
    {
        return 'token.received';
    }

    public function broadcastWith(): array
    {
        return [
            'stream_id'     => $this->streamId,
            'token'         => $this->token,
            'sequence'      => $this->sequence,
            'status'        => $this->status,
            'finish_reason' => $this->finishReason,
        ];
    }
}

The status field carries the terminal state explicitly. complete means the LLM finished naturally (end_turn). truncated means it hit a token limit. error and dead mean infrastructure or retry failure. The client does not guess. The server tells it. The event implements ShouldBroadcastNow deliberately: routing every token through the queued ShouldBroadcast path would add a queue round trip per token and, with multiple workers, could deliver tokens out of order.

The database layer makes recovery possible at all:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        Schema::create('llm_streams', function (Blueprint $table) {
            $table->id();
            $table->string('stream_id')->unique();
            $table->foreignId('user_id')->constrained()->cascadeOnDelete();
            $table->text('prompt');
            $table->enum('status', ['pending', 'streaming', 'complete', 'truncated', 'error', 'dead'])
                  ->default('pending');
            $table->longText('partial_content')->nullable();
            $table->longText('final_content')->nullable();
            $table->unsignedInteger('last_sequence')->default(0);
            $table->string('finish_reason')->nullable();
            $table->text('error_message')->nullable();
            $table->timestamp('started_at')->nullable();
            $table->timestamp('last_checkpoint_at')->nullable();
            $table->timestamp('completed_at')->nullable();
            $table->timestamp('failed_at')->nullable();
            $table->timestamps();

            $table->index(['status', 'last_checkpoint_at']); // orphan detection
            $table->index(['user_id', 'status']);
        });
    }
};
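
The Broadcasting documentation linked earlier covers channel authorization in depth. A minimal sketch of the callback for this channel, assuming the owner-only case (loosen the check for team-visibility streams):

<?php

// routes/channels.php: authorize private stream channels against the
// llm_streams table above. Owner-only; relax for multi-subscriber use.

use App\Models\LlmStream;
use Illuminate\Support\Facades\Broadcast;

Broadcast::channel('stream.{streamId}', function ($user, string $streamId) {
    return LlmStream::where('stream_id', $streamId)
        ->where('user_id', $user->id)
        ->exists();
});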

Server-Side Fail-Safes

The queue job carries most of the server-side protection. Three responsibilities: stream tokens and broadcast them with sequence numbers, write periodic checkpoints so recovery is possible, and guarantee a terminal broadcast event regardless of how the job ends.

The Anthropic Messages Streaming documentation defines the event types used in the iterator below. Note that stop_reason arrives on the message_delta event; message_stop is the bare end-of-stream signal and carries no payload of its own. The job captures stop_reason as it passes by and maps it to the status enum once message_stop confirms the end.

<?php

namespace App\Jobs;

use App\Events\LlmTokenReceived;
use App\Models\LlmStream;
use Anthropic\Laravel\Facades\Anthropic;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class StreamLlmResponseJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $timeout = 120;
    public int $tries   = 3;
    public int $backoff = 10;

    public function __construct(
        private readonly string $streamId,
        private readonly string $prompt,
        private readonly int    $userId,
    ) {}

    public function handle(): void
    {
        $sequence     = 0;
        $buffer       = '';
        $finishReason = null;  // captured from message_delta, consumed at message_stop
        $terminated   = false;

        LlmStream::where('stream_id', $this->streamId)
            ->update(['status' => 'streaming', 'started_at' => now()]);

        try {
            $stream = Anthropic::messages()->stream([
                'model'      => 'claude-sonnet-4-5',
                'max_tokens' => 4096,
                'messages'   => [
                    ['role' => 'user', 'content' => $this->prompt],
                ],
            ]);

            foreach ($stream as $event) {
                if ($event->type === 'content_block_delta' && isset($event->delta->text)) {
                    $token   = $event->delta->text;
                    $buffer .= $token;
                    $sequence++;

                    broadcast(new LlmTokenReceived(
                        streamId: $this->streamId,
                        token:    $token,
                        sequence: $sequence,
                        status:   'streaming',
                    ));

                    if ($sequence % 50 === 0) {
                        $this->checkpoint($buffer, $sequence);
                    }
                }

                // stop_reason arrives on the message_delta event, not on
                // message_stop, which carries no payload beyond its type.
                if ($event->type === 'message_delta' && isset($event->delta->stop_reason)) {
                    $finishReason = $event->delta->stop_reason;
                }

                if ($event->type === 'message_stop') {
                    $finishReason ??= 'unknown';
                    $status       = $finishReason === 'end_turn' ? 'complete' : 'truncated';
                    $terminated   = true;
                    $sequence++;

                    broadcast(new LlmTokenReceived(
                        streamId:     $this->streamId,
                        token:        '',
                        sequence:     $sequence,
                        status:       $status,
                        finishReason: $finishReason,
                    ));

                    LlmStream::where('stream_id', $this->streamId)->update([
                        'status'        => $status,
                        'finish_reason' => $finishReason,
                        'final_content' => $buffer,
                        'last_sequence' => $sequence,
                        'completed_at'  => now(),
                    ]);
                }
            }
        } catch (Throwable $e) {
            if (! $terminated) {
                $sequence++;

                broadcast(new LlmTokenReceived(
                    streamId:     $this->streamId,
                    token:        '',
                    sequence:     $sequence,
                    status:       'error',
                    finishReason: 'exception',
                ));

                LlmStream::where('stream_id', $this->streamId)->update([
                    'status'          => 'error',
                    'error_message'   => $e->getMessage(),
                    'last_sequence'   => $sequence,
                    'partial_content' => $buffer,
                    'failed_at'       => now(),
                ]);
            }

            throw $e; // preserve queue retry behaviour
        }
    }

    private function checkpoint(string $buffer, int $sequence): void
    {
        LlmStream::where('stream_id', $this->streamId)->update([
            'partial_content'    => $buffer,
            'last_sequence'      => $sequence,
            'last_checkpoint_at' => now(),
        ]);
    }

    public function failed(Throwable $exception): void
    {
        LlmStream::where('stream_id', $this->streamId)->update([
            'status'        => 'dead',
            'error_message' => $exception->getMessage(),
            'failed_at'     => now(),
        ]);

        // Last-resort broadcast after all retries are exhausted.
        // PHP_INT_MAX sequence guarantees the client accepts this regardless
        // of how many gaps have accumulated.
        broadcast(new LlmTokenReceived(
            streamId:     $this->streamId,
            token:        '',
            sequence:     PHP_INT_MAX,
            status:       'dead',
            finishReason: 'retries_exhausted',
        ));
    }
}

Three decisions in this code are worth calling out explicitly.

The $terminated flag prevents double-broadcasting. If message_stop fires and then the foreach cleanup throws, you do not want a second terminal event with status: 'error' landing after you have already broadcast status: 'complete'. Clients will not handle that gracefully.

The throw $e at the end of the catch block is intentional. Removing it tells Laravel the job succeeded, killing retry behaviour. The partial broadcast for error is informational to the client. The throw is what keeps the queue system honest. It also defuses the duplicate-token failure mode from earlier: the client tears down its subscription on the first terminal event, so the retry run’s re-broadcasts land in a channel nobody is listening to, and the user-facing retry path below always starts a fresh stream_id.

The failed() hook runs after all retry attempts are exhausted and is your last-resort client notification. This method is called by Laravel’s Queue Worker, not by your own code.

[Edge Case Alert] If you are running Laravel Reverb rather than Pusher, broadcasts inside failed() may not deliver if Reverb is also under load or restarting at the same time as your queue worker. failed() is best-effort in that scenario. The orphan detection command below is your backstop.

Client-Side Fail-Safes

Most teams spend three lines on the Echo subscription and zero lines on what happens when it goes quiet. The watchdog and sequence tracking below are the part that actually makes this production-safe.

class LlmStreamClient {
    constructor(streamId, options = {}) {
        this.streamId  = streamId;
        this.tokens    = [];
        this.lastSeq   = 0;
        this.gaps      = [];
        this.watchdog  = null;
        this.silenceMs = options.silenceMs  ?? 10000;
        this.onToken   = options.onToken    ?? (() => {});
        this.onComplete = options.onComplete ?? (() => {});
        this.onError   = options.onError    ?? (() => {});
        this.channel   = null;
    }

    subscribe() {
        this.channel = window.Echo
            .private(`stream.${this.streamId}`)
            .listen('.token.received', (e) => {
                this.resetWatchdog();
                this.handleEvent(e);
            });

        this.startWatchdog();
        return this;
    }

    handleEvent({ sequence, token, status, finish_reason: finishReason }) {
        if (sequence > this.lastSeq + 1) {
            this.gaps.push({ expected: this.lastSeq + 1, received: sequence });
        }
        this.lastSeq = Math.max(this.lastSeq, sequence);

        if (status === 'streaming') {
            this.tokens.push(token);
            this.onToken(token, this.tokens.join(''));
            return;
        }

        this.clearWatchdog();
        const partial = this.tokens.join('');

        if (status === 'complete') {
            this.onComplete({ content: partial, gaps: this.gaps, finishReason });
        } else {
            this.onError({ type: status, partial, gaps: this.gaps, finishReason });
        }

        this.teardown();
    }

    startWatchdog() {
        this.watchdog = setTimeout(() => {
            this.onError({
                type:    'client_timeout',
                partial: this.tokens.join(''),
                message: `No event received for ${this.silenceMs}ms`,
                gaps:    this.gaps,
            });
            this.teardown();
        }, this.silenceMs);
    }

    resetWatchdog() { this.clearWatchdog(); this.startWatchdog(); }

    clearWatchdog() {
        if (this.watchdog) { clearTimeout(this.watchdog); this.watchdog = null; }
    }

    teardown() {
        this.clearWatchdog();
        if (this.channel) {
            window.Echo.leave(`stream.${this.streamId}`);
            this.channel = null;
        }
    }
}

Usage with the full error surface handled:

const client = new LlmStreamClient('abc-123', {
    silenceMs: 10000,

    onToken: (token, full) => {
        document.getElementById('output').textContent = full;
    },

    onComplete: ({ content, gaps, finishReason }) => {
        if (gaps.length > 0) {
            console.warn('Stream gaps detected — some tokens may be missing.', gaps);
        }
        saveResponse(content);
    },

    onError: ({ type, partial, finishReason }) => {
        savePartial(partial); // always persist what arrived

        if (type === 'truncated') {
            showRetryButton('Response was cut off. Retry to continue.');
        } else if (type === 'client_timeout') {
            showRetryButton('Connection timed out. Retry when ready.');
        } else {
            showError('Something went wrong. Your partial response has been saved.');
        }
    },
});

client.subscribe();

The watchdog fires after silenceMs of silence. Ten seconds without a token means something is wrong: the job died, the Pusher connection degraded, or the LLM API paused for an unusually long time between tokens. All three warrant user feedback.

[Production Pitfall] Do not set silenceMs aggressively low. Claude’s API can pause several seconds between tokens during complex structured outputs before streaming resumes. We set ours to 5000 initially and got false timeout fires on longer document generation tasks. Treat 10000 as the safe production floor, and consider 15000 if you are generating very long documents.

The gaps array is diagnostic, not restorative. Pusher and Reverb do not offer event replay for private channels. Gap detection tells you whether a broadcast delivery problem contributed to an incomplete response, which helps you distinguish API truncation from infrastructure failure. Log it to your analytics or error tracker on every onComplete call.
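
A hypothetical shape for that endpoint; the route and payload names are assumptions, and the point is simply that gap reports land somewhere queryable:

<?php

// routes/api.php: hypothetical gap-report endpoint for the client's
// onComplete and onError callbacks. Logs only; swap in your tracker.

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Route;

Route::middleware('auth:sanctum')->post('/streams/{streamId}/gaps', function (Request $request, string $streamId) {
    $validated = $request->validate([
        'gaps'            => ['required', 'array'],
        'gaps.*.expected' => ['required', 'integer'],
        'gaps.*.received' => ['required', 'integer'],
    ]);

    Log::warning('Client reported stream gaps', [
        'stream_id' => $streamId,
        'user_id'   => $request->user()->id,
        'gaps'      => $validated['gaps'],
    ]);

    return response()->noContent();
});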

Detecting and Recovering Orphaned Streams

The orphan case does not surface at runtime. The queue worker is killed. The stream stays in streaming status. The client times out via the watchdog and shows the retry UI. But nothing tells the database the stream is dead.

Checkpointing every 50 tokens gives you a heartbeat. An active stream always updates last_checkpoint_at within a predictable window. Silence beyond that window is evidence of death.

Register the cleanup command in routes/console.php:

<?php

use Illuminate\Support\Facades\Schedule;

Schedule::command('streams:reap-orphans')->everyFiveMinutes();

The command itself:

<?php

namespace App\Console\Commands;

use App\Events\LlmTokenReceived;
use App\Models\LlmStream;
use Illuminate\Console\Command;

class ReapOrphanedStreams extends Command
{
    protected $signature   = 'streams:reap-orphans';
    protected $description = 'Mark streams with no checkpoint activity as orphaned.';

    public function handle(): void
    {
        $orphans = LlmStream::query()
            ->where('status', 'streaming')
            ->where(function ($q) {
                $q->where('last_checkpoint_at', '<', now()->subMinutes(2))
                  ->orWhere(function ($q2) {
                      $q2->whereNull('last_checkpoint_at')
                         ->where('started_at', '<', now()->subMinutes(2));
                  });
            })
            ->get();

        foreach ($orphans as $stream) {
            $stream->update([
                'status'        => 'error',
                'error_message' => 'Orphaned: no checkpoint activity for 2+ minutes.',
                'failed_at'     => now(),
            ]);

            broadcast(new LlmTokenReceived(
                streamId:     $stream->stream_id,
                token:        '',
                sequence:     $stream->last_sequence + 1,
                status:       'error',
                finishReason: 'orphaned',
            ));

            $this->line("Reaped: {$stream->stream_id}");
        }

        $this->info("Reaped {$orphans->count()} orphaned stream(s).");
    }
}

Two minutes is conservative. On a fast model with short prompts, 50 tokens arrive in seconds. If you are streaming long documents, increase the reap window to match, or reduce the checkpoint interval from 50 to 20 tokens.
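
Both knobs are better pulled into configuration than left as literals. A minimal sketch, assuming a hypothetical config/streams.php:

<?php

// config/streams.php: hypothetical tuning knobs for the checkpoint and
// reaper logic above. Read via config('streams.checkpoint_interval').

return [
    // Tokens between database checkpoints; lower means a finer heartbeat.
    'checkpoint_interval' => env('STREAM_CHECKPOINT_INTERVAL', 50),

    // Minutes of checkpoint silence before the reaper marks a stream dead.
    'reap_after_minutes' => env('STREAM_REAP_AFTER_MINUTES', 2),
];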

[Architect’s Note] The last_checkpoint_at column is a poor man’s distributed heartbeat, and that is not a criticism. This is the same liveness-detection pattern event-driven systems have used for decades. If you later move to Laravel Horizon with job event hooks, you can correlate orphan detection directly with actual queue worker failures and remove the scheduler command. For most setups, the scheduled Artisan command approach costs nothing and requires no additional infrastructure.

Retry Strategy

When a stream ends in error, truncated, or dead, your user needs a retry button. Always generate a new stream_id for each retry. Reusing the original stream_id means the retry job broadcasts into the same channel the client already abandoned, and the partial content from attempt one gets mixed with attempt two in your database record. Neither is recoverable cleanly.

<?php

namespace App\Http\Controllers;

use App\Jobs\StreamLlmResponseJob;
use App\Models\LlmStream;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Str;

class StreamRetryController extends Controller
{
    public function __invoke(Request $request, string $streamId): JsonResponse
    {
        $original = LlmStream::query()
            ->where('stream_id', $streamId)
            ->where('user_id', $request->user()->id)
            ->whereIn('status', ['error', 'truncated', 'dead'])
            ->firstOrFail();

        $newStreamId = (string) Str::uuid();

        LlmStream::create([
            'stream_id' => $newStreamId,
            'user_id'   => $request->user()->id,
            'prompt'    => $original->prompt,
            'status'    => 'pending',
        ]);

        StreamLlmResponseJob::dispatch($newStreamId, $original->prompt, $request->user()->id);

        return response()->json(['stream_id' => $newStreamId]);
    }
}

Route registration:

<?php

use App\Http\Controllers\StreamRetryController;
use Illuminate\Support\Facades\Route;

Route::middleware('auth:sanctum')->group(function () {
    Route::post('/streams/{streamId}/retry', StreamRetryController::class);
});

The original stream record stays in the database with its status and partial_content intact for audit purposes. The retry creates a clean record. The client receives the new stream_id in the JSON response and calls client.subscribe() fresh.

If your streaming endpoints are not locked down with proper token-based authentication, the retry endpoint becomes an unauthenticated job dispatch surface. Sanctum’s API token middleware is the right guard here, not session auth, because the retry call typically originates from JavaScript, not a form submission.
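
Wired together on the client, the retry flow is short. A hypothetical handler; how you store apiToken and surface errors is up to your app:

// Hypothetical retry wiring for the LlmStreamClient defined earlier.
// Assumes a Sanctum API token is available as `apiToken`.
async function retryStream(failedStreamId, clientOptions) {
    const res = await fetch(`/streams/${failedStreamId}/retry`, {
        method: 'POST',
        headers: {
            'Authorization': `Bearer ${apiToken}`,
            'Accept':        'application/json',
        },
    });

    if (!res.ok) {
        throw new Error(`Retry dispatch failed with status ${res.status}`);
    }

    const { stream_id: newStreamId } = await res.json();

    // Fresh channel, fresh sequence space. The old subscription was
    // already torn down by the terminal event or the watchdog.
    return new LlmStreamClient(newStreamId, clientOptions).subscribe();
}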

The Service Layer Is Not This Job’s Problem

The job above handles the transport layer. It does not handle token budget management, provider fallback, cost attribution, or prompt construction. Those belong one layer up.

If you are building this into a larger system, the production AI architecture for Laravel covers the contracts and driver abstraction layer that should wrap around the streaming job described here. The job should be thin: stream, checkpoint, broadcast. Everything else belongs in the service it calls.

That separation also matters when your streamed output is structured. Schema validation against LLM hallucinations belongs at the service layer. A truncated JSON response is harder to recover from than truncated prose. Catching a max_tokens truncation in a partial JSON structure before it reaches your client is far cheaper than handling the parse error downstream.

Monitoring What Actually Matters

Before going live, instrument three metrics from your llm_streams table:

Stream completion rate. status = 'complete' divided by total streams initiated, measured hourly. Below 95% in production warrants investigation. Below 90% is an incident.

Orphan rate. Streams reaped by the Artisan command as a percentage of total streams. A spike here usually correlates with deployment restarts or queue worker OOM events. Check your Horizon metrics or system logs alongside it.

Gap frequency. Log the gaps array from client-side onComplete callbacks via a lightweight analytics endpoint. Persistent gaps from specific geographic regions point to Pusher or Reverb delivery problems, not LLM API issues. The distinction matters: one is your infrastructure, the other is your vendor’s.
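
A sketch of the first two as Eloquent queries over the llm_streams table; the hourly window and thresholds mirror the numbers above:

<?php

// Hypothetical hourly reliability metrics from the llm_streams table.

use App\Models\LlmStream;

$since = now()->subHour();

$total    = LlmStream::where('created_at', '>=', $since)->count();
$complete = LlmStream::where('created_at', '>=', $since)
    ->where('status', 'complete')->count();
$orphaned = LlmStream::where('created_at', '>=', $since)
    ->where('error_message', 'like', 'Orphaned:%')->count();

$completionRate = $total > 0 ? $complete / $total : 1.0; // < 0.95: investigate; < 0.90: incident
$orphanRate     = $total > 0 ? $orphaned / $total : 0.0; // spikes track deploys and OOM kills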

Connecting these to Laravel’s AI middleware for token tracking gives you cost visibility alongside reliability metrics. That combination is what you need to have an honest conversation about whether Echo-based streaming is worth its operational complexity for your specific use case.
