如何使用线程池?
2021-03-19 03:25
标签:+= 思路 web视频监控 exe cli cal range web time 需求: 思路: 代码: 如何使用线程池? 标签:+= 思路 web视频监控 exe cli cal range web time 原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13951602.html
我们之前实现了一个多线程web视频监控服务器,由于我们服务器资源有限(CPU、内存、带宽),需要对请求连接数(线程数)做限制,避免因资源耗尽而瘫痪
可以使用线程池代替原来的每次请求创建线程
使用标准库中concurrent.futures下的ThreadPoolExecutor,对象的submit和map方法可以启动线程池中的线程来执行任务import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select
class JpegStreamer(Thread):
def __init__(self, camera):
super().__init__()
self.cap = cv2.VideoCapture(camera)
self.lock = RLock()
self.pipes = {}
def register(self):
pr, pw = os.pipe()
self.lock.acquire()
self.pipes[pr] = pw
self.lock.release()
return pr
def unregister(self, pr):
self.lock.acquire()
pw = self.pipes.pop(pr)
self.lock.release()
os.close(pr)
os.close(pw)
def capture(self):
cap = self.cap
while cap.isOpened():
ret, frame = cap.read()
if ret:
ret, data = cv2.imencode(‘.jpg‘, frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
yield data.tostring()
def send_frame(self, frame):
n = struct.pack(‘l‘, len(frame))
self.lock.acquire()
if len(self.pipes):
_, pipes, _ = select([], self.pipes.values(), [], 1)
for pipe in pipes:
os.write(pipe, n)
os.write(pipe, frame)
self.lock.release()
def run(self):
for frame in self.capture():
self.send_frame(frame)
class JpegRetriever:
def __init__(self, streamer):
self.streamer = streamer
self.local = threading.local()
def retrieve(self):
while True:
ns = os.read(self.local.pipe, 8)
n = struct.unpack(‘l‘, ns)[0]
data = os.read(self.local.pipe, n)
yield data
def __enter__(self):
if hasattr(self.local, ‘pipe‘):
raise RuntimeError()
self.local.pipe = streamer.register()
return self.retrieve()
def __exit__(self, *args):
self.streamer.unregister(self.local.pipe)
del self.local.pipe
return True
class WebHandler(BaseHTTPRequestHandler):
retriever = None
@staticmethod
def set_retriever(retriever):
WebHandler.retriever = retriever
def do_GET(self):
if self.retriever is None:
raise RuntimeError(‘no retriver‘)
if self.path != ‘/‘:
return
self.send_response(200)
self.send_header(‘Content-type‘, ‘multipart/x-mixed-replace;boundary=jpeg_frame‘)
self.end_headers()
with self.retriever as frames:
for frame in frames:
self.send_frame(frame)
def send_frame(self, frame):
sh = b‘--jpeg_frame\r\n‘
sh += b‘Content-Type: image/jpeg\r\n‘
sh += b‘Content-Length: %d\r\n\r\n‘ % len(frame)
self.wfile.write(sh)
self.wfile.write(frame)
from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
self.executor = ThreadPoolExecutor(thread_n)
def process_request(self, request, client_address):
self.executor.submit(self.process_request_thread, request, client_address)
if __name__ == ‘__main__‘:
# 创建Streamer,开启摄像头采集。
streamer = JpegStreamer(0)
streamer.start()
# http服务创建Retriever
retriever = JpegRetriever(streamer)
WebHandler.set_retriever(retriever)
# 开启http服务器
HOST = ‘localhost‘
PORT = 9000
print(‘Start server... (http://%s:%s)‘ % (HOST, PORT))
httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
#httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
httpd.serve_forever()
=====================================================================
>>> import threading,time,random
>>> def f(a,b):
... print(threading.current_thread().name,‘:‘,a,b)
... time.sleep(random.randint(5,10))
... return a * b
...
>>> from concurrent.futures import ThreadPoolExecutor
>>> executor = ThreadPoolExecutor(3)
>>> executor.submit(f,2,3)
ThreadPoolExecutor-0_0 : 2 3