DEV Community

Cover image for Nuxt & Cloudflare Queues: Building a Data Sync Pipeline using Vectorize
Keith Mifsud
Keith Mifsud

Posted on • Originally published at keith-mifsud.me

Nuxt & Cloudflare Queues: Building a Data Sync Pipeline using Vectorize

Continuing from the last article, we’ve already deployed our AI-powered real estate listing demo Nuxt application to Cloudflare, including the D1 database and AI Workers binding. In this article, we will set up Cloudflare Vectorize and implement a Cloudflare Queue to populate the Vector store with embeddings for our locations.

This article is the second part of the Nuxt & Cloudflare AI Vector Pipeline Series, a three-part series.

The code for the entire demo application is publicly available on GitHub: Nuxt & Cloudflare AI Vector Pipeline Series - give it a ⭐.

Create a vector index on Cloudflare Vectorize and bind it to our Nuxt application

As briefly explained in the previous article, Vectorize is Cloudflare’s native vector store. A vector store is essentially a database designed to hold text embeddings and index them for fast querying. Text embeddings are numeric representations (lists of numbers) that AI models use to read and understand data. Feel free to ask in comments below if my subtle explanation of Vectorize and vector stores leads to more questions - I know it's too broad 😆.

Cloudflare’s Workers AI supports several embedding models; we will use @cf/baai/bge-m3 because of my experience with it on similar projects and its multilingual support, which is ideal for semantic matching use cases.

We will use Wrangler CLI to create the vector index on Cloudflare and bind it to the project through Wrangler’s config so we can build the index with the locations’ text embeddings later.

To use the bge-m3 embedding model, we need to specify a dimension of 1024. Run:

npx wrangler vectorize create locations-index --dimensions=1024 --metric=cosine
Enter fullscreen mode Exit fullscreen mode

and add the bindings in /wrangler.toml:

[[vectorize]]
binding = "VECTORIZE"
index_name = "locations-index"
Enter fullscreen mode Exit fullscreen mode

Now that we've created the location vector index, we need to enable filtering on our metadata. While upserts work automatically based on ID, we must explicitly index locationId so we can filter search results or delete obsolete records by their location ID in the future.

npx wrangler vectorize create-metadata-index locations-index --property-name=locationId --type=string
Enter fullscreen mode Exit fullscreen mode

We can now build our code and deploy it to make sure Vectorize is being used, and the index is created:

pnpm run build
npx wrangler deploy
Enter fullscreen mode Exit fullscreen mode

The CLI output should show that our worker has access to Vectorize and AI (which we previously added):

Your Worker has access to the following bindings:
Binding                               Resource             
env.DB (property-sync-db)             D1 Database          
env.VECTORIZE (locations-index)       Vectorize Index      
env.ASSETS                            Assets
env.AI                                AI   
Enter fullscreen mode Exit fullscreen mode

Upserting vectors using Cloudflare Workers AI and Vectorize

We will use a simple repository pattern for Cloudflare Vectorize, allowing us to easily swap implementations if needed. This sounds like overkill for such a small demo application, right? I recently avoided a substantial amount of time wasted when NuxtHub announced they’re deprecating their AI features. I was building a similar feature using hubAi() and hubVectorize() from NuxtHub’s core module, but I had to refactor it before I finished. Thanks to my use of the repository pattern, I only had to swap the repository implementation from NuxthubVectorAIRepository to CloudflareVectorAIRepository as, all other code was bound to the VectorAIRepository interface.

💡 I don’t see anything wrong with enforcing a Repository Pattern throughout the application. I also don’t see anything wrong with using the pattern for only some specific dependencies. In this case. I know that there’s a chance we will need to change the Vector and AI implementations, but changing the ORM (Drizzle) implementation is less likely. Let me know in the comments below your thoughts on the Repository Pattern in Nuxt.

I will go into detail about Queues and Handlers in the next section of this article. For now, we just need to know that the vectorisation of the location data will be queued, and that a queue handler will depend on a Location Vector Repository. Let’s add this dependency.

Create /server/utils/queueHandlers/repositories/LocationVectorRepository.ts with:

import { Location } from '~~/server/database/types'

export interface LocationVectorRepository {

  /**
   * Upserts locations into the Vector Store.
   *
   * @param locations - An array of Location objects to be upserted.
   * @returns A promise that resolves to the number of locations upserted.
   */
  upsertLocations (
    locations: Location[],
  ): Promise<number>;
}

Enter fullscreen mode Exit fullscreen mode

And install the following types from Cloudflare Workers if you haven’t already: pnpm install @cloudflare/workers-types.

Database types in /server/database/types.ts:

import * as schema from './schema';

export type Location = typeof schema.locations.$inferSelect;

export type Agent = typeof schema.agents.$inferSelect;

export type Property = typeof schema.properties.$inferSelect;

Enter fullscreen mode Exit fullscreen mode

AI types in /server/types/ai-models.ts:

export interface BGEEmbeddingResponse {
  data?: number[][];
  response?: number[][];
  shape?: number[];
}
Enter fullscreen mode Exit fullscreen mode

We can now finally implement the upsertLocations() method in our Cloudflare implementation of the LocationVectorRepository interface.

Let’s start by creating the repository class in /server/repositories/cloudflare/CloudflareVectorAIRepository.ts and add the following contents:

import {
  LocationVectorRepository,
} from '~~/server/utils/queueHandlers/repositories/LocationVectorRepository'
import { Location } from '~~/server/database/types'
import type { Ai, VectorizeIndex } from '@cloudflare/workers-types'
import { BGEEmbeddingResponse } from '~~/server/types/ai-models'

export class CloudflareVectorAIRepository implements LocationVectorRepository {

  constructor (
    private readonly vectorIndex: VectorizeIndex,
    private readonly ai: Ai,
  ) {

    if (!vectorIndex) {
      throw new Error(
        'Missing Vectorize binding.')
    }

    if (!ai) {
      throw new Error('Missing AI binding.')
    }
  }

  /**
   * Upserts the given locations into the vector index.
   *
   * @param locations
   */
  public async upsertLocations (locations: Location[]): Promise<number> {

    if (locations.length === 0) {
      return 0
    }

    const AI_BATCH_SIZE = 10
    let successfulUpserts = 0

    for (let i = 0; i < locations.length; i += AI_BATCH_SIZE) {
      const batch = locations.slice(i, i + AI_BATCH_SIZE)

      try {
        const textsToEmbed = batch.map((location) =>
          this.buildTextForEmbedding(location),
        )

        const vectors = await this.embedTextBatch(textsToEmbed)

        const vectorObjects = batch.map((location, idx) => {
          const vector = vectors[idx]
          if (!vector) return null
          return {
            id: location.id,
            values: vector,
            metadata: {
              locationId: location.id,
            },
          }
        }).filter((obj): obj is NonNullable<typeof obj> => obj !== null)

        if (vectorObjects.length > 0) {
          const result = await this.vectorIndex.upsert(vectorObjects)
          console.log('[Repo] Upsert result:', JSON.stringify(result))
          successfulUpserts += vectorObjects.length
        }
      } catch (error) {
        console.error(
          `[CloudflareVectorAIRepository] Failed to process batch starting at index ${i}`,
          error,
        )
      }
    }
    return successfulUpserts
  }

  /**
   * Embeds a batch of texts using the BGE-M3 model.
   *
   * @param texts
   * @private
   */
  private async embedTextBatch (texts: string[]): Promise<number[][]> {
    const response = (await this.ai.run(
      '@cf/baai/bge-m3', {
        text: texts,
        response_format: 'embedding_vector',
      },
    )) as BGEEmbeddingResponse

    const vectors = response.data ?? response.response

    if (!vectors) throw new Error('BGE-M3 invalid response')

    return vectors
  }

  /**
   * Builds the text representation for embedding from a Location.
   *
   * @param location
   * @private
   */
  private buildTextForEmbedding (location: Location): string {
    return `${location.name} | ${location.postcodes.join(', ')}`
  }

}

Enter fullscreen mode Exit fullscreen mode

The repository requires Cloudflare’s Vector Index and AI bindings in its constructor. We will resolve these from Cloudflare’s environment in the queue handler later.

The upsert locations method takes a list of locations from our database, builds text embeddings for each location in batches of 10, maps them to vector objects with the index metadata, and then upserts the vector objects to the vector store.

I’m intentionally excluding unit and integration tests from this demo application for brevity. Before we can test this repository end-to-end, we need to set up our Cloudflare Queue.

How to dispatch and handle Cloudflare Queues in Nuxt

Queues allow us to process tasks in the background, which is essential for decoupling long-running operations from user requests. Like most queue services, Cloudflare Queues includes features such as batching and retries.

In our Nuxt application, we will implement this in two parts:

  • The Producer (Nitro Task): We will dispatch messages containing just the Location ID from our database. To be efficient, we will use sendBatch to dispatch up to ten IDs at a time.
  • The Consumer (Nitro Plugin): We will listen for these messages within the same Nuxt application (running as a Cloudflare Worker) using a Nitro plugin to process the data.

Create a Cloudflare Queue and bind the producer and consumer to the same Nuxt application

⚠️ Cloudflare Queues are only available on paid Worker Plans. Currently at $5 per month, plus extra usage charges.

We can create the vector sync queue using Wrangler CLI:

npx wrangler queues create vector-sync-queue
Enter fullscreen mode Exit fullscreen mode

And add the bindings in /wrangler.toml similar to:

[[queues.producers]]
queue = "vector-sync-queue"
binding = "VECTOR_SYNC_QUEUE"

[[queues.consumers]]
queue = "vector-sync-queue"
max_batch_size = 10    # Recommended: Process 10 vectors at a time
max_batch_timeout = 5  # Wait up to 5s to fill a batch
Enter fullscreen mode Exit fullscreen mode

Once you build and deploy, Cloudflare automatically configures your Nuxt Worker to act as the consumer for this queue.

Dispatch to a Cloudflare Queue from a Nuxt application through a Nitro Task

Next, we will write a Nitro Task that dispatches messages to our newly created queue. If you’re not following along from the previous article, make sure you add Nitro’s experimental tasks and async context to Nuxt’s config:

 nitro: {

    preset: 'cloudflare_module',

    experimental: {
      tasks: true,
      asyncContext: true,
    },
  },
Enter fullscreen mode Exit fullscreen mode

Let’s first add some types in /server/types/queues.ts:

import type {
  Ai,
  Queue,
  VectorizeIndex,
  D1Database,
  ExecutionContext,
} from '@cloudflare/workers-types'

export interface CloudflareEnv {
  AI: Ai;
  VECTORIZE: VectorizeIndex;
  VECTOR_SYNC_QUEUE: Queue;
  DB: D1Database

  [key: string]: unknown;
}

export interface VectorSyncQueueMessageBody {
  locationId: string;
}

export interface CloudflareTaskContext {
  cloudflare: {
    env: CloudflareEnv;
    context: ExecutionContext;
  };
}

Enter fullscreen mode Exit fullscreen mode

And create /server/tasks/build-location-embeddings.ts with:

import { drizzle } from 'drizzle-orm/d1'
import * as schema from '~~/server/database/schema'
import type { D1Database, Queue } from '@cloudflare/workers-types'
import { CloudflareTaskContext } from '~~/server/types/queues'

export default defineTask({
  meta: {
    name: 'build-location-embeddings',
    description: 'Queues all location IDs for vector embedding',
  },
  async run(event) {
    const context = event.context as CloudflareTaskContext
    const env = context.cloudflare?.env

    if (!env?.DB) {
      return { error: 'DB binding not found.' }
    }
    const db = drizzle(env.DB as D1Database, { schema })

    // Queue Init
    const queue = env.VECTOR_SYNC_QUEUE as Queue
    if (!queue) {
      return { error: 'Queue binding (VECTOR_SYNC_QUEUE) not found.' }
    }

    try {
      const allLocations = await db.query.locations.findMany({
        columns: { id: true },
      })

      if (allLocations.length === 0) return { result: 'No locations found in DB.' }

      const total = allLocations.length
      console.log(`[Task] Found ${total} locations. Dispatching to queue...`)

      const messages = allLocations.map((location) => ({
        body: { locationId: location.id },
      }))

      const CHUNK_SIZE = 10
      for (let i = 0; i < messages.length; i += CHUNK_SIZE) {
        const batch = messages.slice(i, i + CHUNK_SIZE)
        await queue.sendBatch(batch)
      }

      return {
        result: `Dispatched ${total} locations to VECTOR_SYNC_QUEUE.`,
      }
    } catch (error: any) {
      console.error('[Task] Error during queue dispatch:', error)
      return { error: error.message || 'Unknown error occurred.' }
    }
  },
})

Enter fullscreen mode Exit fullscreen mode

Now that the dispatcher is done. We can trigger the Nitro task from CLI, DevTools, CRON, or programmatically. Since Cloudflare’s Workers AI runs on remote infrastructure, we will build an API endpoint that triggers the Nitro task programmatically after we implement the listener and handler.

💡There are several ways you can safely test Cloudflare’s AI without touching the production environment. You can create bindings specific to a staging environment and also use the getPlatformProxy() wrangler function in tests.

Listen and handle Cloudflare Queue messages from a Nuxt application using a Nitro Plugin

With the queue dispatcher completed, we now need a Nitro Plugin to listen to the dispatched queue and handle it. In the next part of this article series, we will use another queue to synchronise the Agents’ listings. To avoid a massive queue handler class, we will split the listener and handler code.

The following is the handler that depends on the LocationVectorRepository interface we declared earlier.

Create the handler in /server/utils/queueHandlers/VectorSyncQueueHandler.ts with:

import {
  LocationVectorRepository,
} from '~~/server/utils/queueHandlers/repositories/LocationVectorRepository'
import { Location } from '~~/server/database/types'
import * as schema from '~~/server/database/schema'
import { drizzle } from 'drizzle-orm/d1'
import { inArray } from 'drizzle-orm'
import type { D1Database, MessageBatch } from '@cloudflare/workers-types'
import type {
  VectorSyncQueueMessageBody,
  CloudflareEnv,
} from '~~/server/types/queues'

export class VectorSyncQueueHandler {

  constructor (
    private readonly locationVectorRepository: LocationVectorRepository,
  ) {}

  public async handle (
    batch: MessageBatch<VectorSyncQueueMessageBody>,
    env: CloudflareEnv,
  ): Promise<void> {

    const messages = batch.messages
    const db = drizzle(env.DB as D1Database, { schema })

    // Extract Unique IDs
    const locationIds = new Set<string>()
    for (const message of messages) {
      if (message.body?.locationId) {
        locationIds.add(message.body.locationId)
      }
    }

    if (locationIds.size === 0) {
      console.log('[Vector Sync Handler] No valid locationIds found in batch.')
      return
    }

    try {
      const ids = Array.from(locationIds)

      // Fetch Data
      const locationsToUpsert: Location[] = await db.query.locations.findMany({
        where: inArray(schema.locations.id, ids),
      })

      if (locationsToUpsert.length > 0) {
        // Update Vector Index
        await this.locationVectorRepository.upsertLocations(locationsToUpsert)
        console.log(
          `[Vector Sync Handler] Successfully processed ${locationsToUpsert.length} locations.`)
      }

    } catch (error: any) {
      console.error('[Vector Sync Handler] Error:', error.message)
      // Throwing error triggers Cloudflare's automatic queue retry logic
      throw error
    }
  }
}

Enter fullscreen mode Exit fullscreen mode

The handler class above loops through the queue messages, retrieves the full location data from our D1 database, and uses the LocationVectorRepository to upsert the data into the Cloudflare Vectorize store.

Finally, we need a Nitro Plugin to hook into and thus listen to Cloudflare Queues. The plugin will be the entry point for all dispatched Cloudflare Queues and will delegate the specific handler based on the queue name.

Create the plugin at /server/plugins/queue-handler.ts with:

import { VectorSyncQueueHandler } from '~~/server/utils/queueHandlers/VectorSyncQueueHandler'
import { CloudflareVectorAIRepository } from '~~/server/repositories/cloudflare/CloudflareVectorAIRepository'
import type { VectorSyncQueueMessageBody, CloudflareEnv } from '~~/server/types/queues'
import type { MessageBatch } from '@cloudflare/workers-types'

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('cloudflare:queue', async (payload) => {
    const batch = payload.batch
    const env = payload.env as CloudflareEnv

    if (batch.queue === 'vector-sync-queue') {
      console.log(`[Queue] Received batch of ${batch.messages.length} for ${batch.queue}`)

      try {
        const repo = new CloudflareVectorAIRepository(env.VECTORIZE, env.AI)
        const handler = new VectorSyncQueueHandler(repo)

        await handler.handle(
          batch as MessageBatch<VectorSyncQueueMessageBody>,
          env
        )

      } catch (error: any) {
        console.error('[Queue] Error processing batch:', error)
        // This will trigger a retry as per the wrangler.toml queue settings
        throw error
      }
    }
  })
})

Enter fullscreen mode Exit fullscreen mode

Populating the Cloudflare Vectorize index from our Nuxt application

We’ve built everything we need to populate the vector store with our location data, and we’re now ready for the most exciting part of this article.

We will use an API endpoint to run it remotely. Create /server/api/internals/tasks/build-location-embeddings.get.ts with:

export default defineEventHandler(async (event) => {

  const config = useRuntimeConfig();

  if (getHeader(event, 'x-secret') !== config.internalApiSecret) {
    throw createError({ statusCode: 401, statusMessage: 'Unauthorized' });
  }

  const result = await runTask(
    'build-location-embeddings',
    {
      payload: {},
      context: {
        cloudflare: event.context.cloudflare
      }
    }
  );

  return {
    status: 'Build Location Embeddings Task Triggered',
    result,
  };

})
Enter fullscreen mode Exit fullscreen mode

This endpoint triggers our Nitro Task programatically using the runTask() function. Notice we’re using the internal API secret we added as an environment variable to the Cloudflare Worker in the previous article. Also good to note that we’re setting the context to use Cloudflare’s context, since the Nitro Task depends on Cloudflare’s D1 database and the Vector Sync Queue.

With the API endpoint added, go ahead, deploy and test:

pnpm run build
npx wrangler deploy

curl -H "x-secret: [YOUR_SECRET]" "https://[YOUR_WORKER_URL]/api/internals/tasks/build-location-embeddings"
Enter fullscreen mode Exit fullscreen mode

In your Cloudflare worker dashboard, you can see the endpoint trigger and then the queue. We can also verify that the vector index is populated through Wrangler, but you need to wait a couple of minutes for the queue to be processed.

npx wrangler vectorize info locations-index

# Should return something like:

 ⛅️ wrangler 4.53.0
───────────────────
📋 Fetching index info...
┌────────────┬─────────────┬──────────────────────────────────────┬──────────────────────────┐
│ dimensions │ vectorCount │ processedUpToMutation                │ processedUpToDatetime    │
├────────────┼─────────────┼──────────────────────────────────────┼──────────────────────────┤
│ 1024       │ 4           │ aa21b6a6-e25f-4b1c-afa8-98846e801ec6 │ 2025-12-08T09:47:59.146Z │
└────────────┴─────────────┴──────────────────────────────────────┴──────────────────────────┘

npx wrangler vectorize list-vectors locations-index

# Should list the Location ID:

 ⛅️ wrangler 4.53.0
───────────────────
📋 Listing vectors in index 'locations-index'...
┌───┬──────────────────────────────────────┐
│ # │ Vector ID                            │
├───┼──────────────────────────────────────┤
│ 1 │ 1e212fe9-3994-4c7a-a90f-22df3fdee8d5 │
├───┼──────────────────────────────────────┤
│ 2 │ 0168e16d-0571-466d-809b-d717e58d7cab │
├───┼──────────────────────────────────────┤
│ 3 │ 1818d8a6-baf1-4780-b6d7-a8e6f45b3369 │
├───┼──────────────────────────────────────┤
│ 4 │ 76d29d74-5dfe-4a46-b15d-802e976d5250 │
└───┴──────────────────────────────────────┘

Showing 4 of 4 total vectors

Enter fullscreen mode Exit fullscreen mode

Moving forward with implementing our Semantic Matching feature

The final part of this Nuxt & Cloudflare AI Vector Pipeline Series is where the magic happens: we will implement the location semantic matching to tie everything together.

I hope you’re enjoying this series! As always, feel free to ask questions in the comments below or on social media. Please do share this with anyone you think might find it useful, and subscribe if you’d like me to email you when I prepare more code examples and articles.

Peace ✌🏽

Need help with Nuxt & Cloudflare?

Keith Mifsud is an Official Nuxt Agency Partner, learn more.

Top comments (0)