otter支持rabbitMQ源码修改

otter支持rabbitMQ源码修改

首先从ali源下载otter源码:
https://github.com/alibaba/otter

执行脚本文件manager/deployer 资源目录sql文件夹中

修改manager下deployer资源配置文件:otter.properties
修改zk地址和数据库连接地址,以及管理后台开放端口1099

#zk地址
otter.zookeeper.cluster.default = xx
#与node节点心跳连接地址
otter.communication.manager.port = 1099
#mysql数据库连接
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://xxxxxx/otter
otter.database.driver.username = xxxx
otter.database.driver.password = xxxx

管理后台启动入口,配置启动地址后,可以直接run起来
com.alibaba.otter.manager.deployer.OtterManagerLauncher
启动前可以开启debug日志,这样可以在控制台看到启动日志

#开启日志 logback.xml
<appender-ref ref="STDOUT" />

浏览器输入 127.0.0.1:8080即可看到页面,默认是匿名用户,退出后重新使用admin登入 用户名/密码:admin

image.png

上面介绍了简单的启动流程,下面开始准备支持rabbitMQ,如图所示:


image.png
修改文件 :addDataSource.vm
<th>类型:</th>
        <td>
            <select id="sourceType" name="$dataMediaSourceGroup.type.key" onchange="changeform();" >
                <option value="MYSQL">MySQL</option>
                <option value="ORACLE">Oracle</option>
                <option value="RABBITMQ">RabbitMQ</option>
            </select><span class="red">*</span>
        </td>

修改文件:form.xml 该文件对应form表单验证 name="dataMediaSourceInfo"

    <field name="vhost" displayName="vhost"/>

添加vhost文本框

      <tr id="vhost_tr" style="display: none;">
            <th>vhost:</th>
            <td>
                <input id="sourceVhost" name="$dataMediaSourceGroup.vhost.key" value="$!dataMediaSourceGroup.vhost.value" type="text" class="setting_input"/><span class="red">*</span>
                <br />
                <span class="red">#addDataSourceMessage ($dataMediaSourceGroup.vhost)</span>
            </td>
        </tr>
修改js文件 dbCheck.js 联动选择
function changeform(){
    console.log("changed form")
    //获取 sourceType 的Dom对象
    var sourceType = document.getElementById('sourceType').value;
    //获取 groupName 的Dom对象
    var vhost_tr = document.getElementById("vhost_tr");
    if ("RABBITMQ" === sourceType) {
        // 如果是RocketMQ,则显示 groupName 的填写框
        vhost_tr.style.display = "table-row";
    } else {
        // 否则不显示
        vhost_tr.style.display = "none";
    }
}

前端修改已经完成,可以run起来看效果

后端maven引用rabbitMQ client

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

修改文件:DataSourceChecker 支持连接验证检查

    public String check(String url, String username, String password, String encode, String sourceType,
                        String vhost) {
        if (DataMediaType.MYSQL.name().equals(sourceType) || DataMediaType.ORACLE.name().equals(sourceType)) {
            return checkDB(url, username, password, encode, sourceType);

        } else if (DataMediaType.RABBITMQ.name().equals(sourceType)) {
            return checkMQ(url, username, password, vhost);
        }
        return DATABASE_FAIL;
    }

    public String checkMQ(String url, String username, String password, String vhost) {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setAutomaticRecoveryEnabled(true);

        try {
            factory.setUri(url);
            Channel channel = factory.newConnection().createChannel();
            channel.close();
        } catch (Exception e) {
            e.printStackTrace();
            return MQ_CONN_FAIL;
        }

        return MQ_CONN_SUCCESS;
    }

可以测试一下:


image.png

开始修改保存方法,支持数据源保存
添加rabbitMQ数据源类 RabbitMQMediaSource :

public class RabbitMQMediaSource extends DbMediaSource {
    private String vhost;
    private String url;
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getVhost() {
        return vhost;
    }
    public void setVhost(String vhost) {
        this.vhost = vhost;
    }
}

枚举添加rabbit映射 DataMediaType


    public boolean isRabbitMQ() {
        return this == DataMediaType.RABBITMQ;
    }

修改文件:DataMediaSourceAction doAdd方法

 if (dataMediaSource.getType().isRabbitMQ()) {
            RabbitMQMediaSource rocketMqMediaSource = new RabbitMQMediaSource();
            dataMediaSourceInfo.setProperties(rocketMqMediaSource);
            rocketMqMediaSource.setUrl(dataMediaSourceInfo.getField("url").getStringValue());
           // rocketMqMediaSource.setRoutingKey(dataMediaSourceInfo.getField("routingKey").getStringValue());
            rocketMqMediaSource.setVhost(dataMediaSourceInfo.getField("vhost").getStringValue());

            try {
                dataMediaSourceService.create(rocketMqMediaSource);
            } catch (RepeatConfigureException rce) {
                err.setMessage("invalidDataMediaSource");
                return;
            }

至此,manage管理修改完成,实现了支持rabbitMQ数据源的添加。
下面我们需要配置数据表映射关系,然后修改node部分代码以支持rabbitMQ的发送

配置数据表
table name 为rabbitMQ的ex和routeKey组成,中间使用逗号隔开,选择rabbitMQ数据源即可,点击保存 如下图

image.png

然后配置一个mysql,如图


image.png

添加zk,node和canal,最后配置表映射 此处为otter基本配置,不在本文中介绍

image.png

image.png

配置完成,我们开始修改node代码

文件名称:RowDataTransformer 修改方法:transform

            // 获取目标库的表信息
            DbDialect dbDialect = null;
            Table table = null;
            if (dataMedia.getSource() instanceof RabbitMQMediaSource) {
              //如果是rabbitMQ数据源,则使用source数据表表结构
                dbDialect = dbDialectFactory.getDbDialect(dataMediaPair.getPipelineId(),
                        (DbMediaSource) sourceMedia.getSource());
                table = dbDialect.findTable(sourceMedia.getNamespace(), result.getTableName());
                result.setTableName(dataMedia.getName());

            } else {
                dbDialect = dbDialectFactory.getDbDialect(dataMediaPair.getPipelineId(),
                        (DbMediaSource) dataMedia.getSource());
                table = dbDialect.findTable(result.getSchemaName(), result.getTableName());

            }

执行sql语句是在sqlBuilderLoadInterceptor拦截器中执行,我们可以加一个判断如果是RabbitMQ,则发送消息到MQ中,以下是代码

    public boolean before(DbLoadContext context, EventData currentData) {

        DataMediaSource dataMediaSource = context.getDataMediaSource();
        if (dataMediaSource instanceof RabbitMQMediaSource) {
            if (!currentData.getEventType().isDml())//如果非数据库管理操作,则不继续执行
                return true;
            Map<String, Object> dataColumns = new HashMap<>();
            Map<String, Object> originalMessageBody = new HashMap<>();

            for (EventColumn key : currentData.getKeys()) {
                dataColumns.put(key.getColumnName(), key.getColumnValue());
            }

            for (EventColumn column : currentData.getColumns()) {
                dataColumns.put(column.getColumnName(), column.getColumnValue());
            }
            for (EventColumn oldKey : currentData.getOldKeys()) {
                originalMessageBody.put(oldKey.getColumnName(), oldKey.getColumnValue());
            }

            MqMessageBody mqMessageBody = new MqMessageBody();
            mqMessageBody.setType(currentData.getEventType().name());
            mqMessageBody.setMessageBody(dataColumns);
            mqMessageBody.setOriginalMessageBody(originalMessageBody);
          #得到配置的表名,因为我们在manager里面配置的是使用逗号隔开,0是ex,1是routingKey
            String[] change = currentData.getTableName().split(",");
            MqPlugin.init(currentData.getSchemaName(), (RabbitMQMediaSource) dataMediaSource)
                    .publisher(currentData.getSchemaName())
                    .toExchange(change[0])
                    .routingKey(change[1])
                    .requireConfirm(true)
                    .build(MessageType.JSON).send(JsonUtils.marshalToString(mqMessageBody));

            return true;
        }

   .....

}

至此,简单修改已经完成,可以支持rabbitMQ的接受,当数据变更时,会将字段和值,以及类型发送到mq中

修改node配置文件

## otter arbitrate & node connect manager config
otter.manager.address = 127.0.0.1:1099

启动node,启动是需要设置启动参数 nid 为manage中配置的Id

在manage中点击启用即可生效:


image.png

插入数据表数据:


image.png

RabbitMQ消息内容:


image.png

gitee地址:https://gitee.com/lumiaomiao126/otter-rabbitMQ
使用相同的修改,可以支持到kafka和rockMQ,es等等其他的通信方式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容