ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

41. 使用线程本地数据

2021-04-25 09:55:33  阅读:104  来源: 互联网

标签:__ self 41 pipe 线程 本地 frame def retriever


例如,我们实现了一个web视频监控服务器,服务器端采集摄像头数据,客户端使用浏览器通过http请求接收数据。服务器使用推送的方式(multipart/x-mixed-replace)一直使用一个tcp连接向客户端传递数据。这种方式将持续占用一个线程,导致单线程服务器无法处理多客户端请求。

要求:改写程序,在每个线程中处理一个客户端请求,支持多客户端访问。

解决方案:

threading.local()函数可以创建线程本地数据空间,其下属性对每个线程独立存在。


  • 对于线程本地数据:

线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个local(或者一个子类型)的实例并在实例中储存属性:

mydata = threading.local()mydata.x = 1

在不同的线程中,实例的值会不同。class threading.local,一个代表线程本地数据的类。


  • 方案示例:
yum install -y numpy opencv* python-qt4

pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple opencv-contrib-python==3.4.2.17

import os, cv2, time, struct, threadingfrom http.server import HTTPServer, BaseHTTPRequestHandlerfrom socketserver import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectclass JpegStreamer(Thread):
    def __init__(self, camrea):
        super().__init__()
        self.cap = cv2.VideoCapture(camrea)
        self.lock = RLock()
        self.pipes = {}

    def register(self):
        pr, pw = os.pipe()
        self.lock.acquire()
        self.pipes[pr] = pw
        self.lock.release()
        return pr    def unregister(self, pr):
        self.lock.acquire()
        pw = self.pipes.pop(pr)
        self.lock.release()
        os.close(pr)
        os.close(pw)

    def capture(self):
        cap = self.cap        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send_frame(self, frame):
        n = struct.pack('1', len(frame))
        self.lock.acquire()
        if len(self.pipes):
            _, pipes, _ = select([], self.pipes.values(), [], 1)
            for pipe in pipes:
                os.write(pipe, n)
                os.write(pipe, frame)
        self.lock.release()

    def run(self):
        for frame in self.capture():
            self.send_frame(frame)class JpegRetriever:
    def __init__(self, streamer):
        self.streamer = streamer    def retrieve(self):
        while True:
            ns = os.read(self.pipe, 8)
            n = struct.unpack('1', ns)[0]
            data = os.read(self.pipe, n)
            yield data    def __enter__(self):
        if hasattr(self, 'pipe'):
            raise RuntimeError()

        self.pipe = streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.pipe)
        del self.pipe        return Trueclass WebHandler(BaseHTTPRequestHandler):
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        WebHandler.retriever = retriever    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriever')

        if self.path != '/':
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)if __name__ == '__main__':
    # 创建Streamer,开启摄像头采集
    streamer = JpegStreamer(0)
    streamer.start()

    # http服务器创建Retriever
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # 开启http服务器
    HOST = '192.168.30.128'             #本机ip
    POST = 9000
    print('Start server...(http://%s:%d)' % (HOST, POST))
    httpd = TCPServer((HOST, POST), WebHandler)
    httpd.serve_forever()

我这里是Windows系统,linux虚拟机root运行提示:

VIDEOIO ERROR: V4L: can't open camera by index 0
Start server...(http://192.168.30.128:9000)

不过无所谓,当通过浏览器访问192.168.30.128:9000时,即使多个窗口同时访问,也只有一个访问记录产生,说明只有一个线程建立。

改进:通过threading.local()函数创建线程本地数据空间,数据对每个线程独立。

import os, cv2, time, struct, threadingfrom http.server import HTTPServer, BaseHTTPRequestHandlerfrom socketserver import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectclass JpegStreamer(Thread):
    def __init__(self, camrea):
        super().__init__()
        self.cap = cv2.VideoCapture(camrea)
        self.lock = RLock()
        self.pipes = {}

    def register(self):
        pr, pw = os.pipe()
        self.lock.acquire()
        self.pipes[pr] = pw
        self.lock.release()
        return pr    def unregister(self, pr):
        self.lock.acquire()
        pw = self.pipes.pop(pr)
        self.lock.release()
        os.close(pr)
        os.close(pw)

    def capture(self):
        cap = self.cap        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send_frame(self, frame):
        n = struct.pack('1', len(frame))
        self.lock.acquire()
        if len(self.pipes):
            _, pipes, _ = select([], self.pipes.values(), [], 1)
            for pipe in pipes:
                os.write(pipe, n)
                os.write(pipe, frame)
        self.lock.release()

    def run(self):
        for frame in self.capture():
            self.send_frame(frame)class JpegRetriever:
    def __init__(self, streamer):
        self.streamer = streamer
        self.local = threading.local()              #创建local对象

    def retrieve(self):
        while True:
            ns = os.read(self.local.pipe, 8)
            n = struct.unpack('1', ns)[0]
            data = os.read(self.local.pipe, n)
            yield data    def __enter__(self):
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()

        self.local.pipe = streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe        return Trueclass WebHandler(BaseHTTPRequestHandler):
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        WebHandler.retriever = retriever    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriever')

        if self.path != '/':
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)if __name__ == '__main__':
    # 创建Streamer,开启摄像头采集
    streamer = JpegStreamer(0)
    streamer.start()

    # http服务器创建Retriever
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # 开启http服务器
    HOST = '192.168.30.128'
    POST = 9000
    print('Start server...(http://%s:%d)' % (HOST, POST))
    httpd = ThreadingTCPServer((HOST, POST), WebHandler)
    httpd.serve_forever()

接下来继续多个浏览器多个窗口访问,可以同时进行多个访问,且每个访问都会新产生一条访问记录。


标签:__,self,41,pipe,线程,本地,frame,def,retriever
来源: https://blog.51cto.com/u_10272167/2730229

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有