ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Netty学习之实战RPC框架

2021-07-31 19:33:36  阅读:149  来源: 互联网

标签:实战 Netty interfaces Object RPC new 序列化 服务端 客户端


RPC的实现方式是本地通过远程代理对象调用远程服务。在互联网应用场景中,单体服务极度缺乏弹性伸缩能力,在大规模开发团队中也不便于开发管理。所以往往会把服务根据模块进行垂直拆分,也就是我们说的SOA服务化。服务拆分后系统跟系统直接的业务交互往往依赖于RPC框架进行通讯。

  通常RPC的服务端会提供对应的接口jar包,客户端通过rpc框架功能拿到对应接口的代理实例,整个调用过程数据的包装和通讯都是透明的。

一、调用流程

  首先先来分析下RPC流程是怎样的,如下图:

        

  我们包含三部分,用户、Netty客户端,Netty服务端:

  1. 用户发起调用;
  2. Netty客户端包装请求;
  3. 客户端对请求进行序列化(对象转ByteBuf);
  4. 序列化后发送消息到服务端;
  5. 服务端会对请求进行反序列化解码成具体对象;
  6. 服务端根据客户端发送的请求解析并准备返回结果;
  7. 服务端对返回结果序列化为ByteBuf;
  8. 客户端收到返回信息;
  9. 客户端对返回信息反列化得到Object信息;
  10. 客户端把结果返回给用户调用方,完成整个请求。

二、包含技术

  如上所示,就是整个RPC框架的简单流程,在这个流程中需要使用哪些技术呢?

  • 动态代理:通过java Proxy技术拿到代理对象,invocationHandler实现数据协议包装和通讯。
  • 序列化、反序列化
  • 网络通讯:基于netty的客户端和服务端进行通讯可以获得很好的IO性能
  • 反射:根据客户端请求参数通过反射技术实现服务端对应实例的方法调用

  接下来我们就部分技术的使用进行代码片段分析。

  1、动态代理

//todo 代理对象
QueryStudentClient client = (QueryStudentClient)rpcProxyFactory.factoryRemoteInvoker("localhost",8080,QueryStudentClient.class);

public class RpcProxyFactory<T> {

    public T factoryRemoteInvoker(String host, int port, Class interfaces){
        //动态代理
        return (T) Proxy.newProxyInstance(interfaces.getClassLoader(),new Class[]{interfaces},
                new RemoteInvocationHandler(host,port,interfaces));
    }
}

public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
    private Class interfaces;

    public RemoteInvocationHandler(String host, int port, Class interfaces) {
        this.host = host;
        this.port = port;
        this.interfaces = interfaces;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //todo 封装消息
        RpcContext rpcContext=new RpcContext();
        rpcContext.setClassName(interfaces.getName());
        rpcContext.setMethodName(method.getName());
        rpcContext.setTypes(method.getParameterTypes());
        rpcContext.setParams(args);

        try {
            //通讯
            NettyClient client=new NettyClient(host,port);
            client.connect();
            return client.sendData(rpcContext);
        }catch (Exception e){

        }
        return null;
    }
}

  2、序列化、反序列化

    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        handler =  new NettyClientHandler(latch);

        HessianEncode hessionEncodeHandler=new HessianEncode();
        HessianDecode hessionDecodeHandler= new HessianDecode();

        LengthFieldPrepender fieldEncoder=new LengthFieldPrepender(2);
//        LengthFieldBasedFrameDecoder fieldDecoder = new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2);

//        出站
        sc.pipeline().addLast(fieldEncoder);
        sc.pipeline().addLast(hessionEncodeHandler);

        //入站        LengthFieldBasedFrameDecoder多线程下不安全,因此使用new
        sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        sc.pipeline().addLast(hessionDecodeHandler);
        sc.pipeline().addLast(handler);
    }

  可以看到在pipeline先后添加了:基于消息头的长度设置的粘包半包处理handler、序列化工具、反序列化工具,此处序列化使用的是Hessian。

  3、反射技术

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcContext model=(RpcContext)msg;

        Class clazz=null;
        if(Registry.map.containsKey(model.getClassName())){
            clazz=Registry.map.get(model.getClassName());
        }

        Object result=null;
        try {
            Method method=clazz.getMethod(model.getMethodName(),model.getTypes());
            result=method.invoke(clazz.newInstance(),model.getParams());
        }catch (Exception e){
            e.printStackTrace();
        }

        ctx.channel().writeAndFlush(result);
    }

  可以看到服务端根据客户端传来的类名,去Registry的map中获取已注册的类,然后根据返回类型、方法名、参数进行反射调用。

三、Netty异步调用线程协作问题

  使用netty实现客户端发送需要注意的点:

  通过Netty的channel调用写数据writeAndFlush 写的事件以及收到响应之后的channelRead事件都是会异步执行,所以需要注意线程协作的问题。可以使用countdowlacth来实现主线程等待channelread执行完之后才去获取收到的响应对象。

    /**
     * 客户端发送数据方法
     * @param rpcRequest
     * @return
     * @throws InterruptedException
     */
    public Object sendData(RpcContext rpcRequest) throws InterruptedException {
        ChannelFuture cf = this.getChannelFuture();//单例模式获取ChannelFuture对象
        if (cf.channel() != null && cf.channel().isActive()) {
            latch=new CountDownLatch(1);
            clientInitializer.reLatch(latch);
            cf.channel().writeAndFlush(rpcRequest);
            latch.await();
        }

        return clientInitializer.getServerResult();
    }
}

  // 客户端从服务端读取数据完成
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        result=msg;
        System.out.println("返回数据读取完毕");
        latch.countDown();
    }

  由此实现了线程协作,否则调用结果无法得到返回。

标签:实战,Netty,interfaces,Object,RPC,new,序列化,服务端,客户端
来源: https://www.cnblogs.com/yb-ken/p/15084992.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有