ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java-Netty的管道流

2019-11-19 08:02:13  阅读:392  来源: 互联网

标签:stream nio netty java


我正在尝试将“缩略图生成器”实现为微服务.我认为这样的事情可能最适合作为TCP服务器,因此在简要调查了一些我选择了Netty的选项之后.为了使服务尽可能高效地使用内存,我宁愿避免将整个映像加载到内存中,因此一直在尝试构建一个管道,该管道的“ ThumbnailHandler”可以使用管道流来利用Netty的分块读取,以便当Netty接收更多字节时,缩略图生成器可以遍历更多的流.不幸的是,我通常对Netty或NIO模式还不够熟悉,无法知道我是否正在以最好的方式进行操作,因此即使简化版本也无法像我期望的那样工作,我也很难.

这是我的服务器设置:

public class ThumbnailerServer {

    private int port;

    public ThumbnailerServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
        final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
        final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
        final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(acceptGroup, connectGroup)
             .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
             .option(ChannelOption.SO_BACKLOG, 128)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<UdtChannel>() {
                 @Override
                 public void initChannel(UdtChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast("handler", new ThumbnailerServerHandler());
                 }
             });

            // bind and start to accept incoming connections.
            b.bind(port).sync().channel().closeFuture().sync();
        } finally {
            connectGroup.shutdownGracefully();
            acceptGroup.shutdownGracefully();
        }
    }

}

和缩略图处理程序:

public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
    private PipedInputStream toThumbnailer = new PipedInputStream();
    private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);

    private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(5));

    private ListenableFuture<OutputStream> future;

    public ThumbnailerServerHandler() throws IOException {
        super(ByteBuf.class, true);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
        future.addListener(() -> {
            try {
                ctx.writeAndFlush(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }, executor);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.fromClient.close();
        this.toThumbnailer.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        int readableBytes = msg.readableBytes();
        msg.readBytes(this.fromClient, readableBytes);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Encountered error during communication", cause);
        ctx.close();
    }


}

在简化整个流程之前,这里是我的简化“缩略图”:

public class ThumbnailGenerator {

    public static OutputStream generate(InputStream toThumbnailer) {
        OutputStream stream = new ByteArrayOutputStream();
        try {
            IOUtils.copy(toThumbnailer, stream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return stream;
    }

}

>是否像这样在handlerHandlered方法中剥离异步任务是否合适?有没有更多的“净额”方式来做到这一点?
> IOUtils.copy应该并且确实会阻塞(由于对管道输入流的读取),直到有可供读取的数据,这就是为什么我将其卸载到执行程序池中的原因,因为如果我不能在处理程序中阻塞想要继续接收字节.但是,我发现这从未完成,但是确实取得了进展.那是因为我从未遇到过EOF字节(-1)吗?如何使此流程起作用?
>我是否缺少可以简化此过程的网络结构?我考虑过将其实现为一个解码器,直到它具有整个流时才解码,但是随后我会将所有内容加载到内存中.

解决方法:

好的,事实证明我有一些误解,可以解释为什么我无法正常工作.

1)许多文件类型没有所谓的终端字节.实际上,EOF字节(最常见的是-1,因为它是一个溢出值)通常是阅读器提供的一种实现,用于向其使用者传达已到达内容结尾的信息.文件本身通常不存在该文件.

2)channelReadComplete听起来不那么清晰.在满足netty中配置的最大读取次数(默认为10)后,或者在有理由相信消息已完全发送(如通过读取空缓冲区或接收到小于该缓冲区的缓冲区所示)时,会调用channelReadComplete.配置的块大小.

至于为什么输入流副本似乎挂起了,那是因为管道输入流从未产生终端值(这是EOF字节读取器实现的一个示例). PipedInputStreams仅在驱动它们的输出流关闭后才指示EOF.

为了使此实现可行,我应该将消息计数增加到足够高的数量,并在最后一次返回小于块大小的读取之后调用受信任的channelReadComplete.此时,可以安全地关闭并重置下一条消息的输出流.关闭输出流将导致输入流最终返回该EOF字节,其他所有操作均可继续进行.

标签:stream,nio,netty,java
来源: https://codeday.me/bug/20191119/2034576.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有