DEV Community

Rebecca Anderson
Rebecca Anderson

Posted on

How to Handle Modbus Bus Collisions When Multiple Python Scripts Poll the Same RS485 Sensor

This is the normal IoT developer experience: You write a Python script using pymodbus to read data from an RS485 soil sensor or a solar inverter. It runs perfect on your test bench for hours,

But then, the project scales up. You spin up a second Python script on the same machine to handle real-time MQTT cloud synchronization. Or perhaps the factory's main SCADA system suddenly needs to poll that exact same hardware sensor at the same time.

Boom. Your terminal fills up with ModbusIOException, timeout errors, or complete serial port lockups.

If you are currently trying to fix this by writing complex software semaphores or cross-process locks in Python, stop. Let’s look at why this happens at the hardware level and how to solve it properly through architecture.


The Root Cause: RS485 is Strictly Single-Master

The fundamental limitation isn't your Python code; it's the physics of the RS485 bus.

RS485 is a half-duplex serial interface. This means data can travel in both directions, but only one device can talk on the wire at any given microsecond. The Modbus RTU protocol is strictly a Master-Slave architecture. The Master sends a query, the Slave responds.

When you launch two independent Python scripts executing parallel polling loops, they have no awareness of each other's timing. Eventually, a race condition occurs: both scripts attempt to write to the serial port simultaneously.

The electrical signals collide on the copper wire, resulting in garbage bytes, broken frames, and catastrophic CRC failures.

[Script A] ---- Query (01 03 00 00 ...) ---->  ⚑ COLLISION ⚑
[Script B] ---- Query (01 03 00 0A ...) ---->  [ RS485 Bus Blown ]
Enter fullscreen mode Exit fullscreen mode

Why Software Locks SUCK Here

Your first instinct might be to implement a file lock (fcntl), a Redis-based distributed lock, or a Python threading.Lock() to serialize the access to the serial port.

While this prevents data collisions, it introduces massive software engineering overhead:

  1. Latency Spikes: If Script A is stuck waiting for a slow sensor response, Script B is completely blocked, leading to data gaps in your cloud database.
  2. Deadlocks: If one script crashes while holding the serial lock, the entire data acquisition system freezes until a hard reboot.

The Architectural Fix: Decoupling via Cache

The elegant way to solve multi-host collision is to completely offload the serial polling task from your Python applications. Instead of letting your scripts talk directly to the raw serial line, you place a hardware protocol gateway in between that supports autonomous register caching.

In this architecture, the gateway is the only Master on the RS485 physical bus. It constantly polls the hardware sensors at the physical layer, and caches the latest register data directly in its own high speed internal RAM cache.

Your Python scripts can no longer connect to serial ports. Instead, they make normal, simultaneous Modbus TCP queries over Ethernet (Port 502).

[Python Script A (TCP)] --\
                           +---> [ Caching Gateway ] ---> (Stable RS485 Polling) ---> [Sensors]
[Python Script B (TCP)] --/
Enter fullscreen mode Exit fullscreen mode

The Code: Clean Concurrent Modbus TCP Polling

When you shift to a caching architecture, your Python code becomes remarkably clean. Because Modbus TCP inherently handles multiple network sockets, you can run as many parallel scripts, containers, or threads as you want without a single lock.

Here is a clean implementation simulating two independent applications querying the network simultaneously:

from pymodbus.client import ModbusTcpClient
import threading
import time

# Point this to your hardware caching gateway IP
GATEWAY_IP = '192.168.1.200'
GATEWAY_PORT = 502

def application_task(client_id, register_address):
    # Each independent script or thread opens its own clean TCP socket
    client = ModbusTcpClient(GATEWAY_IP, port=GATEWAY_PORT, timeout=2)

    if not client.connect():
        print(f"App {client_id}: Network unreachable.")
        return

    print(f"App {client_id}: Connected to gateway port 502.")

    while True:
        try:
            # Query the cached hardware registers instantly (usually under 3ms)
            response = client.read_holding_registers(address=register_address, count=2, slave=1)

            if not response.isError():
                print(f"App {client_id} Data Received: {response.registers}")
            else:
                print(f"App {client_id} Modbus Error: {response}")

        except Exception as e:
            print(f"App {client_id} Network Exception: {e}")

        # Poll as fast as you want; no serial collisions possible
        time.sleep(1) 

if __name__ == '__main__':
    # Simulating two completely separate applications running in parallel
    app_cloud_sync = threading.Thread(target=application_task, args=(1, 0))
    app_local_scada = threading.Thread(target=application_task, args=(2, 10))

    app_cloud_sync.start()
    app_local_scada.start()
Enter fullscreen mode Exit fullscreen mode

Hardware Layer Note

To make this architecture work seamlessly in production environments, your hardware gateway must handle the scheduling and isolation. A passive, cheap serial-to-ethernet converter that just passes raw bytes will still cause collisions if two TCP packets arrive at the same time.

In our commercial field deployments for remote solar farms, we completely shifted away from passive dongles to the industrial multi-channel storage gateways manufactured by Valtoris. Their hardware natively manages the asynchronous scheduling, features 3000V galvanic isolation to block ground loops on factory floors, and supports up to 32 concurrent Modbus TCP client connections.

By moving the bus synchronization from unstable Python scripts down to dedicated hardware, your software can focus entirely on data processing and analytics, making your entire IoT stack infinitely more resilient.

Top comments (0)