RPC
原理
什么是Stub?
Stub是一段代码,用来转换RPC过程中传递的参数。处理内容包括不同OS之间的大小端问题。另外,Client端一般叫Stub,Server端一般叫Skeleton。
生产方式:
- 手动生成,比较麻烦;
- 自动生成,使用IDL(InterfaceDescriptionLanguate),定义C/S的接口
RPC的套路:
自古深情留不住 唯有套路留人心
RPC最本质的就是通过socket把方法信息传输到远程服务器并执行相应method
在java界的rpc框架的实现手法:
- 服务端:socket + 反射
- 客户端:动态代理 + socket
之前也解析过motain框架,《motain客服端分析》、《motain服务端分析》
thrift
由于我司框架是通过thrift改造,发现这个框架没有按java套路出牌,可能这是跨语言类RPC的套路,有必要了解一下
thrift最初由facebook开发用做系统内各语言之间的RPC通信 。2007年由facebook贡献到apache基金 ,08年5月进入apache孵化器,支持多种语言之间的RPC方式的通信:php语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务 ,跨越语言的C/S RPC调用
示例
IDL文件
1 | //HelloService.thrfit |
生成代码
1 | 运行 thrift -gen HelloService.thrfit |
会生成一个HelloService类
实现服务端与客服端
让服务端打印出客户端传入的参数
服务端
1 | public class ThriftServer { |
客户端
1 | public class ThriftClient { |
解析
可以看出server,client代码相对很简单,主要看看生成的HelloService类,这个类就是stub代码
来看一下,这个类是如何封装,把method和args传输到远程的
client
1 | HelloService.Client client = new HelloService.Client(protocol); |
关键点在HelloService.Client.helloString()方法1
2
3
4
5public String helloString(String what) throws org.apache.thrift.TException
{
send_helloString(what);
return recv_helloString();
}
发送消息
1 | public void send_helloString(String what) throws org.apache.thrift.TException |
- 把args抽象成了一个类
- 属性赋值
- 发送
主要看下sendBase()方法1
2
3
4
5
6private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}
- 1.oprot_.writeMessageBegin 根据Protocol写数据,比如这儿使用的TBinaryProtocol,以二进制写数据
1
2
3
4
5
6
7
8
9
10
11
12public void writeMessageBegin(TMessage message) throws TException {
if (strictWrite_) {
int version = VERSION_1 | message.type;
writeI32(version);
writeString(message.name);
writeI32(message.seqid);
} else {
writeString(message.name);
writeByte(message.type);
writeI32(message.seqid);
}
}
再深入看看怎么写二进制数据的
int类型1
2
3
4
5
6
7public void writeI32(int i32) throws TException {
inoutTemp[0] = (byte)(0xff & (i32 >> 24));
inoutTemp[1] = (byte)(0xff & (i32 >> 16));
inoutTemp[2] = (byte)(0xff & (i32 >> 8));
inoutTemp[3] = (byte)(0xff & (i32));
trans_.write(inoutTemp, 0, 4);
}
string类型,先写长度,再写bytes1
2
3
4
5
6
7
8
9public void writeString(String str) throws TException {
try {
byte[] dat = str.getBytes("UTF-8");
writeI32(dat.length);
trans_.write(dat, 0, dat.length);
} catch (UnsupportedEncodingException uex) {
throw new TException("JVM DOES NOT SUPPORT UTF-8");
}
}
这儿写最终还是使用Transport.write,比如这儿使用的TSocket1
2
3
4
5
6
7
8
9
10public void write(byte[] buf, int off, int len) throws TTransportException {
if (outputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
}
try {
outputStream_.write(buf, off, len);
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
就是写到1
outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
- 2.args.write(oprot_);
1
2
3
4
5
6
7
8
9
10
11
12public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.what != null) {
oprot.writeFieldBegin(WHAT_FIELD_DESC);
oprot.writeString(struct.what);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
这就是写field,也就是向输出流里写参数内容
3.oprot_.writeMessageEnd();
这表示消息写完成了,各个协议处理不同,比如二进制就是空实现,但如json就需要写个”}”,以完成json格式4.oprot_.getTransport().flush(); 直接flush
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* Flushes the underlying output stream if not null.
*/
public void flush() throws TTransportException {
if (outputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
}
try {
outputStream_.flush();
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
client总结
整个发送消息就结束了,虽然没有按套路使用动态代理,而是通过生成的stub代码,把methodName,args给封装好了
server
服务端也没有通过反射的方式
主要逻辑在生成的HelloService$Processor类中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
public Processor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("helloString", new helloString());
return processMap;
}
public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {
public helloString() {
super("helloString");
}
public helloString_args getEmptyArgsInstance() {
return new helloString_args();
}
protected boolean isOneway() {
return false;
}
@Override
protected boolean handleRuntimeExceptions() {
return false;
}
public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {
helloString_result result = new helloString_result();
result.success = iface.helloString(args.what);
return result;
}
}
}
- 1.先看构造函数
1
2
3
4
5
6
7
8protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("helloString", new helloString());
return processMap;
}
这段把methodName与对应的处理类映射,那后面的事就简单了,当接受到消息,取得methodName,通过map获取对就的处理类回调就可以
1 | public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> { |
处理类,继承ProcessFunction类,实现getResult(),这个方法就是调用了对应service.helloString()
可以再深入看一下,在socket监听消息时1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
关键行:processor.process(inputProtocol, outputProtocol)
1 | public boolean process(TProtocol in, TProtocol out) throws TException { |
这就很明显了,通过methodName从map中取得ProccessFunction,再执行process方法,调用相应service的方法
总结
虽然thrift没有按以往java套路出牌,但最根本的把method发送到远程执行是一致的。可能对于多语言来讲,便于所以语言一致性,的确需要通过生成的stub代码手法来实现RPC
当然thrift并不简单,还有很多的内容需要深挖学习,但至少这个简单示例可以了解跨语言型的RPC,相关IDL,Stub的知识,有清晰认知,而不局限于概念