DEV Community

MrChoke
MrChoke

Posted on • Originally published at Medium on

1 2

ปิด Python Threading เมื่อประมวลผลเสร็จ

บันทึกความจำแบบลูกทุ่งๆ เรื่องการจบโปรแกรมที่ใช้ Threading ของ Python ที่เปิด worker แบบ Infinite loop เอาไว้ [[มี update ด้านล่าง]]

ตัวอย่าง

import threading
import queue
import time
import signal
import sys
from datetime import datetime
class myThreading():
    """Small worker pool that hands items to threads through a shared queue.

    Workers poll the queue in an endless loop and leave it only after
    reading the string sentinel 'END'.
    """

    def __init__(self, n=3):
        """Create an empty queue and remember how many workers to start.

        Args:
            n: Number of worker threads that start() will create.
        """
        self.nprocess = n
        self.threads = []
        self.q = queue.Queue()

    def start(self):
        """Create and start self.nprocess worker threads."""
        for i in range(self.nprocess):
            # Each worker gets its index as a string; it is only used as a
            # label in the printed output.
            t = threading.Thread(target=self.worker, args=(str(i), ))
            self.threads.append(t)
            t.start()

    def putQ(self, data):
        """Put one work item (or the 'END' sentinel) on the shared queue."""
        self.q.put(data)

    def worker(self, n):
        """Consume queue items until this worker itself reads 'END'.

        Args:
            n: Label identifying this worker in the printed output.
        """
        print('Start:', n)
        while True:
            if not self.q.empty():
                data = self.q.get()
                if data != 'END':
                    self.process(n, data)
                self.q.task_done()
                time.sleep(1)
                # NOTE: the sentinel is consumed here and never put back, so
                # only the single worker that reads it leaves the loop; the
                # remaining workers keep polling the empty queue.
                if data == 'END' and self.q.empty():
                    print('Worker:', n, ' END')
                    break

    def process(self, i, data):
        """Print the worker label, the item and the current time."""
        print("Process by:", i, ' Data:', data, ' Current:', datetime.now())

    def signal_handler(self, signal, frame):
        """Signal callback: print a notice and exit with status 0.

        Both arguments are supplied by the signal machinery and are unused.
        """
        print('You pressed Ctrl+C!')
        sys.exit(0)
if __name__ == "__main__":
    # Start three workers, then install the Ctrl+C handler.
    threads = myThreading(3)
    threads.start()
    signal.signal(signal.SIGINT, threads.signal_handler)
    # Queue ten timestamps as work items.
    for i in range(10):
        threads.putQ(datetime.now())
    # A single sentinel marks the end of the data.
    threads.putQ('END')

จากตัวอย่างข้างบน เมื่อหยุดส่งข้อมูลเข้า Queue แล้วโปรแกรมยังค้างอยู่ เพราะมี worker เพียงตัวเดียวที่ได้รับ 'END' ส่วน worker ตัวอื่นยังวนรออยู่ ให้เพิ่มหนึ่งบรรทัดก่อน break ใน worker ดังนี้ (line 36)

self.q.put('END')
Enter fullscreen mode Exit fullscreen mode
import threading
import queue
import time
import signal
import sys
from datetime import datetime
class myThreading():
    """Small worker pool that hands items to threads through a shared queue.

    Workers poll the queue in an endless loop. The string sentinel 'END'
    is passed from worker to worker so that every one of them stops.
    """

    def __init__(self, n=3):
        """Create an empty queue and remember how many workers to start.

        Args:
            n: Number of worker threads that start() will create.
        """
        self.nprocess = n
        self.threads = []
        self.q = queue.Queue()

    def start(self):
        """Create and start self.nprocess worker threads."""
        for i in range(self.nprocess):
            # Each worker gets its index as a string; it is only used as a
            # label in the printed output.
            t = threading.Thread(target=self.worker, args=(str(i), ))
            self.threads.append(t)
            t.start()

    def putQ(self, data):
        """Put one work item (or the 'END' sentinel) on the shared queue."""
        self.q.put(data)

    def worker(self, n):
        """Consume queue items until 'END' is read, then pass 'END' on.

        Args:
            n: Label identifying this worker in the printed output.
        """
        print('Start:', n)
        while True:
            if not self.q.empty():
                data = self.q.get()
                if data != 'END':
                    self.process(n, data)
                self.q.task_done()
                time.sleep(1)
                if data == 'END' and self.q.empty():
                    print('Worker:', n, ' END')
                    # Put the sentinel back so the next worker also reads it
                    # and stops; one 'END' stays queued after the last exit.
                    self.q.put('END')
                    break

    def process(self, i, data):
        """Print the worker label, the item and the current time."""
        print("Process by:", i, ' Data:', data, ' Current:', datetime.now())

    def signal_handler(self, signal, frame):
        """Signal callback: print a notice and exit with status 0.

        Both arguments are supplied by the signal machinery and are unused.
        """
        print('You pressed Ctrl+C!')
        sys.exit(0)
if __name__ == "__main__":
    # Start three workers, then install the Ctrl+C handler.
    threads = myThreading(3)
    threads.start()
    signal.signal(signal.SIGINT, threads.signal_handler)
    # Queue ten timestamps as work items.
    for i in range(10):
        threads.putQ(datetime.now())
    # One sentinel is enough: each worker re-queues it before exiting.
    threads.putQ('END')

วิธีนี้น่าจะลูกทุ่งไปหน่อยใครมีวิธีที่เฉียบและง่ายๆ แนะนำมานะครับมึนหมดละ

Update

หลังจากที่ Published บทความไป มีเพื่อนใน Facebook แนะนำวิธีที่ง่ายกว่ามาให้ ตามตัวอย่างด้านล่าง

import concurrent.futures
from datetime import datetime
import time
import random
def process(i, data, max_sleep=5):
    """Simulate a task by sleeping a random whole number of seconds.

    Args:
        i: Index of the task, shown in the printed line.
        data: Payload handed to the task; this demo does not use it.
        max_sleep: Exclusive upper bound, in seconds, for the random sleep.
            The default of 5 keeps the original range of 0-4 seconds.
    """
    t = random.randrange(max_sleep)
    time.sleep(t)
    print("Process index:", i, ' Sleep:', t, ' Current:', datetime.now())
if __name__ == "__main__":
    # Leaving the with-block waits for every submitted task to finish, so
    # no sentinel value or manual thread bookkeeping is needed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(10):
            executor.submit(process, i, datetime.now())

Image of Timescale

🚀 pgai Vectorizer: SQLAlchemy and LiteLLM Make Vector Search Simple

We built pgai Vectorizer to simplify embedding management for AI applications—without needing a separate database or complex infrastructure. Since launch, developers have created over 3,000 vectorizers on Timescale Cloud, with many more self-hosted.

Read full post →

Top comments (0)

Billboard image

The Next Generation Developer Platform

Coherence is the first Platform-as-a-Service you can control. Unlike "black-box" platforms that are opinionated about the infra you can deploy, Coherence is powered by CNC, the open-source IaC framework, which offers limitless customization.

Learn more