Yasir Jafri

Concurrency Patterns: Active Object

Introduction

The Active Object Pattern is a concurrency design pattern that decouples method execution from method invocation. The primary goal of this pattern is to introduce asynchronous behavior by executing operations in a separate thread, while providing a synchronous interface to the client. This is achieved using a combination of message passing, request queues, and scheduling mechanisms.

Key Components

  1. Proxy: Represents the public interface to the client. Put simply, this is what the client interacts with. It translates method calls into requests for the active object.
  2. Scheduler: Manages the request queue and determines the order of request execution.
  3. Servant: Contains the actual implementation of the methods being invoked. This is where the computation logic lives.
  4. Activation Queue: Stores the requests from the proxy until the scheduler processes them.
  5. Future/Callback: A placeholder for the result of an asynchronous computation.

Workflow

  1. A client invokes a method on the proxy.
  2. The proxy creates a request and places it in the activation queue.
  3. The scheduler picks up the request and forwards it to the servant for execution.
  4. The result is returned to the client via a future object.
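Before building the pattern by hand, it's worth noting that this workflow maps closely onto what Python's standard-library `concurrent.futures` already provides: an executor plays the roles of proxy, activation queue, and scheduler, and `submit` returns a `Future`. A minimal sketch, assuming nothing beyond the standard library:

```python
from concurrent.futures import ThreadPoolExecutor


def compute(x, y):
    # Stand-in for a slow operation (API call, database query, ...)
    return x + y


# The executor bundles proxy + activation queue + scheduler:
# submit() enqueues the call and immediately returns a Future.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(compute, 5, 10)
    # The client is free to do other work here...
    print(future.result())  # blocks only when the result is needed
```

Building the components explicitly, as we do below, shows what such an executor is doing under the hood.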

Use Cases

  • Real-time systems requiring predictable execution patterns.
  • GUI applications to keep the main thread responsive.
  • Distributed systems for handling asynchronous requests.

Implementation

Let's say we need to perform some computation, maybe an API call, a database query, etc. To keep the examples focused, I won't implement any exception handling here.

import time


def compute(x, y):
    time.sleep(2)  # Simulate a time-consuming task
    return x + y

Without Active Object Pattern

Below is an example of how we might handle concurrent requests without using the Active Object Pattern.

import threading
import time


def main():
    # Start threads directly
    results = {}

    def worker(task_id, x, y):
        results[task_id] = compute(x, y)

    print("Submitting tasks...")
    thread1 = threading.Thread(target=worker, args=(1, 5, 10))
    thread2 = threading.Thread(target=worker, args=(2, 15, 20))

    thread1.start()
    thread2.start()

    print("Doing other work...")

    thread1.join()
    thread2.join()

    # Retrieve results
    print("Result 1:", results[1])
    print("Result 2:", results[2])


if __name__ == "__main__":
    main()

Drawbacks of the Above Approach

  • Thread Management: Direct management of threads increases complexity, especially as the number of tasks grows.

  • Lack of Abstraction: The client is responsible for managing the lifecycle of threads, coupling task management with business logic.

  • Scalability Issues: Without a proper queue or scheduling mechanism, there’s no control over task execution order.

  • Limited Responsiveness: The client has to wait for threads to join before accessing results.
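A subtler problem compounds the drawbacks above: a bare `threading.Thread` has no channel for returning errors. If `compute` raised, the exception would die with the worker thread, and the client would only discover the failure later as a missing key. A small demonstration (the failing division is contrived):

```python
import threading

results = {}


def worker(task_id):
    # Raises ZeroDivisionError; the assignment never happens
    results[task_id] = 1 / 0


thread = threading.Thread(target=worker, args=(1,))
thread.start()
thread.join()

# The thread died silently: no result was stored, and the exception
# cannot be caught or inspected from the main thread.
print(1 in results)  # False
```

The Active Object Pattern fixes this by routing both results and exceptions through a `Future`, as we'll see next.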

Implementation using Active Object Pattern

Below is a Python implementation of the Active Object Pattern, using threading and queues to accomplish the same task as above. We'll walk through each part one by one:

MethodRequest: Encapsulates the method, arguments, and a Future to store the result.

class MethodRequest:
    def __init__(self, method, args, kwargs, future):
        self.method = method
        self.args = args
        self.kwargs = kwargs
        self.future = future

    def execute(self):
        try:
            result = self.method(*self.args, **self.kwargs)
            self.future.set_result(result)
        except Exception as e:
            self.future.set_exception(e)
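The try/except in `execute` is what carries servant-side errors back to the client: a `concurrent.futures.Future` holds either a result or an exception, and `future.result()` re-raises the latter in the caller's thread. A standalone sketch of just that mechanism:

```python
from concurrent.futures import Future


def divide(a, b):
    return a / b


future = Future()
try:
    # Mirrors MethodRequest.execute(): run the method...
    future.set_result(divide(10, 0))
except Exception as e:
    # ...and on failure, park the exception in the future
    future.set_exception(e)

# future.result() now re-raises ZeroDivisionError in whichever
# thread asks for the result, instead of failing silently.
```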

Scheduler: Continuously processes requests from the activation_queue in a separate thread.

import threading
import queue


class Scheduler(threading.Thread):
    def __init__(self):
        super().__init__()
        self.activation_queue = queue.Queue()
        self._stop_event = threading.Event()

    def enqueue(self, request):
        self.activation_queue.put(request)

    def run(self):
        while not self._stop_event.is_set():
            try:
                request = self.activation_queue.get(timeout=0.1)
                request.execute()
            except queue.Empty:
                continue

    def stop(self):
        self._stop_event.set()
        self.join()
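The 0.1-second timeout in `run` is a simple way for the loop to notice the stop event, at the cost of waking the thread ten times a second even when idle. A common alternative, sketched here under the same interface, is to push a sentinel through the queue so the thread blocks until real work (or a shutdown signal) arrives:

```python
import queue
import threading

_SENTINEL = object()  # unique marker meaning "shut down"


class SentinelScheduler(threading.Thread):
    def __init__(self):
        super().__init__()
        self.activation_queue = queue.Queue()

    def enqueue(self, request):
        self.activation_queue.put(request)

    def run(self):
        while True:
            # Blocks until a request (or the sentinel) is available:
            # no periodic wake-ups while idle.
            request = self.activation_queue.get()
            if request is _SENTINEL:
                break
            request.execute()

    def stop(self):
        self.activation_queue.put(_SENTINEL)
        self.join()
```

Either approach works with the rest of the code below; the timeout version is easier to extend if the loop ever needs to check other conditions.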

Servant: Implements the actual logic (e.g., the compute method).

import time


class Servant:
    def compute(self, x, y):
        time.sleep(2)
        return x + y

Proxy: Translates method calls into requests and returns a Future for the result.

from concurrent.futures import Future


class Proxy:
    def __init__(self, servant, scheduler):
        self.servant = servant
        self.scheduler = scheduler

    def compute(self, x, y):
        future = Future()
        request = MethodRequest(self.servant.compute, (x, y), {}, future)
        self.scheduler.enqueue(request)
        return future

Client: Submits tasks asynchronously and retrieves results when needed.

def main():
    # Initialize components
    scheduler = Scheduler()
    scheduler.start()

    servant = Servant()
    proxy = Proxy(servant, scheduler)

    # Client makes an asynchronous call
    print("Submitting tasks...")
    future1 = proxy.compute(5, 10)
    future2 = proxy.compute(15, 20)

    # Perform other tasks while computation is ongoing
    print("Doing other work...")

    # Retrieve results
    print("Result 1:", future1.result())
    print("Result 2:", future2.result())

    # Shut down the scheduler
    scheduler.stop()


if __name__ == "__main__":
    main()

Advantages

  • Decoupled Interface: Clients can invoke methods without worrying about the execution details.
  • Responsiveness: Asynchronous execution ensures that the client remains responsive.
  • Scalability: Supports multiple concurrent requests.

Disadvantages

  • Complexity: Increases architectural complexity.
  • Overhead: Requires additional resources for managing threads and queues.
  • Latency: Asynchronous processing may introduce additional latency.

Conclusion

The Active Object Pattern is a powerful tool for managing asynchronous operations in multi-threaded environments. By separating method invocation from execution, it ensures better responsiveness, scalability, and a cleaner codebase. While it comes with some complexity and potential performance overhead, its benefits make it an excellent choice for scenarios requiring high concurrency and predictable execution. However, its use depends on the specific problem at hand. As with most patterns and algorithms, there is no one-size-fits-all solution.

