HBase Observer中ES增加创建IK mapping

ES中的中文分词支持改为用IK分词
在调用java api时,需要指定字段使用IK分词创建mapping
同时ES还从原来使用的BulkRequestBuilder,改成参数更多更灵活的BulkProcessor。

1.原来的ElasticSearchOperator

package com.xxx.data;


import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchOperator {

    // 缓冲池容量
    private static final int MAX_BULK_COUNT = 10;
    // 最大提交间隔(秒)
    private static final int MAX_COMMIT_INTERVAL = 60 * 5;

    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static Lock commitLock = new ReentrantLock();

    static {

        // elasticsearch1.5.0
//        Settings settings = ImmutableSettings.settingsBuilder()
//                .put("cluster.name", Config.clusterName).build();
//        client = new TransportClient(settings)
//                .addTransportAddress(new InetSocketTransportAddress(
//                        Config.nodeHost, Config.nodePort));

        // 2.3.5
        client = MyTransportClient.client;

        bulkRequestBuilder = client.prepareBulk();
        bulkRequestBuilder.setRefresh(true);

        Timer timer = new Timer();
        timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
    }

    /**
     * 判断缓存池是否已满,批量提交
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                bulkRequestBuilder = client.prepareBulk();
            }
        }
    }

    /**
     * 加入索引请求到缓冲池
     *
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * 加入删除请求到缓冲池
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }
    }

    private static void test() {
        Config.indexName = "flume-2016-08-10";
        Config.typeName = "tweet";
        for (int i = 10; i < 20; i++) {
            Map<String, Object> json = new HashMap<String, Object>();
            json.put("field", "ttt");
            //添加
//            addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, String.valueOf(i)).setDoc(json).setUpsert(json));
            //删除
            addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, String.valueOf(i)));
        }

        System.out.println(bulkRequestBuilder.numberOfActions());
    }

    public static void main(String[] args) {
        test();
    }
}

2.改成ElasticSearchBulkProcessor

package com.xxx.data;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import java.util.*;

/**
 * Created by lisiyu on 16/9/19.
 */
public class ElasticSearchBulkProcessor {

    private static Client client = null;
    private static BulkProcessor bulkProcessor = null;

    // 缓冲池容量(计数,request)
    private static final int MAX_BULK_COUNT = 1000;
    // 缓冲池容量(大小,MB)
    private static final int MAX_BULK_SIZE = 1024;
    // 最大提交间隔(秒)
    private static final int MAX_COMMIT_INTERVAL = 60 * 1;
    // 最大并发数量
    private static final int MAX_CONCURRENT_REQUEST = 2;
    // 失败重试等待时间 (ms)
    private static final int REJECT_EXCEPTION_RETRY_WAIT = 500;
    // 失败重试次数
    private static final int REJECT_EXCEPTION_RETRY_TIMES = 3;

    static {
        // 2.3.5
        client = MyTransportClient.client;

        bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {  }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {  }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {  }
                })
                .setBulkActions(MAX_BULK_COUNT)
                .setBulkSize(new ByteSizeValue(MAX_BULK_SIZE, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(MAX_COMMIT_INTERVAL))
                .setConcurrentRequests(MAX_CONCURRENT_REQUEST)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(
                                TimeValue.timeValueMillis(REJECT_EXCEPTION_RETRY_WAIT),
                                REJECT_EXCEPTION_RETRY_TIMES))
                .build();
    }

    /**
     * 加入索引请求到缓冲池
     *
     * @param indexRequest
     * @param fieldSet
     */
    public static void addIndexRequestToBulkProcessor(IndexRequest indexRequest,Set<String> fieldSet) {
        try {
            // 获取索引及类型信息
            System.out.println("index:"+indexRequest.index());
            System.out.println("type:"+indexRequest.type());

            // 尝试创建索引,并指定ik中文分词
            createMapping(indexRequest.index(),indexRequest.type(),fieldSet);

            // 更新数据
            bulkProcessor.add(indexRequest);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 创建mapping(feid("indexAnalyzer","ik")该字段分词IK索引 ;feid("searchAnalyzer","ik")该字段分词ik查询;具体分词插件请看IK分词插件说明)
     * @param index 索引名称;
     * @param mappingType 索引类型
     * @param fieldSet 列集合
     * @throws Exception
     */
    public static void createMapping(String index,String mappingType,Set<String> fieldSet)throws Exception{
        // 判断index是否存在,不存在则创建索引,并启用ik分词器
        if(client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet().isExists()){
            System.out.println("index: '"+index+"' is exist!");
            new XContentFactory();
            XContentBuilder builder=XContentFactory.jsonBuilder()
                    .startObject()//注意不要加index和type
                    .startObject("properties")
                    .startObject("id").field("type", "string").field("store", "yes").endObject();
            for(String field : fieldSet){
                builder = builder.startObject(field).field("type", "string").field("store", "yes").field("analyzer", "ik").endObject();
            }
            builder = builder.endObject().endObject();

            PutMappingRequest mapping = Requests.putMappingRequest(index).type(mappingType).source(builder);
            client.admin().indices().putMapping(mapping).actionGet();

        } else {
            System.out.println("create index: '"+index+"'!");
            new XContentFactory();
            XContentBuilder builder=XContentFactory.jsonBuilder()
                    .startObject()//注意不要加index和type
                    .startObject("properties")
                    .startObject("id").field("type", "string").field("store", "yes").endObject();
            for(String field : fieldSet){
                builder = builder.startObject(field).field("type", "string").field("store", "yes").field("analyzer", "ik").endObject();
            }
            builder = builder.endObject().endObject();

            client.admin().indices().prepareCreate(index).addMapping(mappingType, builder).get();
        }
    }

    public static void test() {
        // on startup
        Client client = MyTransportClient.client;
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {  }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {  }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {  }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

        Map<String, Object> json = new HashMap<String, Object>();
        json.put("field", "test");
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1111").source(json));
    }

    public static void main(String[] args) {
        test();
    }
}

3.DataSyncObserver类修改

@Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        /**
         * 原方法调用ElasticSearchOperator,没有通过IK创建中文索引。
         */
//        try {
//            String indexId = new String(put.getRow());
//            Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
////            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
//            Map<String, Object> json = new HashMap<String, Object>();
//            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
//                for (Cell cell : entry.getValue()) {
//                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
//                    String value = Bytes.toString(CellUtil.cloneValue(cell));
//                    json.put(key, value);
//                }
//            }
//            System.out.println();
//            ElasticSearchOperator.addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, indexId).setDoc(json).setUpsert(json));
//            LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
//        } catch (Exception ex) {
//            LOG.error(ex);
//        }

        /**
         * 新方法调用ElasticSearchBulkProcessor,通过IK创建中文索引。
         */
        try {
            String indexId = new String(put.getRow());
            NavigableMap familyMap = put.getFamilyCellMap();
            HashSet set = new HashSet();
            HashMap json = new HashMap();
            Iterator mapIterator = familyMap.entrySet().iterator();

            while(mapIterator.hasNext()) {
                Map.Entry entry = (Map.Entry)mapIterator.next();
                Iterator valueIterator = ((List)entry.getValue()).iterator();

                while(valueIterator.hasNext()) {
                    Cell cell = (Cell)valueIterator.next();
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                    set.add(key);
                }
            }

            System.out.println();
            ElasticSearchBulkProcessor.addIndexRequestToBulkProcessor((new IndexRequest(Config.indexName, Config.typeName, indexId)).source(json), set);
            LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

4.测试

  • 代码打包
  • jar包上传到hdfs
  • 创建hbase表,并修改表属性关联observer
  • 测试put新数据
  • 查看es中数据
  • 中文分词测试

{"query":{"query_string":{"query":"拖鞋"}},"highlight":{"require_field_match":false,"explain":true,"fields":{"*":{}}}}

中文分词测试.jpg

5.程序代码整体和其余测试等操作可以查看另一篇文章

Sqoop导入HBase,并借助Coprocessor协处理器同步索引到ES

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,440评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,814评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,427评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,710评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,625评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,014评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,511评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,162评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,311评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,262评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,278评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,989评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,583评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,664评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,904评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,274评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,856评论 2 339

推荐阅读更多精彩内容