Introduction
The Active Object Pattern is a concurrency design pattern that decouples method execution from method invocation. Its primary goal is to introduce asynchronous behavior: operations run in a separate thread, while the client still works with what looks like an ordinary method-call interface. This is achieved using a combination of message passing, request queues, and scheduling mechanisms.
Key Components
- Proxy: Represents the public interface to the client. In simpler terms, this is what the client interacts with. It translates method calls into requests for the active object.
- Scheduler: Manages the request queue and determines the order of request execution.
- Servant: Contains the actual implementation of the methods being invoked. This is where actual computation logic goes.
- Activation Queue: Stores the requests from the proxy until the scheduler processes them.
- Future/Callback: A placeholder for the result of an asynchronous computation.
Workflow
- A client invokes a method on the proxy.
- The proxy creates a request and places it in the activation queue.
- The scheduler picks up the request and forwards it to the servant for execution.
- The result is returned to the client via a future object.
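As a quick way to see this flow before building the pieces by hand, here is a minimal sketch using the standard library's `ThreadPoolExecutor` with a single worker. It is not the pattern's canonical implementation, just something that follows the same four steps: `submit` turns a call into a queued request, one worker thread executes it, and a future carries the result back. The `add` function and the thread name prefix are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    # Stand-in for the servant's work; also reports which thread ran it.
    return x + y, threading.current_thread().name


# A single worker thread plays the role of the scheduler's thread.
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="active-object") as executor:
    future = executor.submit(add, 5, 10)  # the call becomes a queued request
    total, worker_name = future.result()  # block until the result is ready

print(total)  # 15
print(worker_name != threading.current_thread().name)  # True: ran on another thread
```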
Use Cases
- Real-time systems requiring predictable execution patterns.
- GUI applications to keep the main thread responsive.
- Distributed systems for handling asynchronous requests.
Implementation
Let's say we need to do a computation, maybe an API call, a database query, etc. I am not going to implement any exception handling because I am too lazy.
import time

def compute(x, y):
    time.sleep(2)  # Simulate a time-consuming task
    return x + y
Without Active Object Pattern
Below is an example of how we might handle concurrent requests without using the Active Object Pattern.
import threading
import time

def main():
    # Start threads directly
    results = {}

    def worker(task_id, x, y):
        results[task_id] = compute(x, y)

    print("Submitting tasks...")
    thread1 = threading.Thread(target=worker, args=(1, 5, 10))
    thread2 = threading.Thread(target=worker, args=(2, 15, 20))
    thread1.start()
    thread2.start()

    print("Doing other work...")
    thread1.join()
    thread2.join()

    # Retrieve results
    print("Result 1:", results[1])
    print("Result 2:", results[2])

if __name__ == "__main__":
    main()
Drawbacks of the Above Approach
- Thread Management: Direct management of threads increases complexity, especially as the number of tasks grows.
- Lack of Abstraction: The client is responsible for managing the lifecycle of threads, coupling task management with business logic.
- Scalability Issues: Without a proper queue or scheduling mechanism, there’s no control over task execution order or over how many threads run at once.
- Limited Responsiveness: The client has to wait for threads to join before accessing results.
Implementation using Active Object Pattern
Below is a Python implementation of the Active Object Pattern using threading and queues for doing the same thing as above. We'll walk through each part one by one:
MethodRequest: Encapsulates the method, arguments, and a Future to store the result.
class MethodRequest:
    def __init__(self, method, args, kwargs, future):
        self.method = method
        self.args = args
        self.kwargs = kwargs
        self.future = future

    def execute(self):
        try:
            result = self.method(*self.args, **self.kwargs)
            self.future.set_result(result)
        except Exception as e:
            self.future.set_exception(e)
Scheduler: Continuously processes requests from the activation_queue in a separate thread.
import threading
import queue

class Scheduler(threading.Thread):
    def __init__(self):
        super().__init__()
        self.activation_queue = queue.Queue()
        self._stop_event = threading.Event()

    def enqueue(self, request):
        self.activation_queue.put(request)

    def run(self):
        while not self._stop_event.is_set():
            try:
                # Time out every 0.1 s so the stop flag gets re-checked
                request = self.activation_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            request.execute()

    def stop(self):
        # Signal the loop to exit, then wait for the thread to finish
        self._stop_event.set()
        self.join()
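One thing to watch with the polling loop above: any request still in the queue when `stop()` is called is never executed, so its future never completes. A possible alternative, sketched below under my own assumptions, is to block on the queue and push a sentinel object to signal shutdown. Because the sentinel is queued behind pending work, earlier requests still run. The names `DrainingScheduler` and `_SHUTDOWN` are hypothetical, and `enqueue` here takes a plain function instead of a MethodRequest to keep the sketch self-contained.

```python
import queue
import threading
from concurrent.futures import Future

_SHUTDOWN = object()  # hypothetical sentinel marking the end of the queue


class DrainingScheduler(threading.Thread):
    """Variant scheduler that finishes queued work before stopping."""

    def __init__(self):
        super().__init__(daemon=True)
        self.activation_queue = queue.Queue()

    def enqueue(self, func, *args):
        future = Future()
        self.activation_queue.put((func, args, future))
        return future

    def run(self):
        while True:
            item = self.activation_queue.get()  # blocks, no polling timeout
            if item is _SHUTDOWN:
                break
            func, args, future = item
            try:
                future.set_result(func(*args))
            except Exception as exc:
                future.set_exception(exc)

    def stop(self):
        # The sentinel lands behind pending requests, so they still execute.
        self.activation_queue.put(_SHUTDOWN)
        self.join()


scheduler = DrainingScheduler()
scheduler.start()
futures = [scheduler.enqueue(pow, 2, n) for n in range(5)]
scheduler.stop()  # returns only after all five requests have run
print([f.result() for f in futures])  # [1, 2, 4, 8, 16]
```

The trade-off is that `stop()` now waits for the whole backlog, which may or may not be what you want.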
Servant: Implements the actual logic (e.g., the compute method).
import time

class Servant:
    def compute(self, x, y):
        time.sleep(2)
        return x + y
Proxy: Translates method calls into requests and returns a Future for the result.
from concurrent.futures import Future

class Proxy:
    def __init__(self, servant, scheduler):
        self.servant = servant
        self.scheduler = scheduler

    def compute(self, x, y):
        future = Future()
        request = MethodRequest(self.servant.compute, (x, y), {}, future)
        self.scheduler.enqueue(request)
        return future
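The proxy above needs one hand-written method per servant method. If that gets tedious, one option is a generic proxy that forwards any public method via `__getattr__`. The sketch below is only an illustration: `GenericProxy` and `Calculator` are made-up names, and for brevity the proxy owns its queue and worker thread instead of using a separate scheduler.

```python
import queue
import threading
from concurrent.futures import Future


class GenericProxy:
    """Hypothetical proxy that turns any public servant method into a request."""

    def __init__(self, servant):
        self._servant = servant
        self._requests = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        # Single worker loop: executes requests one at a time, in order.
        while True:
            method, args, kwargs, future = self._requests.get()
            try:
                future.set_result(method(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        if name.startswith("_"):
            raise AttributeError(name)
        method = getattr(self._servant, name)

        def enqueue(*args, **kwargs):
            future = Future()
            self._requests.put((method, args, kwargs, future))
            return future

        return enqueue


class Calculator:
    def add(self, x, y):
        return x + y

    def divide(self, x, y):
        return x / y


proxy = GenericProxy(Calculator())
print(proxy.add(5, 10).result())  # 15
error = proxy.divide(1, 0).exception()  # the servant's exception travels via the future
print(type(error).__name__)  # ZeroDivisionError
```

The cost of this convenience is that the proxy no longer documents its interface explicitly, so editors and type checkers cannot see which methods exist.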
Client: Submits tasks asynchronously and retrieves results when needed.
def main():
    # Initialize components
    scheduler = Scheduler()
    scheduler.start()
    servant = Servant()
    proxy = Proxy(servant, scheduler)

    # Client makes asynchronous calls
    print("Submitting tasks...")
    future1 = proxy.compute(5, 10)
    future2 = proxy.compute(15, 20)

    # Perform other tasks while computation is ongoing
    print("Doing other work...")

    # Retrieve results
    print("Result 1:", future1.result())
    print("Result 2:", future2.result())

    # Shutdown scheduler
    scheduler.stop()

if __name__ == "__main__":
    main()
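The client above blocks in `result()` when it needs the value. The component list also mentions callbacks, and `concurrent.futures.Future` supports those through `add_done_callback`, so the same call should work on the futures returned by `proxy.compute`. Here is a small self-contained sketch; `run_async` and `on_done` are hypothetical helpers that stand in for the proxy and the client's handler.

```python
import threading
from concurrent.futures import Future


def run_async(func, *args):
    """Hypothetical helper: run func on a new thread and return a Future."""
    future = Future()

    def target():
        try:
            future.set_result(func(*args))
        except Exception as exc:
            future.set_exception(exc)

    threading.Thread(target=target).start()
    return future


received = []
done = threading.Event()


def on_done(future):
    # Called on the thread that completes the future, or right away on the
    # calling thread if the future was already finished when registered.
    received.append(future.result())
    done.set()


future = run_async(lambda x, y: x + y, 5, 10)
future.add_done_callback(on_done)

done.wait(timeout=5)  # only so this script can print the outcome
print(received)  # [15]
```

Keep callbacks short: in the active object setup they would run on the scheduler thread and delay the next request.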
Advantages
- Decoupled Interface: Clients can invoke methods without worrying about the execution details.
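- Responsiveness: Calls return immediately with a future, so the client is free to do other work until it asks for the result.
- Scalability: Clients can submit many requests concurrently; the activation queue absorbs them and the scheduler works through them in order.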
Disadvantages
- Complexity: Increases architectural complexity.
- Overhead: Requires additional resources for managing threads and queues.
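- Latency: Requests wait in the queue behind earlier ones. In the example above, a single scheduler thread runs the two 2-second tasks one after the other (about 4 seconds in total), whereas the direct-thread version runs them in parallel (about 2 seconds).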
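To make the latency point concrete, here is a small sketch that records when each task starts and ends on a single worker thread. It uses `ThreadPoolExecutor(max_workers=1)` as a stand-in for a one-thread scheduler, which is my assumption and not part of the implementation above; the `task` function and the short sleep are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

events = []


def task(task_id):
    events.append(f"start {task_id}")
    time.sleep(0.05)  # stand-in for slow work
    events.append(f"end {task_id}")


# One worker thread: the second request cannot start until the first ends.
with ThreadPoolExecutor(max_workers=1) as executor:
    for task_id in (1, 2):
        executor.submit(task, task_id)

print(events)  # ['start 1', 'end 1', 'start 2', 'end 2']
```

The strict ordering is also what makes execution predictable and keeps the servant free of locks, so it is a trade-off more than a flaw.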
Conclusion
The Active Object Pattern is a powerful tool for managing asynchronous operations in multi-threaded environments. By separating method invocation from execution, it ensures better responsiveness, scalability, and a cleaner codebase. While it comes with some complexity and potential performance overhead, its benefits make it an excellent choice for scenarios requiring high concurrency and predictable execution. However, its use depends on the specific problem at hand. As with most patterns and algorithms, there is no one-size-fits-all solution.