摘要: 上一篇博客,介绍的是brave直接通过http发送请求到zipkin,完成最简单的rpc跟踪,适合理解zipkin的跟踪链。这次来完成生产环境真实架构的部署。
先来浏览一下架构图(下图的scribe采用kafka替代)
首先我们完成brave+kafka+zipkin这一步先,这时候采用的存储是zipkin的内存。
下载kafka后
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
(windows 启动 bin\windows\zookeeper-server-start.bat config\zookeeper.properties)
启动kafka服务:
bin/kafka-server-start.sh config/server.properties
(windows 启动 bin\windows\kafka-server-start.bat config\server.properties)
下载zipkin的jar包,
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
运行: java -jar zipkin.jar
用解压工具可以看到zipkin.jar里的BOOT-INF\classes\zipkin-server-shared.yml文件,所有的配置都在这个文件里。
改变配置里的值,可以通过java启动脚本里-D带入配置参数。
现在我们通过kafka做数据通道的话就采用如下参数:
java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar
打开ui查看 http://localhost:9411/
接着附上测试插入数据到kafka的代码。
import com.github.kristofa.brave.*;
import com.twitter.zipkin.gen.Endpoint;
import zipkin.Span;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.kafka08.KafkaSender;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BraveKafkaTest {
private static Brave brave = null;
private static Brave brave2 = null;
private static void braveInit(){
// KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build();
KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build();
AsyncReporter<Span> report = AsyncReporter.builder(sender).build();
brave = new Brave.Builder("appserver").reporter(report).build();
brave2 = new Brave.Builder("datacenter").reporter(report).build();
}
static class Task {
String name;
SpanId spanId;
public Task(String name, SpanId spanId) {
super();
this.name = name;
this.spanId = spanId;
}
}
public static void main(String[] args) throws Exception {
braveInit();
final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
Thread thread = new Thread(){
public void run() {
while (true) {
try {
Task task = queue.take();
dcHandle(task.name, task.spanId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
thread.start();
for (int i = 0; i < 10; i++) {
ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor();
ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor();
ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor();
ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor();
serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data"));
ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list");
clientRequestInterceptor.handle(clientRequestAdapterImpl);
queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId()));
Thread.sleep(10);
clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list");
clientRequestInterceptor.handle(clientRequestAdapterImpl);
queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId()));
Thread.sleep(10);
clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list");
clientRequestInterceptor.handle(clientRequestAdapterImpl);
queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId()));
Thread.sleep(10);
clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
Thread.sleep(10);
}
}
public static void dcHandle(String spanName, SpanId spanId){
ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor();
ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor();
serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId));
serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
}
static class ServerRequestAdapterImpl implements ServerRequestAdapter {
Random randomGenerator = new Random();
SpanId spanId;
String spanName;
ServerRequestAdapterImpl(String spanName){
this.spanName = spanName;
long startId = randomGenerator.nextLong();
SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
this.spanId = spanId;
}
ServerRequestAdapterImpl(String spanName, SpanId spanId){
this.spanName = spanName;
this.spanId = spanId;
}
public TraceData getTraceData() {
if (this.spanId != null) {
return TraceData.builder().spanId(this.spanId).build();
}
long startId = randomGenerator.nextLong();
SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
return TraceData.builder().spanId(spanId).build();
}
public String getSpanName() {
return spanName;
}
public Collection<KeyValueAnnotation> requestAnnotations() {
Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
collection.add(kv);
return collection;
}
}
static class ServerResponseAdapterImpl implements ServerResponseAdapter {
public Collection<KeyValueAnnotation> responseAnnotations() {
Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
collection.add(kv);
return collection;
}
}
static class ClientRequestAdapterImpl implements ClientRequestAdapter {
String spanName;
SpanId spanId;
ClientRequestAdapterImpl(String spanName){
this.spanName = spanName;
}
public SpanId getSpanId() {
return spanId;
}
public String getSpanName() {
return this.spanName;
}
public void addSpanIdToRequest(SpanId spanId) {
//记录传输到远程服务
System.out.println(spanId);
if (spanId != null) {
this.spanId = spanId;
System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s", spanId.traceId, spanId.parentId, spanId.spanId));
}
}
public Collection<KeyValueAnnotation> requestAnnotations() {
Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
collection.add(kv);
return collection;
}
public Endpoint serverAddress() {
return null;
}
}
static class ClientResponseAdapterImpl implements ClientResponseAdapter {
public Collection<KeyValueAnnotation> responseAnnotations() {
Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1");
collection.add(kv);
return collection;
}
}
}
maven依赖:
<dependencies>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-core</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-spancollector-http</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-spancollector-kafka</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter</groupId>
<artifactId>zipkin-sender-kafka08</artifactId>
<version>0.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
</dependencies>
运行完成后,插入了10条跟踪信息导kafka,打开 http://localhost:9411/ 即可看到效果。
接下来,采用cassandra存储从kafka采集过来的数据。
下载apache-cassandra-2.2.8-bin.tar.gz,解压后
启动cassandra:bin\cassandra.bat
重新运行zipkin(加入-DSTORAGE_TYPE=cassandra参数):
java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar