otter支持rabbitMQ源码修改
首先从ali源下载otter源码:
https://github.com/alibaba/otter
执行脚本文件manager/deployer 资源目录sql文件夹中
修改manager下deployer资源配置文件:otter.properties
修改zk地址和数据库连接地址,以及管理后台开放端口1099
#zk地址
otter.zookeeper.cluster.default = xx
#与node节点心跳连接地址
otter.communication.manager.port = 1099
#mysql数据库连接
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://xxxxxx/otter
otter.database.driver.username = xxxx
otter.database.driver.password = xxxx
管理后台启动入口,配置启动地址后,可以直接run起来
com.alibaba.otter.manager.deployer.OtterManagerLauncher
启动前可以开启debug日志,这样可以在控制台看到启动日志
#开启日志 logback.xml
<appender-ref ref="STDOUT" />
浏览器输入 127.0.0.1:8080即可看到页面,默认是匿名用户,退出后重新使用admin登入 用户名/密码:admin
上面介绍了简单的启动流程,下面开始准备支持rabbitMQ,如图所示:
修改文件 :addDataSource.vm
<th>类型:</th>
<td>
<select id="sourceType" name="$dataMediaSourceGroup.type.key" onchange="changeform();" >
<option value="MYSQL">MySQL</option>
<option value="ORACLE">Oracle</option>
<option value="RABBITMQ">RabbitMQ</option>
</select><span class="red">*</span>
</td>
修改文件:form.xml 该文件对应form表单验证 name="dataMediaSourceInfo"
<field name="vhost" displayName="vhost"/>
添加vhost文本框
<tr id="vhost_tr" style="display: none;">
<th>vhost:</th>
<td>
<input id="sourceVhost" name="$dataMediaSourceGroup.vhost.key" value="$!dataMediaSourceGroup.vhost.value" type="text" class="setting_input"/><span class="red">*</span>
<br />
<span class="red">#addDataSourceMessage ($dataMediaSourceGroup.vhost)</span>
</td>
</tr>
修改js文件 dbCheck.js 联动选择
function changeform(){
console.log("changed form")
//获取 sourceType 的Dom对象
var sourceType = document.getElementById('sourceType').value;
//获取 groupName 的Dom对象
var vhost_tr = document.getElementById("vhost_tr");
if ("RABBITMQ" === sourceType) {
// 如果是RocketMQ,则显示 groupName 的填写框
vhost_tr.style.display = "table-row";
} else {
// 否则不显示
vhost_tr.style.display = "none";
}
}
前端修改已经完成,可以run起来看效果
后端maven引用rabbitMQ client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
修改文件:DataSourceChecker 支持连接验证检查
public String check(String url, String username, String password, String encode, String sourceType,
String vhost) {
if (DataMediaType.MYSQL.name().equals(sourceType) || DataMediaType.ORACLE.name().equals(sourceType)) {
return checkDB(url, username, password, encode, sourceType);
} else if (DataMediaType.RABBITMQ.name().equals(sourceType)) {
return checkMQ(url, username, password, vhost);
}
return DATABASE_FAIL;
}
public String checkMQ(String url, String username, String password, String vhost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
factory.setAutomaticRecoveryEnabled(true);
try {
factory.setUri(url);
Channel channel = factory.newConnection().createChannel();
channel.close();
} catch (Exception e) {
e.printStackTrace();
return MQ_CONN_FAIL;
}
return MQ_CONN_SUCCESS;
}
可以测试一下:
开始修改保存方法,支持数据源保存
添加rabbitMQ数据源类 RabbitMQMediaSource :
public class RabbitMQMediaSource extends DbMediaSource {
private String vhost;
private String url;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getVhost() {
return vhost;
}
public void setVhost(String vhost) {
this.vhost = vhost;
}
}
枚举添加rabbit映射 DataMediaType
public boolean isRabbitMQ() {
return this == DataMediaType.RABBITMQ;
}
修改文件:DataMediaSourceAction doAdd方法
if (dataMediaSource.getType().isRabbitMQ()) {
RabbitMQMediaSource rocketMqMediaSource = new RabbitMQMediaSource();
dataMediaSourceInfo.setProperties(rocketMqMediaSource);
rocketMqMediaSource.setUrl(dataMediaSourceInfo.getField("url").getStringValue());
// rocketMqMediaSource.setRoutingKey(dataMediaSourceInfo.getField("routingKey").getStringValue());
rocketMqMediaSource.setVhost(dataMediaSourceInfo.getField("vhost").getStringValue());
try {
dataMediaSourceService.create(rocketMqMediaSource);
} catch (RepeatConfigureException rce) {
err.setMessage("invalidDataMediaSource");
return;
}
至此,manage管理修改完成,实现了支持rabbitMQ数据源的添加。
下面我们需要配置数据表映射关系,然后修改node部分代码以支持rabbitMQ的发送
配置数据表
table name 为rabbitMQ的ex和routeKey组成,中间使用逗号隔开,选择rabbitMQ数据源即可,点击保存 如下图
然后配置一个mysql,如图
添加zk,node和canal,最后配置表映射 此处为otter基本配置,不在本文中介绍
配置完成,我们开始修改node代码
文件名称:RowDataTransformer 修改方法:transform
// 获取目标库的表信息
DbDialect dbDialect = null;
Table table = null;
if (dataMedia.getSource() instanceof RabbitMQMediaSource) {
//如果是rabbitMQ数据源,则使用source数据表表结构
dbDialect = dbDialectFactory.getDbDialect(dataMediaPair.getPipelineId(),
(DbMediaSource) sourceMedia.getSource());
table = dbDialect.findTable(sourceMedia.getNamespace(), result.getTableName());
result.setTableName(dataMedia.getName());
} else {
dbDialect = dbDialectFactory.getDbDialect(dataMediaPair.getPipelineId(),
(DbMediaSource) dataMedia.getSource());
table = dbDialect.findTable(result.getSchemaName(), result.getTableName());
}
执行sql语句是在sqlBuilderLoadInterceptor拦截器中执行,我们可以加一个判断如果是RabbitMQ,则发送消息到MQ中,以下是代码
public boolean before(DbLoadContext context, EventData currentData) {
DataMediaSource dataMediaSource = context.getDataMediaSource();
if (dataMediaSource instanceof RabbitMQMediaSource) {
if (!currentData.getEventType().isDml())//如果非数据库管理操作,则不继续执行
return true;
Map<String, Object> dataColumns = new HashMap<>();
Map<String, Object> originalMessageBody = new HashMap<>();
for (EventColumn key : currentData.getKeys()) {
dataColumns.put(key.getColumnName(), key.getColumnValue());
}
for (EventColumn column : currentData.getColumns()) {
dataColumns.put(column.getColumnName(), column.getColumnValue());
}
for (EventColumn oldKey : currentData.getOldKeys()) {
originalMessageBody.put(oldKey.getColumnName(), oldKey.getColumnValue());
}
MqMessageBody mqMessageBody = new MqMessageBody();
mqMessageBody.setType(currentData.getEventType().name());
mqMessageBody.setMessageBody(dataColumns);
mqMessageBody.setOriginalMessageBody(originalMessageBody);
#得到配置的表名,因为我们在manager里面配置的是使用逗号隔开,0是ex,1是routingKey
String[] change = currentData.getTableName().split(",");
MqPlugin.init(currentData.getSchemaName(), (RabbitMQMediaSource) dataMediaSource)
.publisher(currentData.getSchemaName())
.toExchange(change[0])
.routingKey(change[1])
.requireConfirm(true)
.build(MessageType.JSON).send(JsonUtils.marshalToString(mqMessageBody));
return true;
}
.....
}
至此,简单修改已经完成,可以支持rabbitMQ的接受,当数据变更时,会将字段和值,以及类型发送到mq中
修改node配置文件
## otter arbitrate & node connect manager config
otter.manager.address = 127.0.0.1:1099
启动node,启动是需要设置启动参数 nid 为manage中配置的Id
在manage中点击启用即可生效:
插入数据表数据:
RabbitMQ消息内容:
gitee地址:https://gitee.com/lumiaomiao126/otter-rabbitMQ
使用相同的修改,可以支持到kafka和rockMQ,es等等其他的通信方式