
Linghua Jin for CocoIndex


Stream from S3 in Real-Time: CocoIndex Brings True Incremental Processing to the Cloud

🚀 CocoIndex (https://github.com/cocoindex-io/cocoindex) Now Supports Amazon S3 for Native, Real-Time Incremental Data Processing


We’re excited to announce a major update to CocoIndex:
You can now natively connect Amazon S3 as a data source—and, when combined with AWS Simple Queue Service (SQS), CocoIndex enables true real-time, incremental processing of your data as it’s updated.

This upgrade makes CocoIndex a seamless drop-in for cloud-native teams that rely on S3 for storage and want to power real-time, up-to-date, AI-driven experiences—without reprocessing the world every time a file changes.

We are constantly improving, and more features and examples are coming! Stay tuned for the latest updates from CocoIndex by starring ⭐ our GitHub repo: https://github.com/cocoindex-io/cocoindex

Why Incremental Processing?

Traditional data pipelines often rely on batch processing—recomputing everything at a fixed interval. That’s fine at small scales. But once your data grows—or freshness becomes critical—that model starts to break down.

Incremental processing means focusing only on what’s changed: the new or modified data since the last run. This shift becomes essential in a world where data volume is growing, compute costs are rising, and users (or AI systems) demand the most up-to-date information.

When Incremental Processing Truly Matters

Here are a few key scenarios where this shift isn’t just a nice-to-have—it’s a necessity:

Freshness is Critical

When your app is user-facing, data freshness directly affects trust and usability. If someone updates a document, they expect that update to be reflected instantly in search results or AI responses.

Stale data doesn’t just break UX—it can lead to misleading outputs from downstream systems, especially AI. The worst part? Users often don’t realize the data is outdated until it’s too late.

Compute is Expensive

AI-driven systems often require heavy transformations: generating embeddings, fine-tuning models, cleaning unstructured data, etc.

Rerunning those processes on unchanged data leads to wasted cycles and bloated bills. Incremental processing avoids this waste by doing only the necessary work.

Working at Scale

Reprocessing terabytes—or even petabytes—of data every time there's a small change is simply not sustainable. CocoIndex makes scale manageable by processing only what’s necessary.

What Makes CocoIndex’s Approach Unique?

With the new S3 and SQS support, CocoIndex brings you:

Optimized Resource Utilization

We scan for new or updated S3 files only—no need to poll or manually manage sync logic. Compute cycles are focused on just the data that changed.

True Real-Time Processing with AWS SQS

CocoIndex listens to S3 change events via SQS queues. This means that as soon as a file is uploaded or modified, it can trigger downstream transformation—without waiting for a batch job or cron schedule.

Built-In State & Lineage Tracking

CocoIndex maintains a persistent internal state. It remembers what’s already been processed and tracks how each file (or chunk) was transformed. That enables reproducibility and auditability across runs.

Fine-Grained Smart Caching

Instead of reprocessing entire files, CocoIndex dives deeper. Files are broken into logical chunks. If only a subset of those chunks change, we only recompute the affected parts.

This is especially powerful in workflows like vector indexing, where only modified sections need new embeddings.
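
To make the idea concrete, here is a toy sketch of content-addressed, chunk-level caching in Python. This illustrates the general technique only, not CocoIndex's actual internals; `embed_fn` is a hypothetical stand-in for any expensive per-chunk transformation.

```python
import hashlib

# Toy sketch of chunk-level caching: an embedding is recomputed only
# when a chunk's content hash has not been seen before.
_cache: dict[str, list[float]] = {}

def embed_chunks(chunks: list[str], embed_fn) -> list[list[float]]:
    results = []
    for chunk in chunks:
        key = hashlib.sha256(chunk.encode("utf-8")).hexdigest()
        if key not in _cache:  # unchanged chunks hit the cache
            _cache[key] = embed_fn(chunk)
        results.append(_cache[key])
    return results
```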

Real-Time Data Processing for the AI Era

If your systems depend on accurate, timely data—and your datasets are too large to rebuild every few hours—incremental, event-driven processing isn’t a luxury. It’s the only path forward.

With this new S3 + SQS integration, CocoIndex makes it easy to bring your existing cloud storage into a real-time, intelligent, and cost-efficient processing workflow—out of the box.

What Is Amazon SQS (Simple Queue Service)?

Amazon SQS (Simple Queue Service) is a fully managed message queuing service from AWS that helps you decouple, scale, and streamline communication between microservices, distributed systems, and serverless applications.

With Amazon SQS, developers can build reliable, fault-tolerant systems by using scalable, secure message queues that temporarily store messages as they move between different components of a cloud-native application.

Designed for high availability and performance, SQS supports both standard queues (high throughput, at-least-once delivery) and FIFO queues (exactly-once processing, ordered messages), making it ideal for a wide range of use cases, from real-time data pipelines to asynchronous task processing.

When files are uploaded to or modified in S3, SQS receives notifications about these changes and queues them as messages. Each message contains metadata about the S3 event, such as:

  • The type of event (e.g., ObjectCreated, ObjectRemoved)
  • The S3 bucket name
  • The object key (file path)
  • The timestamp of the event
  • Other relevant metadata

These messages remain in the queue until they are successfully processed, ensuring no events are lost even if the processing system is temporarily unavailable.
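
For reference, the body of such a message follows AWS's S3 event notification format; an abridged example (with illustrative values) looks like this:

```json
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "eventTime": "2024-05-01T12:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "your-bucket-name" },
        "object": { "key": "docs/readme.md", "size": 1024 }
      }
    }
  ]
}
```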

Live update out of the box with SQS

CocoIndex provides two modes for running your pipeline: one-time update and live update, both of which leverage incremental processing. With AWS SQS in particular, you can use the live update mode, where CocoIndex continuously monitors and reacts to events in SQS, updating the target data in real time. This is ideal for use cases where data freshness is critical.

How does it work?

Let's take a look at a simple example of how to build a real-time data transformation pipeline with S3 and CocoIndex. It builds a vector database of text embeddings from markdown files in S3. You can find a similar example that processes local files in this blog.

S3 bucket and SQS setup

Please follow the documentation here to set up the S3 bucket and SQS queue.

S3 bucket

  • Create an AWS account.
  • Configure IAM permissions.
  • Configure policies. You'll need at least the AmazonS3ReadOnlyAccess policy, and if you want to enable change notifications, you'll also need the AmazonSQSFullAccess policy.


SQS queue

For real-time change detection, you'll need to create an SQS queue and configure it to receive notifications from your S3 bucket.
Please follow the documentation to configure the S3 bucket to send event notifications to the SQS queue.
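
For example, you can attach the notification configuration with the AWS CLI; the bucket name and queue ARN below are placeholders:

```sh
aws s3api put-bucket-notification-configuration \
  --bucket your-bucket-name \
  --notification-configuration '{
    "QueueConfigurations": [
      {
        "QueueArn": "arn:aws:sqs:us-west-2:123456789:S3ChangeNotifications",
        "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
      }
    ]
  }'
```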


In particular, the SQS queue needs a specific access policy that allows S3 to send messages to it.

```json
{
  ...
  "Statement": [
    ...
    {
      "Sid": "__publish_statement",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Resource": "${SQS_QUEUE_ARN}",
      "Action": "SQS:SendMessage",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "${S3_BUCKET_ARN}"
        }
      }
    }
  ]
}
```

Then you can upload your files to the S3 bucket.
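
For example, with the AWS CLI (the local path and bucket name are placeholders):

```sh
aws s3 cp ./docs s3://your-bucket-name/docs --recursive
```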


Define Indexing Flow

Flow Design


The flow diagram illustrates how we'll process our documents:

  1. Read text files from the Amazon S3 bucket
  2. Chunk each document
  3. For each chunk, embed it with a text embedding model
  4. Store the embeddings in a vector database for retrieval

AWS File Ingestion

Define the S3 bucket name and the SQS queue URL in the `.env` file:

```
# Database Configuration
DATABASE_URL=postgresql://localhost:5432/cocoindex

# Amazon S3 Configuration
AMAZON_S3_BUCKET_NAME=your-bucket-name
AMAZON_S3_SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/123456789/S3ChangeNotifications
```

Define indexing flow and ingest from Amazon S3 SQS queue:

```python
import os

import cocoindex


@cocoindex.flow_def(name="AmazonS3TextEmbedding")
def amazon_s3_text_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
):
    bucket_name = os.environ["AMAZON_S3_BUCKET_NAME"]
    prefix = os.environ.get("AMAZON_S3_PREFIX", None)
    sqs_queue_url = os.environ.get("AMAZON_S3_SQS_QUEUE_URL", None)

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(
            bucket_name=bucket_name,
            prefix=prefix,
            included_patterns=["*.md", "*.mdx", "*.txt", "*.docx"],
            binary=False,
            sqs_queue_url=sqs_queue_url,
        )
    )

```

This defines a flow that reads text files from the Amazon S3 bucket.


Rest of the flow

For the rest of the flow, we can follow the tutorial in this blog.
The entire project is available here.
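
For orientation, here is a sketch of how that flow continues, following the local-files tutorial. Function and storage names (`SplitRecursively`, `SentenceTransformerEmbed`, `cocoindex.storages.Postgres`) match the CocoIndex examples at the time of writing but may differ across versions:

```python
    # Continues inside amazon_s3_text_embedding_flow, after add_source(...).
    doc_embeddings = data_scope.add_collector()

    with data_scope["documents"].row() as doc:
        # Split each document into overlapping chunks.
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown", chunk_size=2000, chunk_overlap=500)
        with doc["chunks"].row() as chunk:
            # Embed each chunk; unchanged chunks are not recomputed.
            chunk["embedding"] = chunk["text"].transform(
                cocoindex.functions.SentenceTransformerEmbed(
                    model="sentence-transformers/all-MiniLM-L6-v2"))
            doc_embeddings.collect(
                filename=doc["filename"], location=chunk["location"],
                text=chunk["text"], embedding=chunk["embedding"])

    # Export to Postgres (pgvector) with a cosine-similarity vector index.
    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.storages.Postgres(),
        primary_key_fields=["filename", "location"],
        vector_indexes=[cocoindex.VectorIndexDef(
            "embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
```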

Run the flow with live update

```sh
cocoindex update main.py -L
```

The `-L` option enables live update; see the documentation for more details.
You will then have a continuous, long-running process that updates the vector database whenever anything changes in the S3 bucket.

If you need to run the server and indexing in the same process, without spawning two separate jobs, you can add a `FlowLiveUpdater` in the `main()` function; see the example here.
You can then run the server with `python3 main.py`.
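
A minimal sketch of that pattern, assuming the flow defined above (API details may vary by CocoIndex version):

```python
from dotenv import load_dotenv

import cocoindex


def main():
    load_dotenv()
    cocoindex.init()
    # Run the live updater in-process: it keeps the index in sync with
    # S3 (via the SQS queue) until the process exits.
    with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow) as updater:
        # ... start your query server / app here ...
        updater.wait()


if __name__ == "__main__":
    main()
```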

Support us

We are constantly improving, and more features and examples are coming soon. If you love this article, please give us a star ⭐ at GitHub to help us grow.

Thanks for reading!
