Building a Production-Ready Shopify App with Background Jobs: A Complete Architecture Guide
Building a Shopify app that handles heavy data processing requires careful architectural decisions. In this comprehensive guide, I'll walk you through how we built a scalable Shopify pricing app using Remix, BullMQ, Redis, and a dedicated worker process to handle background jobs efficiently.
Table of Contents
- Architecture Overview
- Project Structure
- Setting Up the Foundation
- Building the Queue System
- Creating Workers
- Handling Job Recovery
- Deployment Strategy
- Best Practices & Lessons Learned
- Performance Considerations
- Monitoring & Debugging
- Conclusion
Architecture Overview
Our Shopify app processes bulk price updates, which can involve thousands of products. Here's the high-level architecture:
┌─────────────────┐
│ Shopify Admin │
└────────┬────────┘
│
▼
┌─────────────────────────┐
│ Remix Web Server │
│ (Handles UI & API) │
└────────┬────────────────┘
│
▼
┌─────────────────────────┐
│ Redis + BullMQ │
│ (Job Queues) │
└────────┬────────────────┘
│
▼
┌─────────────────────────┐
│ Worker Process │
│ (Background Jobs) │
└─────────────────────────┘
Why This Architecture?
- Separation of Concerns: Web server handles HTTP requests; worker handles heavy lifting
- Scalability: Workers can scale independently from the web server
- Reliability: Jobs persist in Redis; if a worker crashes, jobs can be recovered
- Performance: Non-blocking operations keep the web server responsive
Project Structure
Here's how we organized our codebase:
bevy-price-rules/
├── app/
│ ├── routes/ # Remix routes (API & UI)
│ │ ├── api.sales.ts # Enqueue price update jobs
│ │ ├── api.upload.ts # Handle CSV uploads
│ │ └── api.revert.ts # Revert operations
│ ├── services/
│ │ └── queue/ # Queue infrastructure
│ │ ├── config.server.ts # Redis & queue config
│ │ ├── request-queue.server.ts # Request queue
│ │ ├── request-worker.server.ts # Request worker
│ │ ├── processing-queue.server.ts # Processing queue
│ │ ├── processing-worker.server.ts # Processing worker
│ │ ├── waiting-queue.server.ts # Waiting queue
│ │ └── waiting-worker.server.ts # Waiting worker
│ ├── types/
│ │ └── queue.type.ts # TypeScript interfaces
│ └── db.server.ts # Prisma client
├── worker.ts # Main worker entry point
├── tsconfig.worker.json # Worker-specific TypeScript config
├── package.json # Scripts for building workers
├── Dockerfile # Docker configuration
└── heroku.yml # Heroku deployment config
Setting Up the Foundation
1. Install Dependencies
{
"dependencies": {
"@remix-run/node": "^2.16.1",
"@shopify/shopify-app-remix": "^3.7.0",
"@prisma/client": "^6.2.1",
"bullmq": "^5.56.0",
"ioredis": "^5.6.1",
"dotenv": "^17.0.1"
},
"devDependencies": {
"typescript": "^5.0.0",
"ts-node": "^10.9.1"
}
}
2. Configure Package Scripts
{
"scripts": {
"build": "remix vite:build",
"start": "remix-serve ./build/server/index.js",
"worker:build": "tsc -p ./tsconfig.worker.json && tsc-esm-fix --target=dist",
"worker:start": "node ./dist/worker.js",
"worker:dev": "node --loader ts-node/esm --no-warnings --experimental-specifier-resolution=node ./worker.ts"
}
}
3. Worker TypeScript Configuration
Create tsconfig.worker.json to exclude UI-related files:
{
"compilerOptions": {
"module": "ESNext",
"esModuleInterop": true,
"target": "ES2022",
"moduleResolution": "bundler",
"outDir": "dist",
"baseUrl": ".",
"strict": true,
"skipLibCheck": true
},
"include": [
"./worker.ts",
"./app/**/*.ts"
],
"exclude": [
"node_modules",
"./app/**/*.tsx",
"./app/components/**/*",
"./app/routes/**/*",
"./app/pages/**/*",
"./app/providers/**/*"
]
}
4. Optimize Vite Configuration
Exclude worker-related dependencies from the web bundle:
// vite.config.ts
import path from "node:path";
import { defineConfig } from "vite";

export default defineConfig({
optimizeDeps: {
exclude: [
"bullmq",
"./app/services/*",
"@sentry/node"
],
},
resolve: {
alias: {
fs: path.resolve("./app/shims/fs.ts"),
},
},
});
Building the Queue System
1. Redis Configuration
Create a robust Redis connection with retry logic:
// app/services/queue/config.server.ts
import Redis from 'ioredis';
import type { RedisOptions } from 'ioredis';
export const redisConfig: RedisOptions = {
tls: process.env.REDIS_TLS === 'true'
? { rejectUnauthorized: false }
: undefined,
db: 0,
family: 4,
enableAutoPipelining: true,
maxRetriesPerRequest: null,
enableReadyCheck: true,
connectTimeout: 60000,
retryStrategy: (times) => {
const maxDelay = 30000;
const maxAttempts = 20;
if (times >= maxAttempts) {
console.error(`Redis unreachable after ${maxAttempts} attempts`);
return null;
}
const delay = Math.min(100 * Math.pow(2, times), maxDelay);
console.warn(`Redis retry #${times} in ${delay}ms...`);
return delay;
},
reconnectOnError: (err) => {
const recoverableErrors = ['READONLY', 'ECONNRESET', 'ETIMEDOUT'];
return recoverableErrors.some(error => err?.message?.includes(error));
},
};
export const redisClient = new Redis(process.env.REDIS_URI!, redisConfig);
export const defaultQueueOptions = {
attempts: 3,
backoff: {
type: 'exponential',
delay: 10_000,
},
removeOnComplete: true,
removeOnFail: true,
};
export const defaultStreams = {
events: {
maxLen: 5, // Limit stream size
},
};
2. Define Queue Names & Job Data Types
// app/types/queue.type.ts
export enum QueueJobNames {
requestQ = 'bulk-price-update-request',
processingQ = 'bulk-price-processing',
waitingQ = 'bulk-price-waiting',
computeEngineQ = 'compute-engine',
productWebhookQ = 'product-webhook',
}
export interface IRequestJobData {
shop: string;
shopId: number;
identifier: string; // UUID for tracking
entityId: number; // Price rule or upload ID
type: SyncLogType;
source: BulkOperationSourceType;
action: ActionType;
bulkOperationJobId: number;
priority?: number;
}
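The interface above references several app-specific enums that never appear in the article. A plausible sketch of their shape (the names and members here are assumptions inferred from how they are used later; adapt them to your own domain):

```typescript
// app/types/queue.type.ts (continued) — hypothetical enum definitions;
// only members referenced elsewhere in this guide are shown.
export enum ActionType {
  CREATE = 'CREATE',
  UPDATE = 'UPDATE',
  DELETE = 'DELETE',
}

export enum BulkOperationSourceType {
  PRICE_RULE = 'PRICE_RULE',
  CSV_UPLOAD = 'CSV_UPLOAD',
}

export enum SyncLogType {
  CREATE_PRICE_RULES = 'CREATE_PRICE_RULES',
  REVERT_PRICE_RULES = 'REVERT_PRICE_RULES',
}

export enum BulkOperationJobStatus {
  PENDING = 'PENDING',
  QUEUED = 'QUEUED',
  ACTIVE = 'ACTIVE',
  PROCESSING = 'PROCESSING',
  COMPLETED = 'COMPLETED',
  FAILED = 'FAILED',
}
```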
3. Create the Request Queue
// app/services/queue/request-queue.server.ts
import { Queue, QueueEvents } from 'bullmq';
import { redisClient, defaultQueueOptions } from './config.server';
import { QueueJobNames, BulkOperationJobStatus, type IRequestJobData } from '../../types/queue.type';
import prisma from '../../db.server';
const RequestQueue = new Queue(QueueJobNames.requestQ, {
connection: redisClient,
defaultJobOptions: defaultQueueOptions,
});
const queueEvents = new QueueEvents(QueueJobNames.requestQ, {
connection: redisClient,
});
queueEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed: ${failedReason}`);
});
export async function queueBulkOperationRequest(
data: IRequestJobData
): Promise<{
success: boolean;
message: string;
duplicateJob: boolean;
jobId: string | null
}> {
try {
// Validate required fields
if (!data?.shop || !data?.shopId || !data?.bulkOperationJobId) {
return {
success: false,
message: 'Missing required fields',
duplicateJob: false,
jobId: null
};
}
// Create deterministic job ID to prevent duplicates
const jobId = `${data.shop}__${data.source}__${data.entityId}`;
const existingJob = await RequestQueue.getJob(jobId);
if (existingJob) {
const state = await existingJob.getState();
// Block duplicate if job is active
if (state === 'waiting' || state === 'active' || state === 'delayed') {
console.warn(`Duplicate job blocked: ${jobId} is ${state}`);
await prisma.bulkOperationJob.update({
where: { id: data.bulkOperationJobId },
data: {
status: BulkOperationJobStatus.FAILED,
errorMessage: 'Operation already queued or running',
},
});
return {
success: false,
message: 'Operation already queued',
duplicateJob: true,
jobId,
};
}
// Remove completed/failed jobs
if (state === 'completed' || state === 'failed') {
await existingJob.remove();
}
}
// Add job to queue
await RequestQueue.add(
QueueJobNames.requestQ,
data,
{
jobId,
priority: data?.priority || 1,
attempts: 3,
removeOnComplete: true,
removeOnFail: { count: 1, age: 1000 },
delay: 50,
backoff: {
type: 'exponential',
delay: 10_000
},
}
);
// Update database
await prisma.bulkOperationJob.update({
where: { id: data.bulkOperationJobId },
data: {
requestQueueJobId: jobId,
requestQueueJobData: JSON.stringify(data),
status: BulkOperationJobStatus.QUEUED,
},
});
console.log(`Queued job ${jobId} for shop: ${data.shop}`);
return { success: true, message: 'ok', duplicateJob: false, jobId };
} catch (error: any) {
console.error(`Failed to queue job for ${data.shop}:`, error);
throw error;
}
}
export default RequestQueue;
4. Using the Queue from API Routes
// app/routes/api.sales.ts
import { json, type ActionFunctionArgs } from "@remix-run/node";
import { v4 as uuidv4 } from "uuid";
import { authenticate } from "~/shopify.server";
import prisma from "~/db.server";
import { queueBulkOperationRequest } from "~/services/queue/request-queue.server";
export async function action({ request }: ActionFunctionArgs) {
const { session } = await authenticate.admin(request);
const { priceRuleId } = await request.json(); // ID of the price rule to apply
// Create bulk operation job record
const bulkOperationJob = await prisma.bulkOperationJob.create({
data: {
shopId: session.shopId,
identifier: uuidv4(),
type: ActionType.CREATE,
sourceType: BulkOperationSourceType.PRICE_RULE,
status: BulkOperationJobStatus.PENDING,
},
});
// Queue the job
const result = await queueBulkOperationRequest({
shop: session.shop,
shopId: session.shopId,
identifier: bulkOperationJob.identifier,
entityId: priceRuleId,
type: SyncLogType.CREATE_PRICE_RULES,
source: BulkOperationSourceType.PRICE_RULE,
action: ActionType.CREATE,
bulkOperationJobId: bulkOperationJob.id,
});
if (!result.success) {
return json({ error: result.message }, { status: 400 });
}
return json({ success: true, jobId: result.jobId });
}
Creating Workers
1. Request Worker
The request worker validates and routes jobs to the appropriate processing queue:
// app/services/queue/request-worker.server.ts
import { Worker, type Job } from 'bullmq';
import { redisClient } from './config.server';
import { QueueJobNames, BulkOperationJobStatus } from '../../types/queue.type';
import { queueProcessBulkOperationRequest } from './processing-queue.server';
import prisma from '../../db.server';
const MAX_STALLS = Number(process.env.REQ_QUEUE_MAX_STALLS ?? 3);
const REQ_STALL_MS = Number(process.env.REQ_STALL_MS ?? 30_000);
const REQ_LOCK_TTL_MS = Number(process.env.REQ_LOCK_TTL_MS ?? 60_000);
export const RequestWorker = () => {
console.log('Starting Request Worker with concurrency: 1');
const worker = new Worker(
QueueJobNames.requestQ,
async (job: Job) => {
const { shop, shopId, bulkOperationJobId } = job.data;
console.log(`Processing request for shop: ${shop}, job ${job.id}`);
try {
await job.updateProgress(0);
if (!shop || !shopId || !bulkOperationJobId) {
throw new Error('Missing required fields');
}
// Check stall count
const stalledCount = (job as any).stalledCounter ?? 0;
if (stalledCount >= MAX_STALLS) {
throw new Error('Job stalled too many times');
}
// Forward to processing queue
const result = await queueProcessBulkOperationRequest(job.data);
if (!result?.success) {
throw new Error(result?.message);
}
return { success: true, message: 'ok', shop };
} catch (error: any) {
console.error(`Failed to process job ${job.id}:`, error);
throw error;
}
},
{
connection: redisClient,
concurrency: 1,
stalledInterval: REQ_STALL_MS,
lockDuration: REQ_LOCK_TTL_MS,
drainDelay: 5,
removeOnComplete: {
count: 1,
age: 1000
},
removeOnFail: {
count: 1,
age: 1000
}
}
);
// Event handlers
worker.on('completed', async (job) => {
console.log(`Job ${job.id} completed`);
});
worker.on('failed', async (job, err) => {
if (!job) return; // job can be undefined if it was removed
const attemptsMade = job.attemptsMade || 1;
const jobAttemptsLimit = job.opts?.attempts || 1;
const isFinal = attemptsMade >= jobAttemptsLimit;
if (isFinal) {
console.error(`Job ${job.id} failed permanently:`, err);
await prisma.bulkOperationJob.update({
where: { id: job.data.bulkOperationJobId },
data: {
status: BulkOperationJobStatus.FAILED,
errorMessage: err?.message,
},
});
}
});
worker.on('stalled', async (jobId) => {
console.warn(`Job ${jobId} has stalled`);
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});
return worker;
};
2. Processing Worker
The processing worker handles the actual business logic:
// app/services/queue/processing-worker.server.ts
import { Worker, type Job } from 'bullmq';
import { redisClient } from './config.server';
import { QueueJobNames, BulkOperationJobStatus, ActionType } from '../../types/queue.type';
import prisma from '../../db.server';
// handleCreateAction / handleUpdateAction / handleDeleteAction are
// app-specific helpers that perform the actual Shopify API calls.
export const ProcessingWorker = () => {
console.log('Starting Processing Worker with concurrency: 5');
const worker = new Worker(
QueueJobNames.processingQ,
async (job: Job) => {
const { shop, shopId, entityId, source, action } = job.data;
console.log(`Processing ${action} for ${source} ${entityId}`);
try {
await job.updateProgress(10);
// Mark as active in DB
await prisma.bulkOperationJob.update({
where: { id: job.data.bulkOperationJobId },
data: {
status: BulkOperationJobStatus.PROCESSING,
activeInWorker: true,
},
});
// Perform the actual work (Shopify API calls, etc.)
switch (action) {
case ActionType.CREATE:
await handleCreateAction(job.data);
break;
case ActionType.UPDATE:
await handleUpdateAction(job.data);
break;
case ActionType.DELETE:
await handleDeleteAction(job.data);
break;
}
await job.updateProgress(100);
// Mark as complete
await prisma.bulkOperationJob.update({
where: { id: job.data.bulkOperationJobId },
data: {
status: BulkOperationJobStatus.COMPLETED,
activeInWorker: false,
completedAt: new Date(),
},
});
return { success: true };
} catch (error: any) {
console.error(`Processing failed for job ${job.id}:`, error);
await prisma.bulkOperationJob.update({
where: { id: job.data.bulkOperationJobId },
data: {
status: BulkOperationJobStatus.FAILED,
activeInWorker: false,
errorMessage: error.message,
},
});
throw error;
}
},
{
connection: redisClient,
concurrency: 5, // Process multiple jobs in parallel
stalledInterval: 60 * 1000,
lockDuration: 120 * 1000,
}
);
return worker;
};
3. Main Worker Entry Point
// worker.ts
import * as dotenv from 'dotenv';
import { RequestWorker } from './app/services/queue/request-worker.server';
import { ProcessingWorker } from './app/services/queue/processing-worker.server';
import { WaitingRequestWorker } from './app/services/queue/waiting-worker.server';
import prisma, { disconnectPrisma } from "./app/db.server";
dotenv.config();
(async () => {
try {
// Start all workers
const workers = [
RequestWorker(),
ProcessingWorker(),
WaitingRequestWorker(),
];
console.log('Running Workers:', workers.map(w => w?.name));
// Resume incomplete operations (see next section)
await resumeIncompleteBulkOperations();
// Graceful shutdown
async function shutdown(signal: string) {
console.log(`Received ${signal}, shutting down gracefully...`);
try {
// Mark all jobs as inactive
await prisma.bulkOperationJob.updateMany({
where: { activeInWorker: true },
data: { activeInWorker: false },
});
// Close workers
await Promise.all(workers.map(w => w.close()));
// Disconnect from database
await disconnectPrisma();
console.log('Shutdown complete');
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
} catch (error) {
console.error('Failed to start workers:', error);
process.exit(1);
}
})();
Handling Job Recovery
One of the most critical features is recovering from crashes. Here's how we handle it:
// Inside worker.ts
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
async function resumeIncompleteBulkOperations() {
try {
await sleep(1000); // Give the Redis connection a moment to stabilize
console.log('[Recovery] Scanning for incomplete operations...');
// Reset activeInWorker flag
await prisma.bulkOperationJob.updateMany({
where: { activeInWorker: true },
data: { activeInWorker: false },
});
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const pageSize = 10;
let page = 0;
while (true) {
// Find incomplete jobs
const incompleteJobs = await prisma.bulkOperationJob.findMany({
where: {
status: {
in: [
BulkOperationJobStatus.QUEUED,
BulkOperationJobStatus.PROCESSING,
BulkOperationJobStatus.ACTIVE,
BulkOperationJobStatus.PENDING,
]
},
activeInWorker: false,
createdAt: { gte: twentyFourHoursAgo },
},
include: {
shop: { select: { id: true, domain: true, status: true } },
},
orderBy: { createdAt: 'asc' },
skip: page * pageSize,
take: pageSize,
});
if (incompleteJobs.length === 0) break;
for (const job of incompleteJobs) {
try {
if (job.status === BulkOperationJobStatus.PROCESSING &&
job.processingQueueJobData) {
// Re-queue to processing
const jobData = JSON.parse(job.processingQueueJobData);
await queueProcessBulkOperationRequest(jobData);
console.log(`[Recovery] Re-queued processing job ${job.id}`);
}
else if (job.status === BulkOperationJobStatus.QUEUED &&
job.requestQueueJobData) {
// Re-queue to waiting
const jobData = JSON.parse(job.requestQueueJobData);
await queueBulkOperationToWaitingQueue(jobData);
console.log(`[Recovery] Re-queued request job ${job.id}`);
}
} catch (err) {
console.error(`[Recovery] Error resuming job ${job.id}:`, err);
}
}
page++;
}
console.log('[Recovery] Job recovery complete');
} catch (err) {
console.error('[Recovery] Failed to scan for incomplete jobs:', err);
}
}
Key Recovery Features:
- Runs on worker startup
- Only recovers jobs from last 24 hours
- Preserves job state and data
- Handles pagination for large datasets
- Logs all recovery actions
Deployment Strategy
1. Dockerfile
FROM node:22
WORKDIR /app
COPY package.json package-lock.json* ./
COPY prisma ./prisma/
# Fresh dependency resolution: the lockfile is discarded on purpose to
# sidestep peer-dependency conflicts (trade-off: less reproducible builds)
RUN rm -f package-lock.json && \
npm install --legacy-peer-deps
RUN npx prisma generate
# Copy application code
COPY . .
# Build web server
RUN npm run build
# Build worker
RUN npm run worker:build
2. Heroku Configuration
# heroku.yml
build:
docker:
web: Dockerfile
worker: Dockerfile
run:
web: npm run start
worker:
command:
- npm run worker:start
image: web
release:
image: web
command:
- echo 'Starting server after deployment!'
3. Environment Variables
# Redis
REDIS_URI=redis://user:password@host:port
REDIS_TLS=true
# Database
DATABASE_URL=postgresql://user:password@host:port/dbname
# Shopify
SHOPIFY_API_KEY=your_api_key
SHOPIFY_API_SECRET=your_api_secret
# Worker Configuration
REQ_QUEUE_MAX_STALLS=3
REQ_STALL_MS=30000
REQ_LOCK_TTL_MS=60000
4. Scaling on Heroku
# Scale web dynos
heroku ps:scale web=2:standard-2x
# Scale worker dynos
heroku ps:scale worker=2:standard-2x
# Monitor
heroku logs --tail --dyno=worker
Best Practices & Lessons Learned
1. Use Deterministic Job IDs
Prevent duplicate jobs by creating IDs from shop + source + entity:
const jobId = `${shop}__${source}__${entityId}`;
2. Implement Retry Logic with Backoff
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 10_000 // 10s, then 20s, then 40s
}
}
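The sequence in that comment comes from doubling the base delay on each retry. A quick sketch of the formula (BullMQ's built-in exponential strategy behaves this way; the helper itself is purely illustrative):

```typescript
// Illustrative only: computes the nth retry delay for an exponential
// backoff with the given base delay (attempt is 1-based).
export function exponentialDelay(baseMs: number, attempt: number): number {
  return baseMs * Math.pow(2, attempt - 1);
}

const schedule = [1, 2, 3].map((attempt) => exponentialDelay(10_000, attempt));
console.log(schedule); // [10000, 20000, 40000] — i.e. 10s, 20s, 40s
```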
3. Track Job State in Database
Never rely solely on Redis for state. Always maintain a source of truth in your database:
await prisma.bulkOperationJob.update({
where: { id: jobId },
data: {
status: BulkOperationJobStatus.PROCESSING,
requestQueueJobData: JSON.stringify(data),
processingQueueJobData: JSON.stringify(data),
},
});
4. Handle Stalled Jobs
Jobs can stall due to crashes or network issues. Detect and recover:
const stalledCount = (job as any).stalledCounter ?? 0;
if (stalledCount >= MAX_STALLS) {
throw new Error('Job stalled too many times');
}
5. Implement Graceful Shutdown
Always clean up on shutdown:
process.on('SIGTERM', async () => {
await prisma.bulkOperationJob.updateMany({
where: { activeInWorker: true },
data: { activeInWorker: false },
});
await Promise.all(workers.map(w => w.close()));
await disconnectPrisma();
process.exit(0);
});
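One subtlety worth guarding against: a second SIGTERM arriving mid-cleanup should not trigger the teardown twice. A small idempotency wrapper (hypothetical helper, not from the app) handles this:

```typescript
// Hypothetical guard: runs the async cleanup at most once, even if several
// shutdown signals arrive in quick succession.
let shuttingDown = false;

export async function shutdownOnce(cleanup: () => Promise<void>): Promise<boolean> {
  if (shuttingDown) return false; // a shutdown is already in flight
  shuttingDown = true;
  await cleanup();
  return true;
}
```

Wire it as `process.on('SIGTERM', () => shutdownOnce(cleanup))` so repeated signals become no-ops.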
6. Use Multiple Queues for Different Priorities
- Request Queue: Fast validation, concurrency 1
- Processing Queue: Heavy lifting, concurrency 5+
- Waiting Queue: Rate-limited operations
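For the waiting queue, BullMQ's built-in rate limiter is a natural fit. The numbers below are illustrative (Shopify's API budget varies by plan); the object is passed as the `limiter` worker option:

```typescript
// Illustrative limiter config for the waiting-queue worker: at most `max`
// jobs started per `duration` milliseconds across this worker.
export const waitingQueueLimiter = { max: 2, duration: 1000 };

// Usage (inside waiting-worker.server.ts):
//
// new Worker(QueueJobNames.waitingQ, handler, {
//   connection: redisClient,
//   concurrency: 1,
//   limiter: waitingQueueLimiter, // ≤ 2 jobs per second
// });
```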
7. Clean Up Completed Jobs
Prevent Redis memory bloat:
{
removeOnComplete: true,
removeOnFail: { count: 1, age: 1000 }
}
8. Monitor Redis Connection Health
Implement robust retry strategies:
retryStrategy: (times) => {
if (times >= maxAttempts) return null;
return Math.min(100 * Math.pow(2, times), maxDelay);
}
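Extracted as a pure function, the strategy above becomes easy to unit-test (same constants as the config shown earlier):

```typescript
// Pure version of the retry strategy from config.server.ts: exponential
// growth capped at maxDelayMs, giving up (null) after maxAttempts.
export function redisRetryDelay(
  times: number,
  maxAttempts = 20,
  maxDelayMs = 30_000,
): number | null {
  if (times >= maxAttempts) return null;
  return Math.min(100 * Math.pow(2, times), maxDelayMs);
}

console.log(redisRetryDelay(3));  // 800
console.log(redisRetryDelay(12)); // 30000 (capped)
console.log(redisRetryDelay(20)); // null (give up)
```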
9. Separate Worker TypeScript Configuration
Exclude UI dependencies to reduce worker bundle size:
{
"exclude": [
"./app/components/**/*",
"./app/routes/**/*",
"./app/pages/**/*"
]
}
10. Use Job Progress for User Feedback
Update progress to show users real-time status:
await job.updateProgress(50);
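On the client side, that progress value can drive a polling loop. A framework-agnostic sketch — `fetchStatus` is injected, so wiring it to an actual status endpoint (e.g. a hypothetical route that reads `job.progress` via `Queue.getJob`) is left to the app:

```typescript
// Hypothetical polling helper: repeatedly fetches job status, reports
// progress, and resolves once the job reaches a terminal state.
export interface JobStatus {
  state: string;    // e.g. 'active' | 'completed' | 'failed'
  progress: number; // 0-100, from job.updateProgress()
}

export async function pollJobProgress(
  fetchStatus: () => Promise<JobStatus>,
  onProgress: (progress: number) => void,
  intervalMs = 1000,
): Promise<string> {
  for (;;) {
    const { state, progress } = await fetchStatus();
    onProgress(progress);
    if (state === 'completed' || state === 'failed') return state;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```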
Performance Considerations
Redis Optimization
- Enable auto-pipelining (the enableAutoPipelining option) to batch commands automatically
- Use maxLen on streams to prevent unbounded growth
- Set appropriate TTL values for locks
Worker Optimization
- Adjust concurrency based on workload (I/O-bound vs CPU-bound)
- Set stalledInterval to detect hung jobs quickly
- Use lockDuration longer than expected job duration
Database Optimization
- Index frequently queried fields (shopId, status, identifier)
- Use connection pooling
- Implement pagination for bulk queries
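In Prisma, those indexes live in the schema. A sketch with hypothetical fields (your actual model will have more columns than this):

```prisma
// prisma/schema.prisma (excerpt, hypothetical fields)
model BulkOperationJob {
  id         Int      @id @default(autoincrement())
  shopId     Int
  identifier String
  status     String
  createdAt  DateTime @default(now())

  @@index([shopId, status])   // worker recovery scans
  @@index([identifier])       // UI status lookups
}
```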
Monitoring & Debugging
Log Critical Events
worker.on('completed', (job) => {
console.log(`[Worker] Job ${job.id} completed`);
});
worker.on('failed', (job, err) => {
console.error(`[Worker] Job ${job?.id} failed:`, err);
});
worker.on('stalled', (jobId) => {
console.warn(`[Worker] Job ${jobId} stalled`);
});
Health Checks
Implement health check endpoints:
// app/routes/health.ts
import { json } from "@remix-run/node";
import { redisClient } from "~/services/queue/config.server";
import prisma from "~/db.server";
export async function loader() {
try {
await redisClient.ping();
await prisma.$queryRaw`SELECT 1`;
return json({ status: 'healthy' });
} catch (error) {
return json({ status: 'unhealthy' }, { status: 503 });
}
}
BullMQ Dashboard
Use Bull Board for visual monitoring:
npm install @bull-board/api @bull-board/express
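A minimal wiring sketch (this assumes a small Express server running alongside the app; the mount path and port are arbitrary):

```typescript
// Sketch: mount Bull Board on an Express app to inspect the queues visually.
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import RequestQueue from './app/services/queue/request-queue.server';

const app = express();
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullMQAdapter(RequestQueue)],
  serverAdapter,
});

app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3001, () => console.log('Bull Board on http://localhost:3001/admin/queues'));
```

Remember to protect this route — it exposes job payloads, which may contain shop data.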
Conclusion
Building a production-ready Shopify app with background jobs requires careful planning, but the payoff is huge:
✅ Scalable: Handle thousands of concurrent operations
✅ Reliable: Jobs survive crashes and get recovered
✅ Performant: Non-blocking, keeps your app responsive
✅ Maintainable: Clear separation of concerns
The architecture we've built processes millions of price updates reliably, scales horizontally, and recovers gracefully from failures.
Key Takeaways
- Use BullMQ for reliable job processing
- Implement multiple queues for different workloads
- Always store job state in your database
- Build in job recovery from day one
- Deploy web and worker as separate processes
Questions or suggestions? Feel free to reach out.
Happy coding! 🚀