Run script (Flink on YARN)
export HADOOP_CONF_DIR=/data/aiops/hadoop/etc/hadoop
export HADOOP_USER_NAME=hdfs
FLINK_HOME=/home/aiops/flink-1.13.6
# This is the official sample job, but it finishes and exits, which makes it hard to observe the job while it runs
#WORDCOUNT_JAR=$FLINK_HOME/examples/streaming/WordCount.jar
#MAIN=org.apache.flink.streaming.examples.wordcount.WordCount
WORDCOUNT_JAR=$FLINK_HOME/flink_demo-1.0-SNAPSHOT.jar
MAIN=org.example.Main
$FLINK_HOME/bin/flink run -d \
    -m yarn-cluster \
    -c $MAIN \
    $WORDCOUNT_JAR
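After a detached submission, the job can be checked with the stock YARN CLI; a minimal sketch (the application id below is a placeholder for the one that flink run prints):
# list running applications to find the submitted Flink job
yarn application -list
# fetch the aggregated container logs (works for a running app if log aggregation is enabled);
# the sink's "rec---" lines show up in the TaskManager log
yarn logs -applicationId application_XXXXXXXXXXXXX_XXXX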
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.23.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.23.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
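The jar referenced by the run script is the plain build output of this pom; a sketch, assuming the FLINK_HOME path from the script above:
# build the thin jar; all Flink dependencies are provided, so no shading is needed
mvn clean package
# put it where the run script expects it
cp target/flink_demo-1.0-SNAPSHOT.jar /home/aiops/flink-1.13.6/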
To run on an older Flink release (such as 1.13.6), change the dependencies accordingly; there the artifactId carries an extra suffix marking the Scala version it was built against:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
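Which suffix the cluster needs can be read off its own distribution, since the jars under flink/lib carry the same marker; a sketch, assuming the FLINK_HOME from the run script:
# e.g. flink-dist_2.11-1.13.6.jar indicates a Scala 2.11 build
ls $FLINK_HOME/lib | grep flink-dist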
Code
package org.example;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // An unbounded source: emits one record every 5 seconds, forever,
        // so the job keeps running and is easy to observe on YARN.
        env.setParallelism(2).createInput(new GenericInputFormat<Object>() {
            @Override
            public boolean reachedEnd() throws IOException {
                return false; // never ends
            }

            @Override
            public Object nextRecord(Object reuse) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
                return UUID.randomUUID() + "-" + System.currentTimeMillis();
            }
        }).addSink(new SinkFunction<Object>() {
            @Override
            public void invoke(Object value, Context context) throws Exception {
                // shows up in the TaskManager log
                logger.info("rec---{}", value);
            }
        });

        env.execute();
    }
}
Note: the dependencies in the pom use provided scope to avoid clashing with the jars already on the hadoop classpath or under flink/lib. When running the project inside an IDE, you therefore have to enable the option that puts provided-scope dependencies on the classpath (in IntelliJ IDEA this is a checkbox in the run configuration); otherwise the run fails with class-not-found errors.
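As an alternative to the IDE checkbox, the job can also be started locally through the exec-maven-plugin by forcing the compile classpath, which (unlike the runtime classpath) includes provided dependencies; a sketch, assuming the pom above:
# the compile classpath scope includes provided dependencies, so the Flink classes are found
mvn compile exec:java -Dexec.mainClass=org.example.Main -Dexec.classpathScope=compile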