如何使用线程池?

2021-03-19 03:25

阅读:710

标签:+=   思路   web视频监控   exe   cli   cal   range   web   time   

需求:
我们之前实现了一个多线程web视频监控服务器,由于我们服务器资源有限(CPU、内存、带宽),需要对请求连接数(线程数)做限制,避免因资源耗尽而瘫痪
因此,可以使用线程池来代替原来“每次请求都创建一个新线程”的做法。

思路:
使用标准库中concurrent.futures下的ThreadPoolExecutor,该对象的submit和map方法可以启动线程池中的线程来执行任务。

代码:

import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select

class JpegStreamer(Thread):
    """Capture thread that fans JPEG-encoded frames out to registered pipes.

    A consumer calls :meth:`register` to obtain the read end of a fresh OS
    pipe.  While the thread runs, every encoded frame is written to the
    write end of each registered pipe that ``select`` reports as writable,
    prefixed with the frame length packed as a native ``long``
    (``struct`` format ``'l'``).
    """

    def __init__(self, camera):
        """Open the capture device.

        Args:
            camera: Passed unchanged to ``cv2.VideoCapture``.
        """
        super().__init__()
        self.cap = cv2.VideoCapture(camera)
        # Guards ``self.pipes``; it is touched by the capture thread and by
        # whichever threads call register()/unregister().
        self.lock = RLock()
        # Maps each consumer's read fd to the matching write fd.
        self.pipes = {}

    def register(self):
        """Create a pipe for a new consumer and return its read fd."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the pipe identified by read fd *pr* and close both ends.

        Raises:
            KeyError: If *pr* is not currently registered.
        """
        with self.lock:
            pw = self.pipes.pop(pr)
        # Close the write end even if closing the read end fails.
        try:
            os.close(pr)
        finally:
            os.close(pw)

    def capture(self):
        """Yield JPEG-encoded frames (``bytes``) while the device is open."""
        cap = self.cap
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                continue
            ok, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
            if ok:
                # ``tostring()`` is a deprecated alias that NumPy 2 removed.
                yield data.tobytes()

    def send_frame(self, frame):
        """Write one length-prefixed frame to every currently writable pipe.

        Pipes that are not writable within the 1 second ``select`` timeout
        are skipped for this frame.
        """
        header = struct.pack('l', len(frame))
        # ``with`` releases the lock even if select() or os.write() raises.
        with self.lock:
            if not self.pipes:
                return
            _, writable, _ = select([], list(self.pipes.values()), [], 1)
            for pipe in writable:
                os.write(pipe, header)
                os.write(pipe, frame)

    def run(self):
        """Thread body: forward captured frames until the device closes."""
        try:
            for frame in self.capture():
                self.send_frame(frame)
        finally:
            # Give the camera back once the loop ends or fails.
            self.cap.release()

class JpegRetriever:
    """Per-thread context manager that yields frames read from a streamer.

    Entering the context registers a pipe with the streamer and returns a
    generator of frames; leaving it unregisters the pipe.  The pipe fd is
    kept in thread-local storage, so one retriever instance can be used by
    several threads at once, one active context per thread.
    """

    # Length prefix layout.  Uses the same native 'l' format the frames are
    # packed with, so the prefix size is right on every platform instead of
    # assuming 8 bytes.
    _HEADER = struct.Struct('l')

    def __init__(self, streamer):
        """Remember the streamer to register with.

        Args:
            streamer: Object providing ``register()`` (returns a readable
                fd) and ``unregister(fd)``.
        """
        self.streamer = streamer
        self.local = threading.local()

    def _read_exact(self, size):
        """Read *size* bytes from this thread's pipe; shorter only at EOF."""
        fd = self.local.pipe
        chunks = []
        remaining = size
        while remaining > 0:
            # A single os.read() may return less than requested, e.g. when
            # the frame is larger than what the pipe currently holds.
            chunk = os.read(fd, remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def retrieve(self):
        """Yield complete frames (``bytes``) until the pipe reaches EOF."""
        header = self._HEADER
        while True:
            prefix = self._read_exact(header.size)
            if len(prefix) < header.size:
                return
            (length,) = header.unpack(prefix)
            data = self._read_exact(length)
            if len(data) < length:
                return
            yield data

    def __enter__(self):
        """Register a pipe for the calling thread and return the frame generator.

        Raises:
            RuntimeError: If the calling thread already has an active context.
        """
        if hasattr(self.local, 'pipe'):
            raise RuntimeError('retriever is already active in this thread')

        # Use the streamer given to the constructor, not a module global.
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        """Unregister the calling thread's pipe.

        Returns True, which suppresses any exception raised inside the
        ``with`` body -- presumably so that a failure while streaming simply
        ends the context.
        """
        try:
            self.streamer.unregister(self.local.pipe)
        finally:
            # Always clear the slot so this thread can enter again.
            del self.local.pipe
        return True

class WebHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves frames as a ``multipart/x-mixed-replace`` stream.

    Only ``GET /`` is served; the response stays open and each frame from
    the shared retriever is sent as one ``image/jpeg`` part.
    """

    # Frame source shared by all handler instances; install it with
    # set_retriever() before the server starts handling requests.
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        """Install the retriever used by every handler instance."""
        WebHandler.retriever = retriever

    def do_GET(self):
        """Stream frames for ``/``; answer any other path with 404.

        Raises:
            RuntimeError: If no retriever has been installed.
        """
        if self.retriever is None:
            raise RuntimeError('no retriver')

        if self.path != '/':
            # Send a real response instead of returning silently, which
            # left the client without any HTTP reply.
            self.send_error(404)
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part (boundary, headers, JPEG bytes) to the client.

        Args:
            frame: Encoded image as ``bytes``.
        """
        part_header = (
            b'--jpeg_frame\r\n'
            b'Content-Type: image/jpeg\r\n'
            b'Content-Length: %d\r\n\r\n' % len(frame)
        )
        self.wfile.write(part_header)
        self.wfile.write(frame)

from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
    """``ThreadingTCPServer`` variant that runs requests on a bounded thread pool.

    Instead of starting a new thread per request, each request is submitted
    to a ``ThreadPoolExecutor``, so at most ``thread_n`` requests are
    handled concurrently and further ones wait in the executor's queue.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
        """Create the server and its worker pool.

        Args:
            server_address: ``(host, port)`` tuple passed to the base class.
            RequestHandlerClass: Handler class passed to the base class.
            bind_and_activate: Passed to the base class; when False the
                caller binds and activates the socket itself.
            thread_n: Maximum number of worker threads in the pool.
        """
        # Create the pool first: the base constructor calls server_close()
        # if binding fails, and server_close() uses the executor.
        self.executor = ThreadPoolExecutor(thread_n)
        # Forward the caller's choice instead of always binding.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)

    def process_request(self, request, client_address):
        """Queue the request for handling on a pool thread."""
        self.executor.submit(self.process_request_thread, request, client_address)

    def server_close(self):
        """Close the listening socket and stop accepting pool work.

        Does not wait for running handlers, which may be long-lived.
        """
        super().server_close()
        self.executor.shutdown(wait=False)

if __name__ == '__main__':
    # Create the streamer and start capturing from the camera.
    streamer = JpegStreamer(0)
    # Daemon thread, so the endless capture loop does not keep the process
    # alive after the server has been stopped.
    streamer.daemon = True
    streamer.start()

    # Create the retriever that the HTTP handlers pull frames from.
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # Start the HTTP server, limited to 3 request threads.
    HOST = 'localhost'
    PORT = 9000
    print('Start server... (http://%s:%s)' % (HOST, PORT))
    httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
    # Unbounded thread-per-request variant, kept for comparison:
    #httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass
    finally:
        httpd.server_close()

=====================================================================

>>> import threading,time,random
>>> def f(a,b):
...     print(threading.current_thread().name,':',a,b)
...     time.sleep(random.randint(5,10))
...     return a * b
... 
>>> from concurrent.futures import ThreadPoolExecutor
>>> executor = ThreadPoolExecutor(3)
>>> executor.submit(f,2,3)
ThreadPoolExecutor-0_0 : 2 3

>>> future = executor.submit(f,2,3)
ThreadPoolExecutor-0_1 : 2 3
>>> future.result()
6
>>> executor.map(f, range(1,6),range(2,7))
ThreadPoolExecutor-0_2 : 1 2
<generator object Executor.map.<locals>.result_iterator at 0x7f17568aa830>
ThreadPoolExecutor-0_0 : 2 3
ThreadPoolExecutor-0_1 : 3 4
ThreadPoolExecutor-0_1 : 4 5
ThreadPoolExecutor-0_2 : 5 6
>>> list(executor.map(f, range(1,6),range(2,7)))
ThreadPoolExecutor-0_2 : 1 2
ThreadPoolExecutor-0_1 : 2 3
ThreadPoolExecutor-0_0 : 3 4
ThreadPoolExecutor-0_2 : 4 5
ThreadPoolExecutor-0_1 : 5 6
[2, 6, 12, 20, 30]
>>> 

如何使用线程池?

标签:+=   思路   web视频监控   exe   cli   cal   range   web   time   

原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13951602.html


评论


亲,登录后才可以留言!