thrift协议分析、skywalking消息头实现

本篇分两块来介绍thrift协议。
thrift定义文件:

struct MiRequest {
   2: required string name;
   3: optional i32 age;
}

exception MiRequestException {
   1: required i32 code;
   2: optional string reason;
}

service MiTestService {
   string miTestMethod(1: MiRequest request) throws (1:MiRequestException qe); 
   string miTestOtherMethod(1: MiRequest request, 2: string pTwo) throws (1:MiRequestException qe); 

}

执行thrift --gen java:beans Test.thrift
生成MiTestService.java,本篇后文的通信协议介绍都是基于MiTestService.java,类结构如下:


image.png

简介:
MiTestService.AsyncClient:异步调用-调用方协议处理类
MiTestService.AsyncIface:异步调用-接口定义
MiTestService.Client:同步调用-调用方协议处理类
MiTestService.Iface:同步调用-接口定义
MiTestService.miTestMethod_args:方法miTestMethod的入参
MiTestService.miTestMethod_result:方法miTestMethod的返回值
MiTestService.miTestOtherMethod_args:方法miTestOtherMethod的入参
MiTestService.miTestOtherMethod_result:方法miTestOtherMethod的返回值
MiTestService.Processor:服务方协议处理器

以同步远程调用miTestOtherMethod方法为例:

一、调用方--方法调用和消息发送

MiTestService.Client构造方法

    public Client(TProtocol prot)
    {
      this(prot, prot);
    }

    public Client(TProtocol iprot, TProtocol oprot)
    {
      iprot_ = iprot;
      oprot_ = oprot;
    }

TProtocol为底层通信处理类。
备注:
MiTestService.Client协议处理类:负责通信协议消息体的定义,消息体的发送和二进制流的解析
TProtocol通信处理类:负责处理二进制字节流的发送和接收

调用方发起方法调用:MiTestService.Client.miTestOtherMethod

public String miTestOtherMethod(MiRequest request, String pTwo) throws MiRequestException, TException
    {
      send_miTestOtherMethod(request, pTwo);
      return recv_miTestOtherMethod();
    }

    public void send_miTestOtherMethod(MiRequest request, String pTwo) throws TException
    {
      oprot_.writeMessageBegin(new TMessage("miTestOtherMethod", TMessageType.CALL, ++seqid_));
      miTestOtherMethod_args args = new miTestOtherMethod_args();
      args.setRequest(request);
      args.setPTwo(pTwo);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

重点说明:
TProtocol.writeMessageBegin发送TMessage指定本次调用的是哪个方法
miTestOtherMethod_args.write发送本次方法调用的入参

两块的详细源码和说明如下:

public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
}

public void write(TProtocol oprot) throws TException {
      validate();
      //空实现
      oprot.writeStructBegin(STRUCT_DESC);
      //发送入参:复合对象request
      if (this.request != null) {
        //REQUEST_FIELD_DESC = new TField("request", TType.STRUCT, (short)1);
        //写入TField.type、TField.id
        oprot.writeFieldBegin(REQUEST_FIELD_DESC);
        //写入具体内容
        this.request.write(oprot);
        //空实现
        oprot.writeFieldEnd();
      }
      //发送入参:基础类型string
      if (this.pTwo != null) {
        //P_TWO_FIELD_DESC = new TField("pTwo", TType.STRING, (short)2);
        //写入TField.type、TField.id
        oprot.writeFieldBegin(P_TWO_FIELD_DESC);
        //写入具体内容
        oprot.writeString(this.pTwo);
        //空实现
        oprot.writeFieldEnd();
      }
      //写入一个字节:TType.STOP(整型1)
      oprot.writeFieldStop();
      //空实现
      oprot.writeStructEnd();
}

远程调用请求信息发送完毕。

二、服务方--消息接收和解析

MiTestService.Processor构造方法:传入MiTestService.Iface的具体实现类

public Processor(Iface iface)
    {
      iface_ = iface;
      //方法名-方法处理类映射,每个方法生成了一个具体的处理类
      //可以看出方法名称必须唯一,不支持方法重载
      //方法处理类实现了MiTestService.Processor.ProcessFunction接口
      processMap_.put("miTestMethod", new miTestMethod());
      processMap_.put("miTestOtherMethod", new miTestOtherMethod());
}

protected static interface ProcessFunction {
      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException;
}

//private class miTestMethod implements ProcessFunction

//private class miTestOtherMethod implements ProcessFunction

服务方调用TServer.serve开启端口监听,
接收到消息后将调用MiTestService.Processor.process方法

    public boolean process(TProtocol iprot, TProtocol oprot) throws TException
    {
      //解析出TMessage消息,主要包含请求调用的方法名称
      TMessage msg = iprot.readMessageBegin();
      //
      ProcessFunction fn = processMap_.get(msg.name);
      fn.process(msg.seqid, iprot, oprot);
      return true;
    }

以miTestOtherMethod处理流程为例,主要代码如下:

public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
{
        //解析参数
        miTestOtherMethod_args args = new miTestOtherMethod_args();
        //此处划重点:涉及第三部分可扩展性的讲解
        args.read(iprot);
        //空实现
        iprot.readMessageEnd();
        //结果对象
        miTestOtherMethod_result result = new miTestOtherMethod_result();
        //发起方法调用
        result.success = iface_.miTestOtherMethod(args.request, args.pTwo);
        //调用结果写回
        oprot.writeMessageBegin(new TMessage("miTestOtherMethod", TMessageType.REPLY, seqid));
        result.write(oprot);
        //空实现
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
}

三、thrift协议--字节码增强、skywalking可扩展消息头

第三部分将根据miTestOtherMethod_args.read方法的具体实现,
来解读thrift协议的可扩展消息头的实现原理

public void read(TProtocol iprot) throws TException {
      TField field;
      //非读--空实现
      iprot.readStructBegin();
      while (true)
      {
        //读取Field.type、Field.id
        field = iprot.readFieldBegin();
        if (field.type == TType.STOP) { 
          break;
        }
        //根据Field.id匹配关系,解析出每一个方法入参
        switch (field.id) {
          case 1: // REQUEST
            if (field.type == TType.STRUCT) {
              this.request = new MiRequest();
              this.request.read(iprot);
            } else { 
              TProtocolUtil.skip(iprot, field.type);
            }
            break;
          case 2: // P_TWO
            if (field.type == TType.STRING) {
              this.pTwo = iprot.readString();
            } else { 
              TProtocolUtil.skip(iprot, field.type);
            }
            break;
          //匹配不上的消息则跳过
          default:
            TProtocolUtil.skip(iprot, field.type);
        }
        iprot.readFieldEnd();
      }
      //空实现
      iprot.readStructEnd();
      validate();
    }

由上面的服务方方法参数解析实现可以得出:
1、每一个方法的方法入参都严格按照定义的Filed.id来匹配
2、匹配失败则走到default逻辑,跳过匹配异常的消息体

由此可以实现:
字节码增强TBinaryProtocol.writeMessageBegin,在此方法执行后延(即写入TMessage后)写入自定义Field.id的消息体;
字节码增强TBinaryProtocol.readMessageBegin 或者 MiTestService.Processor.process,在字节流解析出TMessage后解析自定义Field.id的消息体。

自定义消息体可以为任何自定义类型的thrift复合类型或基础类型,传入skywalking上下文信息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,841评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,415评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,904评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,051评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,055评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,255评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,729评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,377评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,517评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,420评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,467评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,144评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,735评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,812评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,029评论 1 256
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,528评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,126评论 2 341