···
package com.loongair.linky.app.example;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.loongair.linky.utils.GetSqlUtil1;
import com.loongair.linky.utils.HikariUtil;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.;
import java.util.concurrent.;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
public class LongAirJdbcSinkObToOb {
static String jdbcURL = "jdbc:mysql://10.1.128.113:22883/db_d_ods?useSSL=false&rewriteBatchedStatements=true&allowMultiQueries=true&useServerPrepStmts=true&serverTimezone=UTC";
static String jdbcUser = "uatods@loongair_ob_cx#loongair_ssd:3";
static String jdbcPassword = "UATods123!!";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// properties.setProperty("scan.startup.mode", "latest_offset");
// properties.setProperty("snapshot.locking.mode", "none");
// properties.setProperty("scan.incremental.snapshot.enabled","true");
// properties.setProperty("scan.incremental.snapshot.chunk.size","4096");
// properties.setProperty("scan.snapshot.fetch.size","10000");
// properties.setProperty("debezium.min.row. count.to.stream.result","10000");
// MySqlSource<String> stringMySqlSource = MySqlSource.<String>builder()
// // .startupOptions(StartupOptions.latest())
// .hostname("10.1.147.147")
// .username("uatads")
// .password("UATads123!")
// .port(3308)
// .databaseList("db_t_ads") // monitor all tables under inventory database
// .tableList("db_t_ads.ads_rd_foc_t2018") // 如果不写则监控库下的所有表,需要使用【库名.表名】
// .serverTimeZone("Asia/Shanghai")
// .startupOptions(StartupOptions.initial())
// .deserializer(new JsonDebeziumDeserializationSchema()) // 自定义返回数据格式
// .debeziumProperties(DebeziumProperties.getDebeziumProperties())
// .build();
env.enableCheckpointing(3000);
String serverTimeZone = "+08:00";
SourceFunction<String> oceanBaseSource =
OceanBaseSource.<String>builder()
.rsList("10.1.129.154:2882:2881;10.1.129.155:2882:2881;10.1.129.156:2882:2881")
.startupMode(StartupMode.LATEST_OFFSET)
.username("uathsd@loongair_ob_cx#loongair_ssd:3")
.password("UAThsd123!!")
// .username("uatods@loongair_ob_cx#loongair_ssd:3")
// .password("UATods123!!")
.tenantName("loongair_ob_cx")
// .databaseName("ob_db_psg_data_center_test")
// .tableName("flight_info")
// .tableList("ob_db_psg_data_center_test.flight_info,ob_db_psg_data_center_test.market_flight_info,ob_db_psg_data_center_test.passenger_frequent_info,ob_db_psg_data_center_test.vip_boarding_record")
.tableList("ob_db_psg_data_center_test.vip_boarding_record")
.hostname("10.1.128.113")
.port(22883)
.compatibleMode("mysql")
.jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("10.1.129.157")
.logProxyPort(2983)
.serverTimeZone(serverTimeZone)
.workingMode("memory")
.deserializer(new OceanBaseDeserializer())
.build();
DataStreamSource<String> OBsource = env.addSource(oceanBaseSource, "flight_info1");
// // 模拟源数据流
// DataStream<String> source = env.fromSource(stringMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");
OBsource.print(">>>>>OBsource");
// env.execute();
DataStream<JSONObject> jsonStream = OBsource.map((MapFunction<String, JSONObject>) value -> {
JSONObject jsonObject = JSON.parseObject(value);
// System.out.println("==="+jsonObject.toJSONString()+"===");
return jsonObject;
});
// 定义一个滚动窗口,窗口大小为5秒
DataStream<List<JSONObject>> windowedStream = jsonStream
// .keyBy((key) -> key.getString("ts_ms")) // 使用你的key选择器
.keyBy((key) -> key.getString("tableName")) // 使用你的key选择器
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new AggregateFunction<JSONObject, List<JSONObject>, List<JSONObject>>() {
@Override
public List<JSONObject> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<JSONObject> add(JSONObject value, List<JSONObject> accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public List<JSONObject> getResult(List<JSONObject> accumulator) {
return accumulator;
}
@Override
public List<JSONObject> merge(List<JSONObject> a, List<JSONObject> b) {
a.addAll(b);
return a;
}
});
// windowedStream.print(">>>>>>>windowedStream");
windowedStream.addSink(new RichSinkFunction<List<JSONObject>>() {
Connection conn = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = HikariUtil.getDruidDataSource();
conn.setAutoCommit(false);
}
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
}
@Override
public void invoke(List<JSONObject> value, Context context) throws Exception {
long start = new Date().getTime();
// CopyOnWriteArrayList<CopyOnWriteArrayList<String>> jdbcsql = new CopyOnWriteArrayList<>();
List<String> sqllist = GetSqlUtil1.getSqlByArray(JSON.toJSONString(value));
// System.out.println("sqllist" + sqllist.size());
// 开始时间
long start1 = System.currentTimeMillis();
// 每500条数据开启一条线程
int threadSize = 5000;
// 总数据条数
int dataSize = sqllist.size();
// 线程数
int threadNum = dataSize / threadSize + 1;
// 定义标记,过滤threadNum为整数
boolean special = dataSize % threadSize == 0;
// 创建一个线程池
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
Callable<Integer> task = null;
CopyOnWriteArrayList<String> cutList = null;
final int[] num = {0};
for (int n = 0; n < threadNum; n++) {
if (n == threadNum - 1) {
if (special) {
break;
}
cutList = new CopyOnWriteArrayList(sqllist.subList(threadSize * n, dataSize).toArray());
} else {
cutList = new CopyOnWriteArrayList(sqllist.subList(threadSize * n, threadSize * (n + 1)).toArray());
}
final CopyOnWriteArrayList<String> listStr = cutList;
task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
AtomicInteger count = new AtomicInteger(0);
listStr.parallelStream().forEach(sql -> {
try (Statement statement = conn.createStatement()) {
System.out.println("sql---->"+sql);
statement.addBatch(sql);
if (count.incrementAndGet() % 1 == 0) {
synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
int[] i = statement.executeBatch();
System.out.println("更新了" + i[0] + "数据");
conn.commit();
}
}
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
if (count.get() % 500 != 0) {
try (Statement statement = conn.createStatement()) {
int[] i = statement.executeBatch();
System.out.println("外面更新了" + i[0] + "数据");
conn.commit();
}
}
return 1;
}
};
tasks.add(task);
}
System.out.println("tasksSize=" + tasks.size());
List<Future<Integer>> results = exec.invokeAll(tasks);
for (Future<Integer> future : results) {
System.out.println("results.size=" + results.size());
// System.out.println(future.get());
}
// 关闭线程池
exec.shutdown();
System.out.println("线程任务执行结束");
System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start1) + "毫秒");
// AtomicInteger count = new AtomicInteger(0);
// sqllist.parallelStream().forEach(sql->{
// try (Statement statement = conn.createStatement()) {
// // If you have parameters in SQL, set them here
// statement.addBatch(sql);
// if (count.incrementAndGet() % 500 == 0) {
// synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
// int[] i = statement.executeBatch();
// System.out.println("更新了"+i[0]+"数据");
// conn.commit();
// count.set(0);
// }
// }
// } catch (SQLException e) {
// throw new RuntimeException(e);
// }
// });
// if (count.get() % 500 != 0) {
// try (Statement statement = conn.createStatement()) {
// int[] i = statement.executeBatch();
// System.out.println("外面更新了"+i[0]+"数据");
// conn.commit();
// }
// }
// long cost = new Date().getTime()-start;
// System.out.println("解析好了"+cost/1000+"秒");
// try {
//// value.forEach(json -> {
//// jdbcsql.add(GetSqlUtil1.getSqlByArray(JSON.toJSONString(json)));
//// });
//// int count = 0;
//// for(List<String> sqllist:jdbcsql){
//// for(String sql:sqllist){
//// count++;
//// try (PreparedStatement pstmt = conn.prepareStatement(sql)){
//// pstmt.addBatch();
//// if(count%500==0){
//// int[] i = pstmt.executeBatch();
//// System.out.println("更新了"+i[0]+"数据");
//// conn.commit();
//// count=0;
//// }
//// }
//// }
//// }
//// if (count > 0) {
//// try (Statement statement = conn.createStatement()) {
//// int[] i = statement.executeBatch();
//// System.out.println("外面更新了"+i[0]+"数据");
//// conn.commit();
//// }
//// }
// // Use a thread-safe structure to accumulate results
//// AtomicInteger count = new AtomicInteger(0);
//// // Using try-with-resources for auto-closing of Statement
//// jdbcsql.parallelStream().forEach(sqlObj -> {
//// sqlObj.parallelStream().forEach(sql->{
//// try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
//// // If you have parameters in SQL, set them here
//// pstmt.addBatch();
//// if (count.incrementAndGet() % 500 == 0) {
//// synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
//// int[] i = pstmt.executeBatch();
//// System.out.println("更新了"+i[0]+"数据");
//// conn.commit();
//// count.set(0);
//// }
//// }
//// } catch (SQLException e) {
//// throw new RuntimeException(e);
//// }
//// });
////
//// });
////
//// if (count.get() % 500 != 0) {
//// try (Statement statement = conn.createStatement()) {
//// int[] i = statement.executeBatch();
//// System.out.println("外面更新了"+i[0]+"数据");
//// conn.commit();
//// }
//// }
//
// } catch (Exception e) {
// conn.rollback(); // Rollback transaction on exception
// e.printStackTrace();
// throw e;
// }
}
}).setParallelism(1);
try {
env.execute("JDBC Sink Example");
} catch (
Exception e) {
throw new RuntimeException(e);
}
}
}
// ------------JdbcSink.sink 示例 -------------------
// SinkFunction<JSONObject> jdbcSink = JdbcSink.sink(
// "INSERT INTO ads_rd_foc_t2001 (FLIGHT_ID, FLIGHT_DATE, FLIGHT_TYPE,
// FLIGHT_NO, AC_TYPE, LAYOUT, AC_REG, DEPARTURE_AIRPORT, ARRIVAL_AIRPORT, HTD,
// STD, ETD, OUT, ACARS_ATD, ATD, HTA, STA, ETA, ACARS_ATA, ATA, INN, BAY,
// BAY_2, ON_BOARD_TIME, CLOSE_DOOR_TIME, OPEN_DOOR_TIME, D_OR_I, P_OR_C,
// ADJUST_TYPE, FLG_DELAY, FLG_VR, FLG_PATCH, FLG_CS, BASE, CARRIER, FILIALE,
// SERIAL, AC_LINK_LINE, CREW_LINK_LINE, DEPA_DIV_AIRPORT, FPL_DIV_AIRPORT1,
// FPL_DIV_AIRPORT2, FLAG_FPL, FLAG_RLS, FLAG_TAKEOFF, FLY_TIMES, FLIGHT_STATUS,
// REMARK, ESTIMATE_SK_TIME, BUSINESS_DYNAMIC_TIME, REMARK_BUSINESS,
// PROVIDER_ID, DIET_TYPE, CREW_PROVIDER_ID, CREW_DIET_TYPE, JW_OPERATOR,
// SELECT_FLG, TOTAL_FUEL, TRIP_FUEL, SLIDE_FUEL, IMPT_FLT, RR_COUNT, DIET_FLAG,
// READ_OK, CHK_STEP, PTD, PTA, DISPATCH_IMPT, TELE_REMARK, CREW_TICKET,
// FLY_DEVICE, ICE_FLAG, WATER_FLAG, CLOSE_CARGODOOR_TIME,
// CLOSE_CARGODOOR_REMARK, REAL_FUEL, REAL_FUEL_MODI, REAL_SEGMENT_FUEL,
// REAL_SEGMENT_FUEL_MODI, PZ_FUEL_FLAG, RLS_FUEL_FLAG, REAL_RLS_FUEL,
// REAL_RLS_FUEL_MODI, FLAG_CREWZL, UNITS, SLIDE_TIME, ROUTE_DIV_AIRPORT,
// CHECK_IN_COUNT, NCKQ_TIME, WCKQ_TIME, CLOSE_DOOR_COUNT, CDM, FLG_DELAY_SF,
// PASSFAST, FLIGHT_DATE1, FLG_DELAY_CG, FLG_DELAY_JS, FLG_DELAY_JG, CDM_FLAG,
// LAST_MODIFY_TIME, SECRECY, CTOT, PTDNEW, PTANEW, OPEN_CARGO_DOOR_TIME,
// PRESET_FUEL, FLG_DELAY_CLOSE_TIME, CLOSE_DELAY_TYPE, TOBT, FRC_PLANLINE_TID)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?) ON DUPLICATE KEY UPDATE FLIGHT_ID=VALUES(FLIGHT_ID),
// FLIGHT_DATE=VALUES(FLIGHT_DATE), FLIGHT_TYPE=VALUES(FLIGHT_TYPE),
// FLIGHT_NO=VALUES(FLIGHT_NO), AC_TYPE=VALUES(AC_TYPE), LAYOUT=VALUES(LAYOUT),
// AC_REG=VALUES(AC_REG), DEPARTURE_AIRPORT=VALUES(DEPARTURE_AIRPORT),
// ARRIVAL_AIRPORT=VALUES(ARRIVAL_AIRPORT), HTD=VALUES(HTD), STD=VALUES(STD),
// ETD=VALUES(ETD), OUT=VALUES(OUT), ACARS_ATD=VALUES(ACARS_ATD),
// ATD=VALUES(ATD), HTA=VALUES(HTA), STA=VALUES(STA), ETA=VALUES(ETA),
// ACARS_ATA=VALUES(ACARS_ATA), ATA=VALUES(ATA), INN=VALUES(INN),
// BAY=VALUES(BAY), ON_BOARD_TIME=VALUES(ON_BOARD_TIME),
// CLOSE_DOOR_TIME=VALUES(CLOSE_DOOR_TIME),
// OPEN_DOOR_TIME=VALUES(OPEN_DOOR_TIME), D_OR_I=VALUES(D_OR_I),
// P_OR_C=VALUES(P_OR_C), ADJUST_TYPE=VALUES(ADJUST_TYPE),
// FLG_DELAY=VALUES(FLG_DELAY), BASE=VALUES(BASE), CARRIER=VALUES(CARRIER),
// FILIALE=VALUES(FILIALE), SERIAL=VALUES(SERIAL),
// AC_LINK_LINE=VALUES(AC_LINK_LINE), CREW_LINK_LINE=VALUES(CREW_LINK_LINE),
// FPL_DIV_AIRPORT1=VALUES(FPL_DIV_AIRPORT1),
// FPL_DIV_AIRPORT2=VALUES(FPL_DIV_AIRPORT2), FLAG_FPL=VALUES(FLAG_FPL),
// FLAG_RLS=VALUES(FLAG_RLS), FLAG_TAKEOFF=VALUES(FLAG_TAKEOFF),
// FLIGHT_STATUS=VALUES(FLIGHT_STATUS),
// ESTIMATE_SK_TIME=VALUES(ESTIMATE_SK_TIME), SELECT_FLG=VALUES(SELECT_FLG),
// TOTAL_FUEL=VALUES(TOTAL_FUEL), TRIP_FUEL=VALUES(TRIP_FUEL),
// SLIDE_FUEL=VALUES(SLIDE_FUEL), RR_COUNT=VALUES(RR_COUNT),
// READ_OK=VALUES(READ_OK), CHK_STEP=VALUES(CHK_STEP), PTD=VALUES(PTD),
// PTA=VALUES(PTA), TELE_REMARK=VALUES(TELE_REMARK),
// FLY_DEVICE=VALUES(FLY_DEVICE),
// CLOSE_CARGODOOR_TIME=VALUES(CLOSE_CARGODOOR_TIME),
// REAL_SEGMENT_FUEL=VALUES(REAL_SEGMENT_FUEL),
// PZ_FUEL_FLAG=VALUES(PZ_FUEL_FLAG), RLS_FUEL_FLAG=VALUES(RLS_FUEL_FLAG),
// REAL_RLS_FUEL=VALUES(REAL_RLS_FUEL), FLAG_CREWZL=VALUES(FLAG_CREWZL),
// SLIDE_TIME=VALUES(SLIDE_TIME), CHECK_IN_COUNT=VALUES(CHECK_IN_COUNT),
// NCKQ_TIME=VALUES(NCKQ_TIME), WCKQ_TIME=VALUES(WCKQ_TIME),
// FLIGHT_DATE1=VALUES(FLIGHT_DATE1), LAST_MODIFY_TIME=VALUES(LAST_MODIFY_TIME),
// SECRECY=VALUES(SECRECY), FLG_DELAY_CLOSE_TIME=VALUES(FLG_DELAY_CLOSE_TIME),
// CLOSE_DELAY_TYPE=VALUES(CLOSE_DELAY_TYPE);\n",
// (ps, t) -> {
// String[] fields = t.keySet().toArray(new String[0]);
// for (int i = 0; i < fields.length; i++) {
// if (Objects.isNull(t.get(fields[i]))) {
// ps.setNull(i + 1, Types.VARCHAR); // Adjust type as needed
// } else {
// ps.setObject(i + 1, t.get(fields[i])); // Will automatically call the proper
// ps.set* method based on the type of value
// }
// }
// },
// new JdbcExecutionOptions.Builder()
// .withBatchSize(5000) // 按照 500 条数据进行批处理
// .build(),
// new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
// .withUrl(jdbcURL)
//// .withDriverName("org.h2.Driver") // 使用合适的 JDBC 驱动
// .withUsername(jdbcUser)
// .withPassword(jdbcPassword)
// .build()
// );
// // 自定义触发器 (未使用)
// public static class TimeOrCountTrigger extends Trigger<Object, TimeWindow> {
//
// private static final long serialVersionUID = 1L;
//
// private final long maxCount;
// private final long intervalMillis;
//
// private final ValueStateDescriptor<Long> countDescriptor = new ValueStateDescriptor<>("count", Long.class, 0L);
//
// private TimeOrCountTrigger(long maxCount, long intervalMillis) {
// this.maxCount = maxCount;
// this.intervalMillis = intervalMillis;
// }
//
// @Override
// public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)
// throws Exception {
// ValueState<Long> count = ctx.getPartitionedState(countDescriptor);
// long currentCount = count.value() + 1;
// count.update(currentCount);
//
// if (currentCount >= maxCount) {
// count.update(0L);
// return TriggerResult.FIRE;
// }
//
// long nextFireTimestamp = window.getStart() + intervalMillis;
//
// if (nextFireTimestamp <= ctx.getCurrentProcessingTime()) {
// return TriggerResult.FIRE;
// } else {
// ctx.registerProcessingTimeTimer(nextFireTimestamp);
// }
//
// return TriggerResult.CONTINUE;
// }
//
// @Override
// public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
// return TriggerResult.FIRE;
// }
//
// @Override
// public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// return TriggerResult.CONTINUE;
// }
//
// @Override
// public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
// ctx.getPartitionedState(countDescriptor).clear();
// ctx.deleteProcessingTimeTimer(window.getStart() + intervalMillis);
// }
//
// public static TimeOrCountTrigger create(long maxCount, Duration interval) {
// return new TimeOrCountTrigger(maxCount, interval.toMillis());
// }
// }
···