This page records problems encountered while running pyflink, together with their solutions, so that others can avoid the same detours.
Q1: No module named 'encodings'
Caused by: java.io.IOException: Failed to execute the command: venv.zip/venv/bin/python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Could not find platform independent libraries <prefix>
Could not find platform dependent libraries <exec_prefix>
Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]
Fatal Python error: initfsencoding: unable to load the file system codec
ModuleNotFoundError: No module named 'encodings'
Analysis: a Python environment problem, encountered when the virtual environment is managed with virtualenv.
Solution: run the following script (remember to adjust the pyflink version) and manage the virtual environment with miniconda3 instead.
wget https://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.sh
sh setup-pyflink-virtual-env.sh 1.11.2
source venv/bin/activate # activate the virtual environment
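Once the environment is built and zipped, it can also be shipped with the job from inside the program. A minimal sketch using the PyFlink API, assuming the venv.zip produced by the script above:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# ship the zipped environment with the job; workers extract it into their working directory
t_env.add_python_archive("venv.zip")
# point the Python workers at the interpreter inside the archive
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")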
Q2: SyntaxError: Non-ASCII character '\xe5' in file main.py
File "main.py", line 2
SyntaxError: Non-ASCII character '\xe5' in file main.py on line 3, but no encoding declared; see http://www.python.org/peps/pep-0263.html for details
org.apache.flink.client.program.ProgramAbortException
Analysis: on the surface Python fails to parse a non-ASCII character, but the real cause is a wrong Python version; this error is what Python 2 raises for a UTF-8 source file without an encoding declaration, which shows that "python" resolved to Python 2. When a Python job is submitted via flink run, Flink invokes the plain "python" command and requires it to be Python 3.5, 3.6 or 3.7.
Solution: activate the virtual environment so that python -V reports Python 3.5, 3.6 or 3.7.
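To verify this from inside the interpreter itself, a one-line sketch:

import sys
# flink run invokes the plain "python" command, so this must report 3.5, 3.6 or 3.7
assert (3, 5) <= sys.version_info[:2] <= (3, 7), sys.version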
Q3: Could not find any factory for identifier 'kafka'
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
org.apache.flink.client.program.ProgramAbortException
Analysis: the kafka connector jar was not specified.
Solution: add the parameter -j flink-sql-connector-kafka_2.11-1.11.2.jar to flink run. Download links are listed on the Apache Kafka SQL Connector page; pick the jar that matches your kafka version.
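Besides the -j flag, the connector jar can also be declared from inside the program through the pipeline.jars option. A sketch with an illustrative local path (note the file:// scheme, which also matters for Q7):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# pipeline.jars takes a semicolon-separated list of URLs, not bare paths
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.2.jar")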
Q4: The parallelism must be a positive number: yarch
The parallelism must be a positive number: yarch
Analysis: the parallelism must be set to a positive number.
Solution: add the parameter -p 1 to flink run. The root cause, however, may lie in flink itself: a directly deployed flink works fine, but after integrating it into CDH the flink configuration parameters change, so the job can no longer be run with a plain flink run.
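If the CLI flag itself is mangled by the installation, the parallelism can also be pinned inside the program. A sketch:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# equivalent to flink run -p 1: operators default to a single parallel instance
env.set_parallelism(1)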
Q5: No manifest found in jar file '/xxxx/venv.zip'
org.apache.flink.client.program.ProgramInvocationException: No manifest found in jar file '/xxxx/venv.zip'. The manifest is need to point to the program's main class.
Analysis: no manifest was found to point to the program's main class.
Solution: same as Q4; flink may not have been installed correctly.
Q6: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
Analysis: the jar specifies neither a 'Main-Class' nor a 'program-class' entry.
Solution: same as Q4; flink may not have been installed correctly.
Q7: java.net.MalformedURLException: no protocol:
py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
: java.net.MalformedURLException: no protocol:
Analysis: "no protocol" means the URL is missing its scheme.
Solution: check the line the error points to. If it is a web URL, it may be missing http://; if it is a local path, it may be missing file://.
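To turn a local path into a form that java.net.URL accepts, pathlib can build the file:// variant. A minimal sketch with an illustrative path:

from pathlib import Path

jar_path = "/tmp/flink-sql-connector-kafka_2.11-1.11.2.jar"  # illustrative
print(Path(jar_path).as_uri())  # file:///tmp/flink-sql-connector-kafka_2.11-1.11.2.jar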
Q8: Method registerFunction(...) does not exist
py4j.protocol.Py4JError: An error occurred while calling o4.registerFunction. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method registerFunction([class java.lang.String, class com.flink.udf.TopN]) does not exist
Analysis: the registerFunction method does not exist. Reading the source, registerFunction is a method of the TableEnvironment._j_env object, so the suspicion is that _j_env was not defined correctly; _j_env should refer to the underlying Java execution environment.
Solution: pass the execution environment explicitly when creating the TableEnvironment. An example follows.
Original creation:
# streaming environment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
Corrected creation; note that the first argument to create is the env variable initialized just above:
# streaming environment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
The same applies to batch processing:
from pyflink.dataset import ExecutionEnvironment
env = ExecutionEnvironment.get_execution_environment()
...
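With the environment created as above, the registration that failed in the stack trace goes through. A sketch for the streaming case (the function name "top_n" is illustrative; the class is the one from the error):

# t_env is the StreamTableEnvironment created with the explicit env above
t_env.register_java_function("top_n", "com.flink.udf.TopN")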
Q9: Caused by: org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
Caused by: org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:404)
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216)
Analysis: the TaskExecutor is not running. Run jps and check whether a TaskManagerRunner process exists.
Solution: restart Flink.
Q10: Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: xxxx/site-packages/pyflink/fn_execution/boot.py --id=1-1 --logging_endpoint=localhost:58957 --artifact_endpoint=localhost:58958 --provision_endpoint=localhost:58959 --control_endpoint=localhost:58956
...
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Server sendMessage() failed with Error"
debug_error_string = "{"created":"@1605690803.729786000","description":"Error received from peer ipv6:[::1]:58959","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Server sendMessage() failed with Error","grpc_status":1}"
>
...
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
...
Caused by: java.lang.IllegalStateException: Process died with exit code 0
...
Analysis: sending a message to the RPC service failed; the exact cause is unknown.
Solution: resubmit the Flink job.
Q11: Process died with exit code 127
Background: I wanted to use a dependency (e.g. faker) inside a UDF of a streaming job, so I wrote a requirements.txt and used the pip download command below to fetch the dependency's packages into the cached_dir directory, which ended up containing 4 files. The job was then submitted to the cluster in yarn-cluster mode, but it failed with the error below: its running state showed finished (a streaming job should never finish), and its final state was failed.
pip download -d cached_dir -r requirements.txt --no-binary :all:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
java.lang.RuntimeException: Failed to create stage bundle factory!
...
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 127
...
Caused by: java.lang.IllegalStateException: Process died with exit code 127
...
Analysis: by changing one variable at a time, the problem was narrowed down to the Python dependency packages. cached_dir contained python-dateutil, a package that is normally already present in the Python environment, so installing it again may have caused a conflict (a guess).
Solution: after deleting python-dateutil-2.8.1.tar.gz the problem went away. In the end cached_dir contained only the 3 required packages.
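For reference, this is roughly how the requirements file and the cache directory are handed to the job through the PyFlink API. A sketch using the paths from above:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# install requirements.txt on the workers, fetching packages from cached_dir
# instead of downloading them from the network
t_env.set_python_requirements("requirements.txt", "cached_dir")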