Go-like WaitGroup Pattern in Async OCR
Location of WaitGroup Implementation
The Go-like WaitGroup pattern is implemented in:
- File: src/core/async_ocr_integration.py
- Class: AsyncOCRWaitGroup (lines 28-84)
- Usage: _process_concurrent_operations() method
How the WaitGroup Works (Like Go's sync.WaitGroup)
1. WaitGroup Initialization
# Lines 189-190: Create WaitGroup for coordinating async operations
wait_group = AsyncOCRWaitGroup()
2. Add Operations to WaitGroup
# Line 330: Add N operations to WaitGroup (like Go's wg.Add(n))
await wait_group.add(len(all_operations))
Output: 🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)
3. Start Concurrent Operations
# Lines 332-340: Create async tasks for each detection
tasks = []
for i, operation in enumerate(all_operations):
    task = asyncio.create_task(
        self._process_single_operation_safe(operation, wait_group, updated_detections, i)
    )
    tasks.append(task)
4. Each Operation Signals Completion
# Lines 472-473: Each operation calls done() when finished (like Go's wg.Done())
finally:
    await wait_group.done()  # Decrements counter by 1
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 2
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 1
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 0
5. Wait for All Operations
# Line 199: Wait for all operations to complete (like Go's wg.Wait())
await wait_group.wait(timeout=self.operation_timeout)
Output: [AsyncOCRWaitGroup] All 3 operations completed
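The five steps above can be put together in one runnable sketch. Everything in it is an assumption made for illustration: `MiniWaitGroup` is a stripped-down stand-in for `AsyncOCRWaitGroup`, and `fake_ocr` simulates the gRPC call with `asyncio.sleep`. Neither is taken from `src/core/async_ocr_integration.py`.

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical, minimal stand-in for AsyncOCRWaitGroup."""

    def __init__(self):
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()  # nothing pending yet, so wait() returns immediately

    async def add(self, delta=1):
        self._counter += delta
        if self._counter > 0:
            self._event.clear()

    async def done(self):
        self._counter -= 1
        if self._counter == 0:
            self._event.set()

    async def wait(self, timeout=None):
        await asyncio.wait_for(self._event.wait(), timeout)


async def fake_ocr(detection, wait_group, results, index):
    """Simulated OCR call; the real code would call the gRPC service here."""
    try:
        await asyncio.sleep(0.01)
        results[index] = f"text for {detection}"
    finally:
        await wait_group.done()  # step 4: always signal completion


async def run_all(detections):
    wait_group = MiniWaitGroup()                  # step 1: create
    await wait_group.add(len(detections))         # step 2: add N operations
    results = [None] * len(detections)
    tasks = [                                     # step 3: start tasks
        asyncio.create_task(fake_ocr(d, wait_group, results, i))
        for i, d in enumerate(detections)
    ]
    await wait_group.wait(timeout=1.0)            # step 5: wait for counter == 0
    await asyncio.gather(*tasks)  # re-raises any exception a task ended with
    return results


if __name__ == "__main__":
    print(asyncio.run(run_all(["food_label", "food_label", "qr"])))
```

Note that `add()` is awaited before any task is created, mirroring Go's rule that `wg.Add` must happen before the goroutines start.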
🎯 Detailed Flow Diagram
┌───────────────────────────────────────────────────────────┐
│                  ASYNC OCR WAITGROUP FLOW                 │
└───────────────────────────────────────────────────────────┘
1. INITIALIZATION
   ┌──────────────────┐
   │ Multiple         │
   │ food_label       ├──┐
   │ detections found │  │
   └──────────────────┘  │     ┌─────────────────────┐
                         ├────▶│ Create WaitGroup    │
   ┌──────────────────┐  │     │ wait_group = new()  │
   │ QR/invoice       │  │     └──────────┬──────────┘
   │ detections found ├──┘                │
   └──────────────────┘                   │
                                          ▼
2. ADD OPERATIONS TO WAITGROUP
   ┌─────────────────────────────────────────────────────┐
   │ await wait_group.add(3)  # 3 detections to process  │
   │ Counter: 3 (delta: +3)                              │
   └─────────────────────────────────────────────────────┘
                               │
                               ▼
3. START CONCURRENT TASKS
   ┌─────────────────────────────────────────────────────────┐
   │                  CONCURRENT EXECUTION                   │
   │                                                         │
   │ Task 1: food_label   Task 2: food_label   Task 3: QR    │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌───────────┐ │
   │ │ OCR Processing  │  │ OCR Processing  │  │ QR Proc   │ │
   │ │ ┌─────────────┐ │  │ ┌─────────────┐ │  │ ┌───────┐ │ │
   │ │ │   gRPC      │ │  │ │   gRPC      │ │  │ │ gRPC  │ │ │
   │ │ │ OCR Service │ │  │ │ OCR Service │ │  │ │ QR    │ │ │
   │ │ └─────────────┘ │  │ └─────────────┘ │  │ └───────┘ │ │
   │ └────────┬────────┘  └────────┬────────┘  └─────┬─────┘ │
   │          │                    │                 │       │
   │          ▼                    ▼                 ▼       │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌───────────┐ │
   │ │ await wg.done() │  │ await wg.done() │  │ await     │ │
   │ │ Counter: 2      │  │ Counter: 1      │  │ wg.done() │ │
   │ └─────────────────┘  └─────────────────┘  │ Counter:0 │ │
   │                                           └───────────┘ │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼
4. WAITGROUP COORDINATION
   ┌─────────────────────────────────────────────────────────┐
   │                WAITGROUP COORDINATION                   │
   │                                                         │
   │ Operation 1 complete → Counter: 2                       │
   │ Operation 2 complete → Counter: 1                       │
   │ Operation 3 complete → Counter: 0                       │
   │                                                         │
   │ When Counter = 0:                                       │
   │ All 3 operations completed!                             │
   │ Release await wait_group.wait()                         │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼
5. COMPLETION
   ┌─────────────────────────────────────────────────────────┐
   │ Successfully processed 3 operations concurrently        │
   │ Return updated detections with OCR results              │
   │ Total time: ~200ms (instead of 600ms sequential)        │
   └─────────────────────────────────────────────────────────┘
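The timing in the final box (roughly 200ms instead of 600ms) follows from the three awaits overlapping rather than running back to back. The sketch below illustrates that effect with simulated latency only. `simulated_call` is a hypothetical stand-in for one gRPC request, the latencies are made-up numbers, and `asyncio.gather` is used for brevity in place of the WaitGroup.

```python
import asyncio
import time


async def simulated_call(latency):
    """Stand-in for one gRPC OCR request (hypothetical; it only sleeps)."""
    await asyncio.sleep(latency)


async def sequential(latencies):
    """Await each call in turn; total time is roughly the sum of latencies."""
    start = time.perf_counter()
    for latency in latencies:
        await simulated_call(latency)
    return time.perf_counter() - start


async def concurrent(latencies):
    """Await all calls together; total time is roughly the longest latency."""
    start = time.perf_counter()
    await asyncio.gather(*(simulated_call(latency) for latency in latencies))
    return time.perf_counter() - start


async def compare(latencies):
    return await sequential(latencies), await concurrent(latencies)


if __name__ == "__main__":
    seq, con = asyncio.run(compare([0.2, 0.2, 0.2]))
    print(f"sequential: {seq * 1000:.0f}ms, concurrent: {con * 1000:.0f}ms")
```

The speedup only appears when the work is I/O-bound, as a gRPC call is. CPU-bound work inside a coroutine would still run one task at a time on the event loop.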
Key Components Explained
  
  
  AsyncOCRWaitGroup Class (lines 28-84)
import asyncio

class AsyncOCRWaitGroup:
    def __init__(self):
        self._counter = 0               # Number of pending operations
        self._initial_counter = 0       # Total operations added (used in the completion log)
        self._event = asyncio.Event()   # Event to signal completion
        self._lock = asyncio.Lock()     # Serializes counter updates across coroutines (not threads)

    async def add(self, delta: int = 1):
        # Like Go's wg.Add(n) - adds n operations to wait for
        async with self._lock:
            self._counter += delta
            self._initial_counter += delta
            print(f"🔢 Counter: {self._counter} (delta: {delta})")

    async def done(self):
        # Like Go's wg.Done() - signals one operation completed
        async with self._lock:
            self._counter -= 1
            print(f"✅ Operation completed, counter: {self._counter}")
            if self._counter == 0:
                self._event.set()  # Wake up wait()

    async def wait(self, timeout=None):
        # Like Go's wg.Wait() - blocks until counter = 0
        # Assumption: the timeout is enforced with asyncio.wait_for, which
        # raises asyncio.TimeoutError if it expires first
        await asyncio.wait_for(self._event.wait(), timeout)
        print(f"All {self._initial_counter} operations completed")
  
  
  Where Operations Start (line 330)
# Add all operations to WaitGroup
await wait_group.add(len(all_operations))  # e.g., add(3)
  
  
  Where Each Operation Finishes (lines 472-473)
# In _process_single_operation_async
finally:
    # Always mark operation as done in WaitGroup
    await wait_group.done()  # Decrements counter
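Putting `done()` in a `finally` block matters: if an operation raised and skipped `done()`, the counter would never reach zero and `wait()` would block until its timeout. The sketch below shows the counter still reaching zero when one operation fails. `MiniWaitGroup` and `operation` are hypothetical names for illustration, not the code from the file.

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical, minimal stand-in for AsyncOCRWaitGroup."""

    def __init__(self):
        self.counter = 0
        self._event = asyncio.Event()

    async def add(self, delta=1):
        self.counter += delta

    async def done(self):
        self.counter -= 1
        if self.counter == 0:
            self._event.set()

    async def wait(self, timeout=None):
        await asyncio.wait_for(self._event.wait(), timeout)


async def operation(wait_group, should_fail):
    try:
        await asyncio.sleep(0.01)
        if should_fail:
            raise RuntimeError("simulated OCR failure")
    finally:
        await wait_group.done()  # runs on success and on failure


async def demo():
    wait_group = MiniWaitGroup()
    await wait_group.add(2)
    tasks = [
        asyncio.create_task(operation(wait_group, should_fail=False)),
        asyncio.create_task(operation(wait_group, should_fail=True)),
    ]
    await wait_group.wait(timeout=1.0)  # returns even though one task failed
    # Collect outcomes; the failure is returned as an exception object.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    return wait_group.counter, outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This is the same role `defer wg.Done()` plays in Go: the signal is sent no matter how the function exits.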
  
  
  Where We Wait for All (line 199)
# Wait for all operations to complete with timeout
await wait_group.wait(timeout=self.operation_timeout)
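The snippet above does not show what happens when the timeout expires. One plausible approach, assuming `wait()` is built on `asyncio.wait_for` and therefore raises `asyncio.TimeoutError`, is to catch the error and cancel whatever is still running. `wait_with_timeout` is a hypothetical helper, and the bare `asyncio.Event` here plays the role of the WaitGroup's internal completion event.

```python
import asyncio


async def wait_with_timeout(event, tasks, timeout):
    """Wait for `event`; on timeout, cancel unfinished tasks.

    Returns True if the event was set in time, False otherwise.
    """
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Let the cancellations take effect before returning.
        await asyncio.gather(*tasks, return_exceptions=True)
        return False


async def demo():
    event = asyncio.Event()  # never set: simulates an operation that hangs
    slow = asyncio.create_task(asyncio.sleep(10))
    finished = await wait_with_timeout(event, [slow], timeout=0.05)
    return finished, slow.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether to cancel stragglers or let them finish in the background is a design choice. Cancelling keeps a slow OCR call from holding resources after the caller has already given up on it.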
Real Log Example
When you have 3 food_label detections:
[AsyncOCR] Processing 3 detections concurrently
[AsyncOCR] Starting 3 concurrent operations
🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)           ← WaitGroup.Add(3)
[Concurrent gRPC OCR calls happen]
✅ [AsyncOCR] Completed food_label operation for detection 0
✅ [AsyncOCRWaitGroup] Operation completed, counter: 2  ← WaitGroup.Done()
✅ [AsyncOCR] Completed food_label operation for detection 1
✅ [AsyncOCRWaitGroup] Operation completed, counter: 1  ← WaitGroup.Done()
✅ [AsyncOCR] Completed food_label operation for detection 2
✅ [AsyncOCRWaitGroup] Operation completed, counter: 0  ← WaitGroup.Done()
[AsyncOCRWaitGroup] All 3 operations completed         ← WaitGroup.Wait() unblocks
✅ [AsyncOCR] Completed 3 operations in 221.9ms
✨ [AsyncOCR] Successfully processed 2 operations concurrently
🎯 Go vs Python Comparison
Go Version
var wg sync.WaitGroup
wg.Add(3)  // 3 operations
// Start goroutines
for _, detection := range detections {
    go func(d Detection) {
        defer wg.Done()  // Signal completion
        processOCR(d)
    }(detection)
}
wg.Wait()  // Wait for all to complete
Python Version (Our Implementation)
wait_group = AsyncOCRWaitGroup()
await wait_group.add(3)  # 3 operations
# Start async tasks
tasks = []
for detection in detections:
    task = asyncio.create_task(process_ocr_async(detection, wait_group))
    tasks.append(task)
await wait_group.wait()  # Wait for all to complete
# In each process_ocr_async:
try:
    # Do OCR processing
    result = await ocr_service.process(detection)
finally:
    await wait_group.done()  # Signal completion
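The pattern is the same in both languages: we wait for a counter to reach zero as each concurrent operation signals completion. The one visible difference is that the Python methods are coroutines, so add(), done(), and wait() must each be awaited.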
    