本地IDEA搭建开发环境,实现local模式spark对开启Kerberos认证的云端hive数据进行读写操作。
一、环境版本:
本地PC: win10
开发环境: IntelliJ IDEA 2019.1.3 (Community Edition)
java: jdk1.8.0_162
scala: 2.11.12
spark: 2.3.2-mrs-2.0
hadoop: 3.1.1-mrs-2.0
集群版本: 华为云MRS 2.0.5
Kerberos认证:开启
二、环境配置过程
-
打通本地windows与云端MRS集群网络MRS集群
-》hdfs、hive组件主节点、从节点均需要绑定弹性公网ip
-》添加安全组规则:
查询本地windows公网ip,在MRS集群安全组添加入方向规则,协议端口简单粗暴设置为全部放通。
配置完后,可以在本地windows测试在集群绑定的弹性公网ip连通性,是否ping通,telnet相应端口是否成功。
创建添加安全组规则的详细步骤
开启Kerberos认证的集群,需要创建开发用户。
-》用户需要有HDFS、HIVE、YARN等权限,才可以运行spark程序。这里依然简单粗暴的创建并赋予sparkuser该用户所有组件的全部权限。
创建开发用户的详细步骤
-》在MRS Manager界面选择“系统设置>用户管理”,在用户名中选择sparkuser,单击操作中下载认证凭据文件,保存后解压得到用户的keytab文件与krb5.conf文件,将文件中的ip地址改为之前绑定的相应弹性公网ip,用于在spark程序本地运行时进行安全认证。-
下载集群客户端配置文件
-》将hdfs-site.xml、core-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml等*.xml配置文件放到IDEA工程中的资源目录下。
-》将集群客户端配置中hosts文件的集群节点ip修改为之前绑定的弹性公网ip,将修改好的ip主机名映射添加到本地windows的hosts文件。
IDEA中创建Maven工程
-》配置华为云镜像
kerberos认证代码参考华为的样例代码
-》将步骤3的集群客户端配置文件和用户凭证keytab、krb5.conf放到工程的资源目录下,将客户端配置文件中的所有ip地址修改成hosts文件中相应的主机名。
-》在hdfs-site.xml中添加如下配置
本地测试PC与集群不在一个局域网,这种情况下,本地访问hdfs时,namenode会返回数据所在的datanode地址,但是返回的可能是datanode的内网私有ip,我们无法根据该ip访问数据节点datanode,添加如下配置,让namenode返回datanode的域名。之前我们已经在本地hosts文件配置了所有节点的公网ip。因此本地就可以通过域名访问到hdfs中的数据了。
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>only cofig in clients</description>
</property>
-》spark访问云端hdfs代码如下:
package com.huawei.bigdata.spark.examples
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import com.huawei.hadoop.security.LoginUtil
object FemaleInfoCollection {
def main(args: Array[String]) {
// security mode
val userPrincipal = "spark_wang"
val filePath = System.getProperty("user.dir") + File.separator + "resources" + File.separator
val userKeyTableFile = filePath + "user.keytab"
val krbFile = filePath + "krb5.conf"
val hadoopConf: Configuration = new Configuration()
// hadoopConf.set("dfs.client.use.datanode.hostname", "true") // 已在hdfs-side.xml添加该配置
LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf)
// Configure the Spark application name.
val conf = new SparkConf().setAppName("CollectFemaleInfo")
.setMaster("local")
// Initializing Spark
val sc = new SparkContext(conf)
// Read data. This code indicates the data path that the input parameter args(0) specifies.
val text = sc.textFile("/user/spark_wang/female-info.txt") // 默认会从配置文件中获取hdfs地址,可以写成全路径hdfs://node-master1bcgx:9820/user/spark_wang/female-info.txt
// Filter the data information about the time that female netizens spend online.
val data = text.filter(_.contains("female"))
// Aggregate the time that each female netizen spends online
val femaleData: RDD[(String, Int)] = data.map { line =>
val t = line.split(',')
(t(0), t(2).toInt)
}.reduceByKey(_ + _)
// Filter the information about female netizens who spend more than 2 hours online, and export the results
val result = femaleData.filter(line => line._2 > 10)
result.collect().map(x => x._1 + ',' + x._2).foreach(println)
sc.stop()
}
}
-》spark本地读云端hive代码如下:
本地可以访问到hive上的所有数据库,如果访问不到云端hive,结果要么报错,要么只能显示default一个数据库名称。
package com.huawei.bigdata.spark.examples
import java.io.File
import com.huawei.hadoop.security.LoginUtil
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Author: whn
* Date: 2020-2-6 14:49
* Version: 1.0
* Function:
*/
object SparkHiveOnCloud {
case class Test(occupy: String, name: String, age: Int)
def main(args: Array[String]): Unit = {
val userPrincipal = "spark_wang"
val filePath = System.getProperty("user.dir") + File.separator + "resources" + File.separator
val userKeyTableFile = filePath + "user.keytab"
val krbFile = filePath + "krb5.conf"
val hadoopConf: Configuration = new Configuration()
hadoopConf.set("dfs.client.use.datanode.hostname", "true")
LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf)
val conf = new SparkConf()
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local")
.config(conf)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
spark.sql("SHOW DATABASES").show()
//运行结果
// +------------+
// |databaseName|
// +------------+
// | default|
// |mrs_reserved|
// | test|
// +------------+
val arr = Array(("a", "whn", 20), ("b", "cjj", 12), ("c", "haha", 18), ("f", "jay", 2), ("g", "kobe", 210), ("z", "asdf", 11))
// 本地创建Dataframe写入云端hive
spark.sparkContext.parallelize(arr)
.map(tuple => Test(tuple._1, tuple._2, tuple._3))
.toDF("occupy", "name", "age")
.write
.format("hive")
.mode("append")
.saveAsTable("test.longi")
// 读hive
val data = spark.sql("Select * from test.longi")
data.show()
spark.stop()
}
}
至此就实现了云端集群外的本地window通过spark local模式直接访问云端集群的hdfs、hive数据并进行读写操作。
-》附上pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huawei.spark.examples</groupId>
<artifactId>SparkScalaExample</artifactId>
<version>mrs-2.0</version>
<name>SparkScalaExample</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.3.2-mrs-2.0</spark.version>
<hadoop.version>3.1.1-mrs-2.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<configuration>
<recompileMode>modified-only</recompileMode>
</configuration>
<executions>
<execution>
<id>main-scalac</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<directory>target</directory>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<sourceDirectory>src</sourceDirectory>
</build>
</project>