motan客户端

RPC的本质

方法调用对于程序员来讲是再正常不过的事了,object.method(),RPC的使用也一样,但底层对这一过程又切分开,有client和server两端,也就是调用者与实现者

因为他们不再在同一进程中,需要通过网络跨JVM实现这一调用过程

在java中的实现手法:动态代理+socket通信;这就是个套路,上层怎么封装实现,但底层就是这样,概莫能外

请求过程

motan的调用实现

先画个简单的序列图,理清一下调用过程
image

motan与spring的结合,后面再写了,spring的扩展也很简单。

基于对RPC本质的认识,可以先找到InvocationHandler的实现类RefererInvocationHandler

这个接口就一个方法

1
2
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;

在这个方法里面就是去构造socket传输的request对象,request主要就是方法的签名信息与参数,传输到server端,去执行对应的实现方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(isLocalMethod(method)){
if("toString".equals(method.getName())){
return clustersToString();
}
throw new MotanServiceException("can not invoke local method:" + method.getName());
}
DefaultRequest request = new DefaultRequest();

request.setRequestId(RequestIdGenerator.getRequestId());
request.setArguments(args);
request.setMethodName(method.getName());
request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
request.setInterfaceName(clz.getName());
request.setAttachment(URLParamType.requestIdFromClient.getName(), String.valueOf(RequestIdGenerator.getRequestIdFromClient()));

// 当 referer配置多个protocol的时候,比如A,B,C,
// 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
for (Cluster<T> cluster : clusters) {
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();

Switcher switcher = switcherService.getSwitcher(protocolSwitcher);

if (switcher != null && !switcher.isOn()) {
continue;
}

request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
// 带上client的application和module
request.setAttachment(URLParamType.application.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getApplication());
request.setAttachment(URLParamType.module.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getModule());
Response response = null;
boolean throwException =
Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),
URLParamType.throwException.getValue()));
try {
//真正执行
response = cluster.call(request);
return response.getValue();
}

invoke交给了Cluster.call,而Cluster又给了HAStragy.call,HA策略通过loadbalance选择负载均衡策略得到Referer

Cluster是什么

在InvocationHandler里面,调用了Cluster的call方法,从代码上看,它的本质就是Referer的集合,并且提供了HA服务以及负载均衡。而Referer是提供服务的一个抽象

HA与LoadBalance

image

HA

HA策略,就提供了两种,

fail-fast

fail-fast很简单,调用失败就抛异常;

fail-over

fail-over相对fail-fast多了重试次数,如果失败,就重试一个referer

LoadBalance

这倒提供了不少

Round-Robin

这个很简单,一个一个往下轮询就行了,
但需要记住上一次的位置

random

随机

Least Load

这个motan实现有点意思

由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer或者最低并发的几个,性能有些损耗,因此 random.nextInt(list.size())获取一个起始的index,然后获取最多不超过MAX_REFERER_COUNT的 状态是isAvailable的referer进行判断activeCount.

localFirst

本地服务优先获取策略:对referers根据ip顺序查找本地服务,多存在多个本地服务,获取Active最小的本地服务进行服务。当不存在本地服务,但是存在远程RPC服务,则根据ActivWeight获取远程RPC服务;当两者都存在,所有本地服务都应优先于远程服务,本地RPC服务与远程RPC服务内部则根据ActiveWeight进行

NettyClient

上层不管怎么选择服务,最后都需要传输层去传输,nettyclient就是传输作用。

在DefaultRpcReferer中创建了一个nettyClient。向server发送远程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Response request(Request request, boolean async) throws TransportException {
Channel channel = null;

Response response = null;

try {
// return channel or throw exception(timeout or connection_fail)
channel = borrowObject();

if (channel == null) {
LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " "
+ MotanFrameworkUtil.toString(request));
return null;
}

// async request
response = channel.request(request);
// return channel to pool
returnObject(channel);

使用了common-pool连接池

在这儿是委托给了nettychannel.request(),nettyclient与nettychannel是什么关系呢?
client有server地址,channel就是这个地址连接的通道。
在nettychannel中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);
this.nettyClient.registerCallback(request.getRequestId(), response);

ChannelFuture writeFuture = this.channel.write(request);

boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);

if (result && writeFuture.isSuccess()) {
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
return response;
}


到此,整个请求过程已经完成。

返回处理

调用完成之后,总得得到结果才行

motan返回过程

在上面nettychannel.request方法,会返回一个response,NettyResponseFuture这个类名就说明了一切,使用了Future模式。

在返回response时,构造真实response

1
2
3
4
5
6
7
private Response asyncResponse(Response response, boolean async) {
if (async || !(response instanceof NettyResponseFuture)) {
return response;
}

return new DefaultResponse(response);
}

真实response里面,使用futureresponse去取值

1
2
3
4
5
6
7
public DefaultResponse(Response response) {
this.value = response.getValue();
this.exception = response.getException();
this.requestId = response.getRequestId();
this.processTime = response.getProcessTime();
this.timeout = response.getTimeout();
}

在futureresponse里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public Object getValue() {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();
}

if (timeout <= 0) {
try {
lock.wait();
} catch (Exception e) {
cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost="
+ (System.currentTimeMillis() - createTime), e));
}

// don't need to notifylisteners, because onSuccess or
// onFailure or cancel method already call notifylisteners
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);

if (waitTime > 0) {
for (;;) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
}

if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}

if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}

没有使用java.util.concurrent包中Condition,CountDownLatch之类的工具类,而是使用原始的wait,notify组合

在NettyClient中,得到返回对象后,对responsefuter进行赋值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {
@Override
public Object handle(Channel channel, Object message) {
//得到返回对象
Response response = (Response) message;
//得到对应request的future
NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());

if (responseFuture == null) {
LoggerUtil.warn(
"NettyClient has response from server, but resonseFuture not exist, requestId={}",
response.getRequestId());
return null;
}

if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}

return null;
}
}));

responseFuture的onsuccess方法,进行赋值并notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void onSuccess(Response response) {
this.result = response.getValue();
this.processTime = response.getProcessTime();

done();
}
private boolean done() {
synchronized (lock) {
if (!isDoing()) {
return false;
}

state = FutureState.DONE;
lock.notifyAll();
}

notifyListeners();
return true;
}


总结

到此,客户端部分已经完成,主要就是两方面

  1. 调用请求
  2. 返回处理

还有一些问题:

  1. 客户端怎么服务发现的?
  2. 服务降低怎么处理的?
公众号:码农戏码
欢迎关注微信公众号『码农戏码』