ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Netty源码解析-ReadTimeoutHandler、WriteTimeoutHandler

2021-12-30 20:02:11  阅读:161  来源: 互联网

标签:Netty ctx public 源码 promise WriteTimeoutTask IdleStateHandler WriteTimeoutHandle


前言:

上一篇博客中介绍了IdleStateHandler的使用场景及源码分析,我们可以使用IdleStateHandler来进行心跳检测。

除了这个,还有两个Handler与该IdleStateHandler功能类似,是作为其的补充。本文就来介绍下。

1.ReadTimeoutHandler

/**
 * Raises a {@link ReadTimeoutException} when no data was read within a certain
 * period of time.
 */
public class ReadTimeoutHandler extends IdleStateHandler {
    
}

看注释就很明确了,如果在指定的时间内没有数据被读取,则抛出一个ReadTimeoutException。

我们来直接分析下源码

1.1 ReadTimeoutHandler源码分析

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;
    // 默认以秒为单位
    public ReadTimeoutHandler(int timeoutSeconds) {
        this(timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0, 0, unit);
    }

    // 这个重写了IdleStateHandler.channelIdle()方法
    @Override
    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        // 如果当前Handler没有关闭,则直接抛出一个ReadTimeoutException并关闭当前channel
        if (!closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
}

代码比较少,重要的就是重写了IdleStateHandler.channelIdle()方法。

我们可以来看下IdleStateHandler.channelIdle()方法

public class IdleStateHandler extends ChannelDuplexHandler {
	protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
}

原来的处理中直接将READ_IDLE的事件传递下到下一个ChannelHandler,下一个ChannelHandler捕获到该事件后做相应的处理。

而ReadTimeoutHandler处理更直接,不需要下一个ChannelHandler处理了,直接抛出异常,关闭channel。

2.WriteTimeoutHandler

// Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
}

从注释中我们了解到WriteTimeoutHandler与ReadTimeoutHandler比较类似,如果在指定时间内写操作没有完成的话,则直接抛出WriteTimeoutException异常。

这个与IdleStateHandler中的write事件监测有所不同(IdleStateHandler监测的是在指定时间内没有发生写事件,则发送WRITE_IDLE事件),需要注意

2.1 WriteTimeoutHandler构造方法分析

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
    private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

	// 指定超时时间
    private final long timeoutNanos;

	// 一个双向链表的task,后续会分析
    private WriteTimeoutTask lastTask;
    private boolean closed;

	public WriteTimeoutHandler(int timeoutSeconds) {
        this(timeoutSeconds, TimeUnit.SECONDS);
    }

	// 对写超时时间进行设置
    public WriteTimeoutHandler(long timeout, TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        if (timeout <= 0) {
            timeoutNanos = 0;
        } else {
            timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        }
    }
}

2.2 WriteTimeoutHandler.write() 重写写方法

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
	@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (timeoutNanos > 0) {
            promise = promise.unvoid();
            // 创建一个定时任务,具体见下面
            scheduleTimeout(ctx, promise);
        }
        ctx.write(msg, promise);
    }

	private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
        // 创建一个WriteTimeoutTask,具体见2.3
        final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
        // 在延迟timeoutNanos后执行task
        task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

        // 如果task没有执行结束,则将当前task添加到lastTask的后一个节点,并添加监听
        if (!task.scheduledFuture.isDone()) {
            addWriteTimeoutTask(task);
            promise.addListener(task);
        }
    }
}

2.3 WriteTimeoutTask 超时任务

private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {

    private final ChannelHandlerContext ctx;
    private final ChannelPromise promise;

    // WriteTimeoutTask is also a node of a doubly-linked list
    WriteTimeoutTask prev;
    WriteTimeoutTask next;

    ScheduledFuture<?> scheduledFuture;

    WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
        this.ctx = ctx;
        this.promise = promise;
    }

    @Override
    public void run() {
        // 当前任务执行时机就是在经过timeoutNanos延时后执行的,如果这时write任务还没有完成,说明已经超时了
        if (!promise.isDone()) {
            try {
                // 超时则抛出异常,具体见2.3.1
                writeTimedOut(ctx);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        }
        removeWriteTimeoutTask(this);
    }

	// 用于监听write事件的完成,完成后,直接取消scheduledFuture,并将当前task从链表中删除
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // scheduledFuture has already be set when reaching here
        scheduledFuture.cancel(false);
        removeWriteTimeoutTask(this);
    }
}

2.3.1 WriteTimeoutHandler.writeTimedOut()超时触发异常

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
	protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            // 直接抛出异常,并关闭连接
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
}

总结:

重要的处理都放在WriteTimeoutTask中,重点还是理解这个定时任务的执行时机,该WriteTimeoutTask 是延时timeoutNanos后执行的,所以,按照该Handler的含义来说,在timeoutNanos后,当前write操作应对是done的,如果没有结束,则直接抛出异常即可。

标签:Netty,ctx,public,源码,promise,WriteTimeoutTask,IdleStateHandler,WriteTimeoutHandle
来源: https://blog.csdn.net/qq_26323323/article/details/122244157

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有