DEV Community

Franck Pachot

MongoDB equivalent to FOR UPDATE SKIP LOCKED

SELECT ... FOR UPDATE SKIP LOCKED is a vendor-specific SQL feature available in several relational databases (e.g., PostgreSQL, Oracle, MySQL). It helps parallel workers avoid waiting on locked rows. MongoDB uses optimistic concurrency control: reads don’t block writes, and writes don’t block reads. To coordinate parallel processing, you can reserve a document by writing a lock field so that other workers skip it.

I'll use an example discussed in the Reddit question "ACID read then write – Python":

Client in python, multi process. Each process picks and reads one document, calls some public APIs, and add data to the document and save it. Then next document. What is written can depend on the read data. Question is: in python, how can I create and configure transactions in the code to make sure no other process can read or write its current document from the moment a process starts reading it until done writing its additional data? This means concurrent reads should not happen…

In this example, I'll process messages based on their originating IP address. Multiple threads will enrich them with location data fetched from the public API at https://ip-api.com/.

Here is an example of an initial document:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null
}

Here is the document while it is being processed:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null,
  lock: {
    by: 'franck',
    until: datetime.datetime(2026, 1, 1, 22, 33, 10, 833000)
  }
}

Here is the same document after processing:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: {
    status: 'success',
    country: 'Hong Kong',
    countryCode: 'HK',
    region: 'HCW',
    regionName: 'Central and Western District',
    city: 'Hong Kong',
    zip: '',
    lat: 22.3193,
    lon: 114.1693,
    timezone: 'Asia/Hong_Kong',
    isp: 'Cloudflare, Inc',
    org: 'APNIC and Cloudflare DNS Resolver project',
    as: 'AS13335 Cloudflare, Inc.',
    query: '1.1.1.1'
  }
}

Storing the in-progress state in the document itself avoids long transactions, which would hide the current status and make troubleshooting difficult when the public API is slow.
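To illustrate why this helps troubleshooting, here is a minimal sketch that derives a message's status from the stored fields alone. The helper name message_state() and the status labels are my own assumptions for illustration; they are not part of the script below.

```python
from datetime import datetime, timedelta

def message_state(doc, now):
    """Classify a message using only the fields stored in the document (hypothetical helper)."""
    if doc.get("location") is not None:
        return "done"                      # location was filled in by a worker
    lock = doc.get("lock")
    if lock is None:
        return "pending"                   # never claimed, or lock already released
    if lock["until"] < now:
        return "lock expired"              # the worker probably failed or is too slow
    return f"in progress by {lock['by']}"  # currently reserved

# The three document shapes shown above, reduced to the relevant fields
now = datetime(2026, 1, 1, 22, 32, 30)
pending = {"originatingIp": "1.1.1.1", "location": None}
locked = {**pending, "lock": {"by": "franck", "until": datetime(2026, 1, 1, 22, 33, 10, 833000)}}
done = {"originatingIp": "1.1.1.1", "location": {"status": "success", "country": "Hong Kong"}}

for doc in (pending, locked, done):
    print(message_state(doc, now))
# Five minutes later, the same lock is reported as expired
print(message_state(locked, now + timedelta(minutes=5)))
```

Because the state lives in the document, a simple query or a helper like this one can tell at any time which messages are stuck, without inspecting open transactions.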

Design

This script is designed as a complete, runnable demonstration of how to implement SELECT ... FOR UPDATE SKIP LOCKED-style parallel job claiming in MongoDB. The script will generate everything it needs, process it, and show the end state.

  • insert_test_docs() inserts test data with random IP addresses in a new collection "message", and creates a partial index to get the message to process ({location: null}).
  • claim_document() updates a message to process, adding lock information so that another thread will not pick the same one, and then fetches the document. To qualify, a message must be unprocessed ({location: null}) and either not locked or holding a lock that has expired (with a 1s grace period to account for clock skew).
  • fetch_location() is the call to the public API, here getting location information for an IP address.
  • process_document() calls claim_document() to get a message to process, with a lock. It calls fetch_location() and updates the document with the location. It ensures the lock is still in place before the update, then unsets it. Each thread runs in a loop, claiming and processing documents until the timeout.
  • main() calls those in sequence and displays the final documents.
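The expiry rule with its grace period can be checked with plain datetime arithmetic. The sketch below assumes the same 60s lock duration and 1s clock-skew allowance as the script; can_steal() is a hypothetical helper that mirrors the condition lock.until < now - skew used in the claim filter.

```python
from datetime import datetime, timedelta

LOCK_DURATION = timedelta(seconds=60)
MAX_CLOCK_SKEW = timedelta(seconds=1)

def can_steal(lock_until, now, skew=MAX_CLOCK_SKEW):
    """True when another worker may take over the lock (hypothetical helper)."""
    return lock_until < now - skew

claimed_at = datetime(2026, 1, 1, 22, 32, 10)
lock_until = claimed_at + LOCK_DURATION  # 22:33:10

# One second before expiry: the lock still belongs to its owner
before_expiry = can_steal(lock_until, datetime(2026, 1, 1, 22, 33, 9))
# Half a second after expiry: inside the grace window, still not stealable
in_grace = can_steal(lock_until, datetime(2026, 1, 1, 22, 33, 10, 500000))
# Two seconds after expiry: another worker may claim the message
after_grace = can_steal(lock_until, datetime(2026, 1, 1, 22, 33, 12))

print(before_expiry, in_grace, after_grace)
```

The grace period delays takeover slightly, which is the safer side to err on when workers' clocks disagree.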

This solution avoids explicit transactions, which is preferable because they would include a call to a public API with unpredictable response time. It also avoids using findOneAndUpdate, whose higher overhead comes from storing full pre- and post-images of documents for retryable operations. For large documents—possible in real workloads, even if not shown in this demo—this would lead to significant write amplification. Finally, setting an expiration timestamp allows automatic re-processing if a message fails.

Code

Below is the complete Python program, which you can test using different numbers of documents and threads:

import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta
import requests
from pymongo import MongoClient


# Mongo connection and collection
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message

#  Test settings (the test inserts documents, then runs the processing threads for some duration)
DOCUMENTS = 10 # number of documents created initially
THREADS = 5    # number of threads that loop to claim a document
SECONDS = 15   # number of seconds after which each thread stops claiming documents

# Worker identity (to identify the thread, and set an expiration on the lock)
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
# Processing is assumed to complete within LOCK_DURATION. If it does not, another
# worker can claim the document, and the slow worker's final update will not apply.
LOCK_DURATION = timedelta(seconds=60)

# Get the current time as a naive UTC datetime (PyMongo treats naive datetimes as UTC)
def utcnow(): return datetime.utcnow()
MAX_CLOCK_SKEW = timedelta(seconds=1) # grace period before an expired lock can be stolen

# --- Prepare test messages (with random generated IP) ---
def insert_test_docs():
    # Drop the collection completely (removes data + indexes)
    messages.drop()
    # Create the partial index for unprocessed docs  (they have location = null )
    messages.create_index(  [("lock.until", 1)],  partialFilterExpression={"location": None}  )
    # Generate random IPs for the test
    ips = [
        ".".join(str(random.randint(1, 255)) for _ in range(4))
        for _ in range(DOCUMENTS)
    ]
    # Explicitly set location=None to match the partial index filter
    docs = [
        { "originatingIp": ip, "location": None  } # A null location is the marker to process it
        for ip in ips
    ]
    messages.insert_many(docs)
    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)


# --- Claim a message ---
def claim_document():
    now = utcnow()
    lock_expiry = now + LOCK_DURATION
    token = random.randint(1000, 9999)  # random lock token for extra safety (not guaranteed unique)
    # Atomic lock claim: match unlocked documents or steal expired locks
    result = messages.update_one(
        {
          "$and": [
            # the location is not set
            { "location": None },
            # the document is not locked, or its lock expired more than the grace period ago
            {  "$or": [  { "lock": { "$exists": False } },  { "lock.until": { "$lt": now - MAX_CLOCK_SKEW } }  ]  }
          ]
        },
        { "$set": {  "lock": {  "by": WORKER_ID,  "until": lock_expiry,  "token": token  }  }}
    )
    if result.modified_count == 0:
        return None
    # Fetch exactly the doc we locked — match by worker, expiry, AND token
    doc = messages.find_one({  "lock.by": WORKER_ID,  "lock.until": lock_expiry,  "lock.token": token  })
    if doc:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claimed IP {doc['originatingIp']} with token={token}")
    else:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claim succeeded but fetch failed — possible race?")
    return doc

# --- Call the public API ---
def fetch_location(ip):
    url = f"http://ip-api.com/json/{ip}"
    try:
        resp = requests.get(url, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        print(f"[API] Error: HTTP {resp.status_code} for {ip}")
        return None
    except Exception as e:
        print(f"[API] Exception for {ip}: {e}")
        return None

# --- Process messages in a loop ---
def process_document():
    start_time = time.time()
    timeout = SECONDS  # seconds
    thread_name = threading.current_thread().name
    while True:
        # Try to claim a doc
        doc = claim_document()
        if doc:
            # We successfully claimed a doc — process it
            ip = doc["originatingIp"]
            location_data = fetch_location(ip)
            if not location_data:
                print(f"[{WORKER_ID}] {thread_name} failed to fetch location for {ip}")
                return
            # Final update only if lock is still valid
            now = utcnow()
            result = messages.update_one(
                {
                    "_id": doc["_id"],
                    "lock.by": WORKER_ID,
                    "lock.until": {"$gte": now},
                    "lock.token": doc["lock"]["token"]
                },
                {
                    "$set": {"location": location_data},
                    "$unset": {"lock": ""}
                }
            )
        # Whether or not a doc was processed, check elapsed time before waiting and retrying
        elapsed = time.time() - start_time
        if elapsed >= timeout:
            print(f"[{WORKER_ID}] {thread_name} exiting after {elapsed:.2f}s")
            return
        time.sleep(5)  # avoid hammering DB and the public API

# --- Initialize and run multiple processing threads ---
def main():
    print(f"\nInserting documents")
    insert_test_docs()
    print(f"\nStarting threads")
    threads = []
    for i in range(THREADS):
        tname = f"T{i}"
        t = threading.Thread(target=process_document, name=tname)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    print(f"\n[{WORKER_ID}] Check final documents:")
    for doc in messages.find({}, {"originatingIp": 1, "location.query": 1, "location.country": 1, "location.message": 1, "lock.by": 1, "lock.until": 1}):
        print(doc)

if __name__ == "__main__":
    main()


Technical Insights

MongoDB’s storage engine guarantees atomicity for each update_one through its WriteUnitOfWork and RecoveryUnit mechanisms. However, maintaining read consistency across multiple operations requires application-level coordination. In this implementation, that coordination is provided by an atomic claim with conditional criteria, ensuring that only one worker can lock an unprocessed or expired document at a time.
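The effect of an atomic conditional claim can be illustrated without a database. The following is only an in-memory simulation under my own assumptions: the FakeCollection class is hypothetical, and its mutex stands in for the per-document atomicity that the server provides for update_one. It is not how MongoDB is implemented.

```python
import threading

class FakeCollection:
    """In-memory stand-in holding a single unprocessed document (hypothetical)."""
    def __init__(self):
        self._mutex = threading.Lock()
        self.doc = {"_id": 1, "location": None}

    def try_claim(self, worker):
        # The condition check and the write happen as one indivisible step
        with self._mutex:
            if self.doc["location"] is None and "lock" not in self.doc:
                self.doc["lock"] = {"by": worker}
                return True
            return False

def race(workers=8):
    """Start several threads that all try to claim the same document at once."""
    coll = FakeCollection()
    winners = []
    barrier = threading.Barrier(workers)  # release all threads together

    def run(name):
        barrier.wait()
        if coll.try_claim(name):
            winners.append(name)

    threads = [threading.Thread(target=run, args=(f"T{i}",)) for i in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners, coll.doc

winners, doc = race()
print(f"{len(winners)} winner: {winners}, lock held by {doc['lock']['by']}")
```

However the threads interleave, only one of them sees the document as unlocked, which is the property the claim step relies on.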

Several safeguards mitigate race conditions. After the claim, the fetch identifies the claimed document by worker ID, lock expiry, and a random token. The final update then re-checks the worker ID and the token, and requires that the lock has not yet expired, so that results from stale or stolen locks are never applied. Lock expiration enables automatic recovery from failures, and a small grace window accounts for clock skew in distributed systems.
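To make the stolen-lock case concrete, here is a small pure-Python sketch of the check performed by the final update. The helper final_update_matches() is hypothetical and only mirrors the filter on lock.by, lock.token, and lock.until. It also swaps in uuid4 for the token, which is my assumption rather than what the script does, because a four-digit random number can collide.

```python
import uuid
from datetime import datetime, timedelta

def new_token():
    """Collision-resistant token (assumption: replaces the 4-digit random token)."""
    return uuid.uuid4().hex

def final_update_matches(doc, worker_id, token, now):
    """Mirror of the final update filter: same owner, same token, lock not expired."""
    lock = doc.get("lock")
    return (
        lock is not None
        and lock["by"] == worker_id
        and lock["token"] == token
        and lock["until"] >= now
    )

t0 = datetime(2026, 1, 1, 22, 32, 10)
slow_token = new_token()
doc = {"_id": 1, "location": None,
       "lock": {"by": "worker-A", "token": slow_token, "until": t0 + timedelta(seconds=60)}}

# worker-A stalls on the API call; 90 seconds later worker-B steals the expired lock
t1 = t0 + timedelta(seconds=90)
fast_token = new_token()
doc["lock"] = {"by": "worker-B", "token": fast_token, "until": t1 + timedelta(seconds=60)}

stale_result = final_update_matches(doc, "worker-A", slow_token, t1)
owner_result = final_update_matches(doc, "worker-B", fast_token, t1)
# Two threads of one process share the worker ID, so only the token tells them apart
sibling_result = final_update_matches(doc, "worker-B", new_token(), t1)
print(stale_result, owner_result, sibling_result)
```

The slow worker's write matches no document and is silently skipped, so the result from the current lock holder is the only one stored.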

Write conflicts during concurrent updates are automatically resolved at the storage layer via optimistic concurrency control. This ensures correctness without blocking other operations. The result is a robust, non-blocking parallel processing workflow that preserves document-level ACID guarantees while scaling effectively in shared or cloud environments.

In this design, each thread processes one message at a time, in index order. Enforcing strict global message ordering would be more complex. The primary goal here is the scalability of the parallel processing.

Final Recommendation

When migrating from PostgreSQL to MongoDB, as between any two databases, avoid a direct feature-by-feature mapping, because the systems are fundamentally different. SKIP LOCKED works around blocking FOR UPDATE reads in PostgreSQL, while reads and writes do not block each other in MongoDB. Instead of replicating another database's behavior, clarify the business requirement and design the most appropriate solution. In this example, rather than relying on generic transaction control as in SQL, we modeled object states, such as claim acquisition and expiration, and stored that state directly in the documents.
