Before optimization: a full write did not finish within 1 hour.
Table 1: 4 million rows
- ES source: 4.38M rows at ~0.3 KB/row, 8 nodes, 6 shards, 1 replica
- jg (JanusGraph) HBase table: same 8 nodes, 50 regions; the generated vertex IDs are written back to the original ES index
- Spark: 100 executors, finished in 2 min, ~2M rows/min; end to end (read from ES + write to jg + write IDs back to ES): 300K/s
Table 2: 3 million rows
- ES source: 3.45M rows
- Spark: 50 executors, finished in 3 min, ~1M rows/min (200K/s)
Optimizations
- Increased repartition to 20, so each partition handles roughly 200K rows (see the sketch after this list)
- Increased Spark executor memory to 2 GB; with the previous 1 GB, data frequently spilled to disk and slowed the job down
- Set the default region count to 50 when creating the jg table, to increase write concurrency
- Enabled jg batch loading (storage.batch-loading = true) with ids.block-size = 100000 and storage.buffer-size = 102400
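A minimal sketch of the first and third points; vertexDf is a placeholder for the DataFrame read from ES, and storage.hbase.region-count is assumed to be the knob behind "default region = 50" (it only takes effect when JanusGraph first creates its HBase table):

import org.janusgraph.core.JanusGraphFactory

// Sketch only: vertexDf is a placeholder, not the job's actual variable name.
// 20 partitions of roughly 200K rows each; executor heap is raised separately via --executor-memory 2g.
val partitioned = vertexDf.repartition(20)

// Assumed knob for "default region = 50": pre-split the HBase table at creation time to spread the write load.
val createBuilder = JanusGraphFactory.build()
createBuilder.set("storage.backend", "hbase")
createBuilder.set("storage.hbase.region-count", "50")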
Library dependency issues resolved by adding the following jars:
- http-client
- guava
- hbase-client
These jars were placed on every YARN NodeManager node and referenced when launching Spark:
--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/my/*" \
Launch:
nohup spark2-submit \
--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/my/*" \
--master yarn-client \
--class com.didichuxing.sts.dcp.knowledgebase.sparkprocess.app.Es2JgVertexJob \
--executor-memory 2g \
dcp-knowledgebase-sparkprocess-1.0-SNAPSHOT-jar-with-dependencies.jar \
dws_kg_vertex_vehicle \
100.90.170.15:9200,100.90.170.16:9200,100.90.164.32:9200 \
100.90.170.15,100.90.170.16,100.90.164.32 \
2181 \
janusgraph_biggraph1 \
100.90.170.15:9200,100.90.170.16:9200,100.90.164.32:9200 \
janusgraph_biggraph1 \
100000 \
102400 \
6 \
20 \
> logs_veh_jg.logs &
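The job's own argument parsing is not shown here; the sketch below is an assumption inferred from the argument order in the command and from the jgConf indices used in the connection code further down (parseArgs, sourceIndex, and esReadNodes are hypothetical names).

// Assumed mapping of the positional arguments above; the real Es2JgVertexJob may parse them differently.
// Indices 2..8 line up with the jgConf(0..6) reads in the connection code below.
def parseArgs(args: Array[String]): (String, String, Array[String]) = {
  val sourceIndex = args(0)  // dws_kg_vertex_vehicle: ES index the vertices are read from
  val esReadNodes = args(1)  // ES hosts used by the Spark read (and the ID write-back)
  val jgConf = Array(
    args(2),  // jgConf(0) -> storage.hostname       (HBase/ZooKeeper hosts)
    args(3),  // jgConf(1) -> storage.port            (2181)
    args(4),  // jgConf(2) -> storage.hbase.table     (janusgraph_biggraph1)
    args(5),  // jgConf(3) -> index.es.hostname       (ES hosts for the JanusGraph mixed index)
    args(6),  // jgConf(4) -> index.es.index-name     (janusgraph_biggraph1)
    args(7),  // jgConf(5) -> ids.block-size          (100000)
    args(8)   // jgConf(6) -> storage.buffer-size     (102400)
  )
  // args(9) = "6" and args(10) = "20" are not mapped here; "20" plausibly matches the
  // repartition count from the optimization list above.
  (sourceIndex, esReadNodes, jgConf)
}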
Connection:
import org.janusgraph.core.{JanusGraph, JanusGraphFactory}

val builder = JanusGraphFactory.build()
builder.set("storage.backend", "hbase")
builder.set("storage.hostname", jgConf(0))           // HBase / ZooKeeper quorum hosts
builder.set("storage.port", jgConf(1))               // ZooKeeper port, e.g. 2181
builder.set("storage.hbase.table", jgConf(2))        // JanusGraph's HBase table name
builder.set("storage.hbase.skip-schema-check", "true")
builder.set("index.es.backend", "elasticsearch")
builder.set("index.es.hostname", jgConf(3))          // ES hosts backing the "es" mixed index
builder.set("index.es.index-name", jgConf(4))
// Bulk-loading settings
builder.set("storage.batch-loading", "true")
builder.set("ids.block-size", jgConf(5))             // default 10000; the number of vertices you expect to add per JanusGraph instance per hour
builder.set("ids.authority.wait-time", "1000ms")     // default 300ms
builder.set("ids.renew-timeout", "120000")           // default 120000ms
builder.set("storage.buffer-size", jgConf(6))        // default 1024
val graph: JanusGraph = builder.open()