Flink连接HDFS出现java.net.UnknownHostException解决办法

1. 背景

在开发大数据平台XSailboat中的查看Flink任务的状态数据工具时,用State Process API解析保存点数据,将其从HDFS上读取出来再将其解析过后下沉到HDFS以CSV格式保存,然后由其它接口提供对这个文件的分页加载功能。

以CSV格式下沉到HDFS,笔者直接使用了DataStream上已经废弃的writeAsCsv方法,因为这个方法的特性正好和此处的需求相符,没有使用FileSink,因为它的Bucket特性,在此处不适用。

2. 问题

String checkPointUrl = "http://yc/a/b/c.csv" ;
SavepointReader.read(env, checkPointUrl , new HashMapStateBackend()) ;

在连接hdfs的过程中,出现了

java.lang.IllegalArgumentException: java.net.UnknownHostException: yc
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:465)
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:357)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:291)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:173)
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
    at org.apache.flink.state.api.runtime.SavepointLoader.loadSavepointMetadata(SavepointLoader.java:50)
    at org.apache.flink.state.api.SavepointReader.read(SavepointReader.java:101)
    ...

3. 解决办法

yc这个集群名称是在Hadoop的配置文件hdfs-site.xml的。

<property>
        <name>dfs.nameservices</name>
        <value>yc</value>
    </property>

    <!-- yc包含两个NameNode,分别是nn1,nn2 -->
    <property>
        <name>dfs.ha.namenodes.yc</name>
        <value>nn1,nn2</value>
    </property>

    <!-- nn1的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.yc.nn1</name>
        <value>XCloud150:9000</value>
    </property>

    <!-- nn1的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.yc.nn1</name>
        <value>XCloud150:50070</value>
    </property>

    <!-- nn2的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.yc.nn2</name>
        <value>XCloud151:9000</value>
    </property>

    <!-- nn2的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.yc.nn2</name>
        <value>XCloud151:50070</value>
    </property>

经过代码跟踪分析得知,Flink使用org.apache.flink.runtime.util.HadoopUtils的getHadoopConfiguration方法得到Hadoop的Configuration(org.apache.hadoop.conf.Configuration)。
现摘录其中的代码:

public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {

   // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
   // from the classpath

   Configuration result = new HdfsConfiguration();
   boolean foundHadoopConfiguration = false;

   // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
   // the hdfs configuration.
   // The properties of a newly added resource will override the ones in previous resources, so
   // a configuration
   // file with higher priority should be added later.

   // Approach 1: HADOOP_HOME environment variables
   String[] possibleHadoopConfPaths = new String[2];
   
   // #############################################################
   // 1. 从系统环境变量HADOOP_HOME目录下去寻找
   // 因hadoop的这些配置文件已经放置在XSailboat的配置目录config/MicroService/common目录下了,此种方式不合适,不采用
   final String hadoopHome = System.getenv("HADOOP_HOME");
   if (hadoopHome != null) {
   LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
     possibleHadoopConfPaths[0] = hadoopHome + "/conf";
     possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
   }

   for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
     if (possibleHadoopConfPath != null) {
        foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
     }
   }

   // Approach 2: Flink configuration (deprecated)
   // #############################################################
   // 2.通过HDFS_DEFAULT_CONFIG配置项指定配置文件所在位置,这个参数已经标注deprecated,不是首选
   final String hdfsDefaultPath =
                flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
   if (hdfsDefaultPath != null) {
      result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
      LOG.debug("Using hdfs-default configuration-file path from Flink config: {}"
          ,hdfsDefaultPath);
      foundHadoopConfiguration = true;
   }
   // #############################################################
   // 3.通过HDFS_SITE_CONFIG配置项指定配置文件所在位置,这个参数已经标注deprecated,不是首选
   final String hdfsSitePath =
                flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
   if (hdfsSitePath != null) {
      result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
      LOG.debug(
                    "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
      foundHadoopConfiguration = true;
   }
   
   // #############################################################
   // 4.通过PATH_HADOOP_CONFIG配置项指定配置文件所在位置,这个参数已经标注deprecated,不是首选
   final String hadoopConfigPath =
                flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
   if (hadoopConfigPath != null) {
            LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
            foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
   }

   // Approach 3: HADOOP_CONF_DIR environment variable
   // #############################################################
   // 5. 环境变量只能在启动参数上设置,我希望在程序运行期根据参数目录位置设置,以在部署过程中减少参数配置
   String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
   if (hadoopConfDir != null) {
      LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
      foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
   }

   // Approach 4: Flink configuration
   // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
   // #############################################################
   // 6.在flink配置里面配置,在原先hadoop的配置项名称前面加上flink.hadoop.
   for (String key : flinkConfiguration.keySet()) {
       for (String prefix : FLINK_CONFIG_PREFIXES) {
           if (key.startsWith(prefix)) {
               String newKey = key.substring(prefix.length());
               String value = flinkConfiguration.getString(key, null);
               result.set(newKey, value);
               LOG.debug(
                            "Adding Flink config entry for {} as {}={} to Hadoop config",
                            key,
                            newKey,
                            value);
               foundHadoopConfiguration = true;
            }
      }
   }

   if (!foundHadoopConfiguration) {
      LOG.warn(
                    "Could not find Hadoop configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
   }

   return result;
 }

使用已经废弃的参数ConfigConstants.PATH_HADOOP_CONFIG尝试了一下,发现并没有起什么作用,跟踪代码发现执行了下面的代码:
类:org.apache.flink.core.fs.FileSystem

  // getUnguardedFileSystem方法内
            
  // this "default" initialization makes sure that the FileSystem class works
  // even when not configured with an explicit Flink configuration, like on
  // JobManager or TaskManager setup
  if (FS_FACTORIES.isEmpty()) {
     initializeWithoutPlugins(new Configuration());
  }
  // 配置类是new 出来的,所以外面的配置是进不去的。

处理方法,自己主动调用一下这个初始化方法

if(!AppContext.get("Flink_FS_Init", false))
{
  org.apache.flink.configuration.Configuration flinkHdfsConf = new org.apache.flink.configuration.Configuration() ;
  flinkHdfsConf.setString(ConfigConstants.PATH_HADOOP_CONFIG , MSApp.instance().getAppPaths().getCommonConfigDir().getAbsolutePath()) ;
  org.apache.flink.core.fs.FileSystem.initialize(flinkHdfsConf , null);
  AppContext.get("Flink_FS_Init", true) ;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,527评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,314评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,535评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,006评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,961评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,220评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,664评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,351评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,481评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,397评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,443评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,123评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,713评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,801评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,010评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,494评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,075评论 2 341