ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

gRPC流式传输,以视频传输为例

2022-03-05 18:31:47  阅读:253  来源: 互联网

标签:img 为例 gRPC cv2 server 传输 video frames import


 

  流式传输需要关键字stream,stream 修饰的结构在使用时应该是可以迭代的,即iterable。下面是client -> server 的视频流式传输。

video.proto

syntax = "proto3";

service New{
    rpc Video_transport(stream FrameRequest) returns (FrameResponse);  // no check for all frames
}
message FrameRequest {
    bytes f_data = 1;           // frame data field
    int32 goon = 2;             // if goon == 0, this is end informer, if >0, not the final frame
}
message FrameResponse{
    string state = 1;
}

 

  如client代码中所示,根据yield返回生成器的特性,每次取完一帧,就会用生成器将该帧的信息返回给stub,发送给server。

client.py

from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewStub

from PIL import Image

import cv2, grpc
import numpy as np

fheight, fwidth, fchannels, fps = 0,0,0,0
def transport_video(video_path):
    cap = cv2.VideoCapture(video_path)            # read video
    if not cap.isOpened():
        print('the video does not exist.')
        return False

    global fheight, fwidth, fchannels, fps          # information that help to reconstruct the video
    h, w = 256, 256
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    ret, frame = cap.read()
    while ret:
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = np.array(cv2.resize(img, (h, w)))       # resize so that the data block won't be too big
        fheight, fwidth, fchannels = img.shape
        yield FrameRequest(f_data=np.ndarray.tobytes(img), goon=1) # 'yield' could provide an iterable genetator
        ret, frame = cap.read()
    cap.release()
    print('video read ended.')
    yield FrameRequest(f_data=None, goon=0)         # send end signal


def run():
    with grpc.insecure_channel('10.xx.xx.xx:50000') as channel:   # then the channel could close after the 'with' block
        stub = NewStub(channel)
        ## transport video to server

        video_path = './examples/C_30080000.mp4'
        response = stub.Video_transport(transport_video(video_path))
        print(response.state)


if __name__ == "__main__":
    run()

 

server.py

from concurrent import futures
from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewServicer, add_NewServicer_to_server

import os, subprocess, cv2, grpc
import numpy as np
from PIL import Image

# the implementation of interfaces in NewServicer
class New(NewServicer):
    def __init__(self) -> None:
        self.frames = []
    
    def save_video(self, video_path):                 # construct video from frames and save to local disk
        video = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (height, width))# things about height, width, channels, fps should be provided
        for img in self.frames:
            img_arr = np.frombuffer(img, dtype=np.uint8)         # bytes -> ndarray
            img_arr = np.reshape(img_arr, (height, width, channels))
            img = cv2.cvtColor(np.array(Image.fromarray(img_arr)), cv2.COLOR_RGB2BGR)
            video.write(img )
        video.release()
        self.frames.clear()

    def Video_transport(self, request_iterator, context):
        print(' *** Reciving video frames...')
        if len(self.frames): 
            print('Clear frames cache...')
            self.frames.clear()
        for f_info in request_iterator:                # it's an iterator, each element contains frame's bytes
            if not f_info.goon:                        # ened, empty frame
                self.save_video(video_path)
                print('---> ' + str(len(self.frames)) + ' frames (all) received.')
                return FrameResponse(state='ok')
            self.frames.append(f_info.f_data)
        
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))             # thread pool supports multi-request
    add_NewServicer_to_server(New(), server)                    # add servicer to server
    server.add_insecure_port('[::]:520')
    server.start()
    server.wait_for_termination()           # block the calling thread if the server terminates

if __name__ == "__main__":
    serve()

 

  上面双方启动后,会产生一个client->server的流,流中每次只传输一帧,只有当一个视频中的所有帧都传输完毕后,Video_transport 这个服务才会结束并给出response。

标签:img,为例,gRPC,cv2,server,传输,video,frames,import
来源: https://www.cnblogs.com/grainrain/p/15968945.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有