`
manzhizhen
  • 浏览: 289373 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论
阅读更多

         刚开始使用Dubbo的人,可能对Dubbo的第一印象就是它是一个RPC框架,当然,所有的分布式框架都少不了相互通信的过程,何况Dubbo的任务就是帮助分布式业务系统完成服务的通讯、负载、注册、发现和监控等功能。不得不承认,RPC是Dubbo提供服务的核心流程,为了兼容多种使用场景,Dubbo显然需要提供多种RPC方式(协议).

         开发一个简单的RPC框架,重点需要考虑的是两点,即编解码方式和底层通讯协议的选型,编解码方式指的是需要传输的数据在调用方将以什么组织形式拆解成字节流并在服务提供方以什么形式解析出来。编解码方式的设计需要考虑到后期的版本升级,所以很多RPC协议在设计时都会带上当前协议的版本信息。而底层通讯协议的选型都大同小异,一般都是TCP(当然也可以选择建立于TCP之上更高级的协议,比如Avro、Thrift和HTTP等),在Java语言中就是指套接字Socket,当然,在Netty出现后,很少RPC框架会直接以自己写Socket作为默认实现的通讯方式,但通常也会自己实现一个aio、nio或bio版本给那些“不方便”依赖Netty库的应用系统来使用。

         在Dubbo的源码中,有一个单独模块dubbo-rpc,其中,最重要的应该是Protocol和Invoker两个接口,代表着协议(编解码方式)和调用过程(通讯方式)。Invoker接口继承于Node接口,Node接口规范了Dubbo体系中各组件之间通讯的基本要素:

 

public interface Node {

    // 协议数据载体

    URL getUrl();

    // 状态监测,当前是否可用

    boolean isAvailable();

    // 销毁方法

    void destroy();

}

 

而Invoker接口则更简单:

 

public interface Invoker<T> extends Node {

    // 获取调用的接口

    Class<T> getInterface();

    // 调用过程

    Result invoke(Invocation invocation) throws RpcException;

}

 

从源代码dubbo-rpc下的子模块来看,我们能知道目前Dubbo支持dubbo(默认)、hessian、http、injvm(本地调用)、memcached、redis、rmi、thrift和webservice等9中RPC方式。根据Dubbo的官方手册,injvm是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,但执行Dubbo的Filter链,所以这一般用于线下测试。可是为啥Memcached和Redis也能用作RPC?这里是指Dubbo端作为服务消费方,而Memcached或Redis作为服务提供方。

       我们这里重点看调用方(服务消费方)部分的代码。

       虽然Invoker接口中定义的是invoke方法,invoker方法的实现理应RPC的整个操作,但为了状态检查、上下文切换和准备、异常捕获等,抽象类AbstractInvoker中定义了一个doInvoker抽象方法来支持不同的RPC方式所应做的纯粹而具体的RPC过程,我们直接看AbstractInvoker中的invoker实现:

 

public Result invoke(Invocation inv) throws RpcException {

    if(destroyed) {

        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()

                                        + " use dubbo version " + Version.getVersion()

                                        + " is DESTROYED, can not be invoked any more!");

    }

    RpcInvocation invocation = (RpcInvocation) inv;

    invocation.setInvoker(this);

    // 填充接口参数

    if (attachment != null && attachment.size() > 0) {

       invocation.addAttachmentsIfAbsent(attachment);

    }

    // 填充业务系统需要透传的参数

    Map<String, String> context = RpcContext.getContext().getAttachments();

    if (context != null) {

       invocation.addAttachmentsIfAbsent(context);

    }

    // 默认是同步调用,但也支持异步

    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){

       invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

    }

 

    /**

     * 幂等操作:异步操作默认添加invocation id,它是一个自增的AtomicLong

     * 可以在RpcContext中设置attachments{@link Constants.ASYNC_KEY}值来设置是同步还是异步

     */

    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

 

    try {

       

        // 执行具体的RPC操作

        return doInvoke(invocation);

 

    // 异常处理的代码略去

    } catch (InvocationTargetException e) {

    } catch (RpcException e) {

    } catch (Throwable e) {

    }

}

       可以看出主要是用来做参数填充(包括方法参数、业务参数和Dubbo内定的参数),然后就直接调用具体的doInvoker方法了。Dubbo所支持的RPC协议都需继承AbstractInvoker类。

         我们先来看看Dubbo中默认的dubbo协议的实现,即DubboInvoker,直接看其doInvoker的实现:

 

@Override

protected Result doInvoke(final Invocation invocation) throws Throwable {

    RpcInvocation inv = (RpcInvocation) invocation;

    final String methodName = RpcUtils.getMethodName(invocation);

    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());

    inv.setAttachment(Constants.VERSION_KEY, version);

 

    // 确定此次调用该使用哪个client(一个client代表一个connection

    ExchangeClient currentClient;

    if (clients.length == 1) {

        currentClient = clients[0];

    } else {

        // 如果是多个client,则使用简单的轮询方式来决定

        currentClient = clients[index.getAndIncrement() % clients.length];

    }

    try {

        // 是否异步调用

        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);

        // 是否单向调用,注意,单向调用和异步调用相比不同,单向调用不等待被调用方的应答就直接返回

        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

        if (isOneway) {

           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

            // 单向调用只负责发送消息,不等待服务端应答,所以没有返回值

            currentClient.send(inv, isSent);

            RpcContext.getContext().setFuture(null);

            return new RpcResult();

        } else if (isAsync) {

           ResponseFuture future = currentClient.request(inv, timeout);

            // 异步调用先保存future,便于后期处理

            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));

            return new RpcResult();

        } else {

            // 默认的同步调用

           RpcContext.getContext().setFuture(null);

            return (Result) currentClient.request(inv, timeout).get();

        }

    } catch (TimeoutException e) {

        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

    } catch (RemotingException e) {

        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

    }

}

 

从上面的代码可以看出dubbo协议中分为三种调用方式同步(默认)、异步和OneWay,同步好理解,就是阻塞等拿到被调用方的结果再返回,异步也好理解,不等待被调用者的处理结果就直接返回,但需要等到被调用者接收到异步请求的应答,OneWay(单向调用)在很多MQRPC框架中都有出现,即调用方只负责调用一次,不管被调用方是否接收到该请求,更不会去理会被调用方的任何应答,OneWay一般只会在无需保证调用结果的时候使用。在《Dubbo源代码实现二》中我们已经提到过,负载的策略决定此次服务调用是请求哪个服务提供方(也就是哪台服务器),当确定了调用哪个服务提供房后,其实也就是确定了使用哪个Invoker,这里指DubboInvoker实例。RPC框架为了提高服务的吞吐量,通常服务消费方和服务提供方的服务器之间会建立多个连接,如上面代码中的clients所以在确定使用哪个DubboInvoker实例后,会从中选择一个(如上面代码的取模轮询)client来进行RPC调用。从上面给出的代码可以看出,同步和异步的区别只是同步直接在currentClient.request返回的Future对象上进行了get操作来直接等待结果的返回。

       Dubbo中的Client实例都是ExchangeClient的实现,而每个Client实例都会绑定一个Channel的实例,来处理通讯的具体细节,而所有的Channel实例都实现了ExchangeChannel接口。这里我们先来看看HeaderExchangeChannel#request的实现:

 

public ResponseFuture request(Object request, int timeout) throws RemotingException {

    if (closed) {

        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");

    }

    // create request.

    Request req = new Request();

    req.setVersion("2.0.0");

    // 相比OneWay,同步和异步调用属于TwoWay

    req.setTwoWay(true);

    req.setData(request);

    // 创建DefaultFuture,用于将请求和应答关联起来

    DefaultFuture future = new DefaultFuture(channel, req, timeout);

    try{

        // 直接发送调用请求

        channel.send(req);

    }catch (RemotingException e) {

        future.cancel();

        throw e;

    }

 

    // future返回,用于拿到服务调用的返回值

    return future;

}

 

 

从上面代码可以看出,在直接调用channel.send发送数据时,先创建了一个DefaultFuture,它主要用于关联请求和应答,DefaultFuture将稍后分析。后面,直接调用了Channel的send方法,dubbo协议底层直接使用了Netty框架,所以这里指的是NettyChannel见NettyChannel#send的代码:

     

public void send(Object message, boolean sent) throws RemotingException {

    super.send(message, sent);

   

    boolean success = true;

    int timeout = 0;

    try {

        ChannelFuture future = channel.write(message);

        /**

         * sent值只是为了性能调优,默认是false

         */

        if (sent) {

            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            success = future.await(timeout);

        }

        Throwable cause = future.getCause();

        if (cause != null) {

            throw cause;

        }

    } catch (Throwable e) {

        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);

    }

   

    // senttrue且数据发送时间超过指定的超时时间时,由Dubbo负责抛出异常

    if(! success) {

        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()

                + "in timeout(" + timeout + "ms) limit");

    }

}

  

根据Dubbo用户手册中所说,sent参数的配置主要用于性能调优,这里当sent为true时(默认为false),将直接使用Netty的ChannelFuture来实现在给定的超时时间内等待,如果数据发送时间超过指定的超时时间,则抛出异常。之所以这样做,是为了将Netty框架处理时间控制在超时时间范围内,否则Dubbo框架在外围做的超时机制(DefaultFuture)将徒劳。

       接下来,我们看看Dubbo如何将请求和应答关联起来的,前面看到的HeaderExchangeChannel#request实现中,创建了一个Request对象,Request中有一个mId,用来唯一表示一个请求对象,而该mId在new的时候就会创建:

 

public Request() {

    mId = newId();

}

 

private static long newId() {

    // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID

    return INVOKE_ID.getAndIncrement();

}

 

DefaultFuture靠的就是这个mId来关联请求和应答消息,DefaultFuture中有两个很重要的属性:FUTURSCHANNELS,它们类型都是ConcurrentHashMapkeymId,在新建DefaultFuture对象时会把mId和相关的Future和Channel塞到这两个Map中,还有一个ReentrantLock类型的lock属性,用于阻塞来等待应答,我们直接看DefaultFuture中获取结果和接收到应答后的实现:

 

public Object get(int timeout) throws RemotingException {

    if (timeout <= 0) {

        // 默认的超时时间是1

        timeout = Constants.DEFAULT_TIMEOUT;

    }

    if (! isDone()) {

        long start = System.currentTimeMillis();

        lock.lock();

        try {

            while (! isDone()) {

                // 最多等制定的超时时间

                done.await(timeout, TimeUnit.MILLISECONDS);

                // 如果已经有结果或者已经超过超时时间,则break

                if (isDone() || System.currentTimeMillis() - start > timeout) {

                    break;

                }

            }

        } catch (InterruptedException e) {

            throw new RuntimeException(e);

        } finally {

            lock.unlock();

        }

        if (! isDone()) {

            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));

        }

    }

    return returnFromResponse();

}

 

public static void received(Channel channel, Response response) {

    try {

        // 获取并移除该mIdFuture

        DefaultFuture future = FUTURES.remove(response.getId());

        if (future != null) {

            future.doReceived(response);

        } else {

            logger.warn("The timeout response finally returned at "

                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))

                        + ", response " + response

                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()

                            + " -> " + channel.getRemoteAddress()));

        }

    } finally {

        // 获取并移除该mIdChannel

        CHANNELS.remove(response.getId());

    }

}

 

private void doReceived(Response res) {

    lock.lock();

    try {

        response = res;

        if (done != null) {

            // 释放信号

            done.signal();

        }

    } finally {

        lock.unlock();

    }

    if (callback != null) {

        invokeCallback(callback);

    }

}

 

由于received是静态方法,所以可以直接在Netty中注册的Handler中使用。

       那服务消费方和服务提供方的连接数量是由谁决定的呢?这个我们可以直接看DubboInvoker的创建方DubboProtocol中的代码:

 

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

    // create rpc invoker.

    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

    invokers.add(invoker);

    return invoker;

}

 

private ExchangeClient[] getClients(URL url){

    //是否共享连接

    boolean service_share_connect = false;

    /** 如果在dubbo:reference中没有设置{@link Constants.CONNECTIONS_KEY},则默认是共享连接  */

    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);

    //如果connections不配置,则共享连接,否则每服务每连接

    if (connections == 0){

        service_share_connect = true;

        connections = 1;

    }

 

    // 一个client维护一个connection

    ExchangeClient[] clients = new ExchangeClient[connections];

    for (int i = 0; i < clients.length; i++) {

        if (service_share_connect){

            // 使用共享的TCP长连接

            clients[i] = getSharedClient(url);

        } else {

            // 单独为该URL建立TCP长连接

            clients[i] = initClient(url);

        }

    }

    return clients;

}

 

 

从getClients的代码可以看出,服务消费方和服务提供方的服务器之间的连接数量是可以配置的,服务消费方和服务提供方都可以配置,当然服务消费方优先级更高,例如:

服务消费方A:<dubbo:reference   interface="com.foo.BarServiceA"   /> 

服务消费方A:<dubbo:reference   interface="com.foo.BarServiceB"  connections="5"  /> 

服务提供方B:<dubbo:service  interface="com.foo.BarServiceA"  /> 

服务提供方B:<dubbo:service  interface="com.foo.BarServiceB"  connections="10"  /> 

对于服务BarServiceA,由于消费方和提供方都没有配置connections,所以,所有类似于BarServiceA这样没有配置connections的服务,消费方服务器和提供方服务器将公用一个TCP长连接,即上面代码说提到的共享连接。而对于服务BarServiceA,因为配置了connections属性,消费方A和提供方B之间将单独建立5个(消费方配置优先级高于服务端配置,所以这里是5而不是10TCP长连接来专门给服务BarServiceA使用,以提高吞吐量和性能,至于每次调用应该如何从这5个连接中选,前面已经提到,这里不再阐述。所以,为了提高某个服务的吞吐量,可以试着配置connections属性,当然,前提是服务提供方性能过剩。

<!--StartFragment--> <!--EndFragment-->

         对于异步调用,Dubbo的默认调用过滤链中有一个FutureFilter,当我们在dubbo:reference中配置了async="true"后,将会执行FutureFilter中的异步逻辑,这里不再阐述,感兴趣的同学可以去阅读FutureFilter#asyncCallback部分的代码。

0
0
分享到:
评论
1 楼 miss_fish 2017-06-11  
这么详细的文章,居然没有人评价。。。 谢谢~~~

相关推荐

    Dubbo源代码分析之远程调用过程(2.5.4开发版)

    该文档分析了 Dubbo 框架中 RPC 调用的整个流程,并基于源代码按照执行 时序进行说明,源码版本为2.5.4开发版。 涉及的关键点包括:Invocation、Invoker、Directory、路由、负载均衡、集群容错、过滤器以及监控模块...

    基于Python实现的一个简单的分布式高并发RPC框架+源代码+文档说明

    ### 五、提供了什么RPC服务? &gt; + 客户端请求服务端计算一个整数值的斐波那契数列值,当然也可以自行定义 ### 六、项目的组成部分 -------- 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传...

    JAVA上百实例源码以及开源项目源代码

    Java从网络取得文件 1个目标文件 简单 Java从压缩包中提取文件 1个目标文件 简单 Java存储与读取对象 1个目标文件 如题 Java调色板面板源代码 1个目标文件 摘要:Java源码,窗体界面,调色板 使用Java语言编写的一款...

    dubbo:dubbo深度解析源码-2.5.4

    Dubbo是一个分布式的高性能RPC框架,可为应用程序提供服务导入/导出功能。 它包含三个关键部分,其中包括: 远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的...

    dubbo-go服务框架-其他

    我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。...

    smart-doc是一款同时支持JAVA REST API和Apache Dubbo RPC接口文档生成的工具,.rar

    smart-doc是一款同时支持JAVA REST API和Apache Dubbo RPC接口文档生成的工具,smart-doc在业内率先提出基于JAVA泛型定义推导的理念,完全基于接口源码来分析生成接口文档,不采用任何注解侵入到业务代码中。...

    介绍 Golang 简单服务框架,提供如下功能 高性能TCP网络通信服务端&客户端.rar

    不知道dubbo在阿里的内部竞争中败给了HSF,还是阿里有意将其闭源了,官方的代码使用的spring还停留在2.5.6.SEC03的版本,dubbox的spring也只升级到3.2.9.RELEASE。 不管怎样,dubbo还是在电商企业得到广泛的应用,...

    单点登录源码

    | ├── zheng-upms-rpc-service -- rpc服务提供者 | └── zheng-upms-server -- 用户权限系统及SSO服务端[端口:1111] ├── zheng-cms -- 内容管理系统 | ├── zheng-cms-common -- cms系统公共模块 | ├──...

    基于SpringBoot+Shiro+Redis+Jwt+Thymeleaf+MyBatis 开发的后台用户、角色+源代码+文档

    * 为了方便大学家习dubbo的运行机制,本框架将dubbo的provider和customer作了一个整合,将官方demo里的方多应用整合成了一个,即在同一应用内启动消费端和服务端 * 注:如有实际业务需要请将服务端与消费端分离,...

    javaapi源码-smart-doc:Smart-doc是JavaRestfulapi文档生成工具。Smart-doc基于接口源代码分析来生

    Smart-doc基于接口源代码分析来生成接口文档,并实现零注释入侵。 您只需在开发时编写Javadoc注释,智能文档即可帮助您生成Markdown或HTML5文档。 smart-doc不需要像Swagger这样的代码中添加注解。 特征 零注释,零...

    JAVA上百实例源码以及开源项目

    Java源代码实现部分,比较有意思,也具参考性。像坐标控制、旋转矩阵、定时器、生成图像、数据初始化、矩阵乘法、坐标旋转、判断是否是顺时针方向排列、鼠标按下、放开时的动作等,都可在本源码中得以体现。 Java...

    Java微服务架构163课

    096 服务消费者 097 测试 JRebel 热部署 098 登录页 099 首页1 099 首页2 100 使用 thymeleaf 模板 101 使用 iframe 展示功能页 102 频道管理功能-列表页布局 103 新增频道 104 频道列表 105 选择父级频道...

    java开源包3

    同时,任何第三方都可以使用OAUTH认证服务,任 何服务提供商都可以实现自身的OAUTH认证服务,因而OAUTH是开放的。业界提供了OAUTH的多种实现如PHP,JavaScript,Java,Ruby等各种语言开发包,大大节约了程序员的时间...

    java开源包4

    同时,任何第三方都可以使用OAUTH认证服务,任 何服务提供商都可以实现自身的OAUTH认证服务,因而OAUTH是开放的。业界提供了OAUTH的多种实现如PHP,JavaScript,Java,Ruby等各种语言开发包,大大节约了程序员的时间...

    Java微服务架构l零从基础到精通高清视频教程全套 163课

    096 服务消费者 097 测试 JRebel 热部署 098 登录页 099 首页1 099 首页2 100 使用 thymeleaf 模板 101 使用 iframe 展示功能页 102 频道管理功能-列表页布局 103 新增频道 104 频道列表 105 选择父级频道1 105 选择...

    基于Java的智能文档生成工具设计源码

    本资源提供了一套基于Java的智能文档生成工具的设计源码,包含282个文件,其中包括189个Java源代码文件,31个PNG图片文件,以及17个Markdown文档。此外,还包括8个HTML页面文件,6个CSS样式文件,以及5个XML配置文件...

    java开源包8

    同时,任何第三方都可以使用OAUTH认证服务,任 何服务提供商都可以实现自身的OAUTH认证服务,因而OAUTH是开放的。业界提供了OAUTH的多种实现如PHP,JavaScript,Java,Ruby等各种语言开发包,大大节约了程序员的时间...

    java开源包10

    同时,任何第三方都可以使用OAUTH认证服务,任 何服务提供商都可以实现自身的OAUTH认证服务,因而OAUTH是开放的。业界提供了OAUTH的多种实现如PHP,JavaScript,Java,Ruby等各种语言开发包,大大节约了程序员的时间...

    java开源包1

    同时,任何第三方都可以使用OAUTH认证服务,任 何服务提供商都可以实现自身的OAUTH认证服务,因而OAUTH是开放的。业界提供了OAUTH的多种实现如PHP,JavaScript,Java,Ruby等各种语言开发包,大大节约了程序员的时间...

Global site tag (gtag.js) - Google Analytics