这种模式非常适合调试信息
1. spark local模式连接远程hadoop集群
- 创建test.txt文件,写入几行数据,上传到hdfs中
- 把hadoop集群的配置文件
core-site.xml
和hdfs-site.xml
复制到项目的resources
目录下
经过测试,linux下运行idea,没有这两个文件也能连上hadoop集群
- 创建scala代码
package hdfs
import org.apache.spark.sql.SparkSession
object HdfsTest {
def main(args: Array[String]): Unit = {
// 如果在windows本地跑,需要从widnows访问HDFS,需要指定一个合法的身份
// System.setProperty("HADOOP_USER_NAME", "hdfs")
val spark = SparkSession.builder()
.appName("hdfs-test")
.master("local")
// 设置参数
.config("dfs.client.use.datanode.hostname", "false")
.getOrCreate();
// spark.sparkContext.setLogLevel("DEBUG")
//支持通配符路径,支持压缩文件读取
val path = "hdfs://10.121.138.145:8020/test.txt"
val rdd = spark.read.textFile(path)
//统计数量
println("count = "+rdd.count())
//停止spark
spark.stop()
}
}
重点说明:
- 如果在windows下运行,请添加
System.setProperty("HADOOP_USER_NAME", "hdfs")
代码,否则会提示Permission Denied
- CDH默认
dfs.client.use.datanode.hostname
为true
,意思为使用hostname连接hdfs,如果我们不修改本机的host文件,本地是无法连接datanode机器。有三种方式解决
- 修改本机的host,需要配置上所有的hadoop的host及ip
- 修改
hdfs-site.xml
配置中的dfs.client.use.datanode.hostname
为false
- 代码中通过
sparkSession
设置.config("dfs.client.use.datanode.hostname", "false")
直接运行此代码即可,如果输出test.txt中的文本行数,恭喜你实验成功!!
2. spark local模式连接远程hive集群
连接hive集群,没有找到类似
dfs.client.use.datanode.hostname
hive相关的配置,所以只能通过hosts文件来解决。
第一步:修改本机hosts文件,配置hive集群的hostname及ip
修改本机hosts文件,让本机能够解析hive集群中的机器名及ip
比如我的配置:
10.121.138.145 lfg00
10.121.138.146 lfg01
10.121.138.147 lfg02
10.121.138.148 lfg03
第二步:连接hive
这步有两种方式:
- 复制hive的配置文件hive-site.xml至项目的resources文件夹下
- 在代码中增加
.config("hive.metastore.uris","thrift://lfg00:9083")
第三步:代码
package hdfs
import org.apache.spark.sql.SparkSession
import utils.GeoHash
/**
* 从hadoop读取数据,计算geohash并写入hadoop/hive
*/
object GeohashTest {
def main(args: Array[String]): Unit = {
// 如果在windows本地跑,需要从widnows访问HDFS,需要指定一个合法的身份
// System.setProperty("HADOOP_USER_NAME", "hdfs")
val spark = SparkSession.builder()
.appName("hdfs-test")
.master("local")
// use ip
.config("dfs.client.use.datanode.hostname", "false")
// 如果输出目录已存在,则直接覆盖
.config("spark.hadoop.validateOutputSpecs", "false")
// 连接hive地址,如果复制hive-site.xml到resources下,则不需要此配置
// .config("hive.metastore.uris","thrift://lfg00:9083")
// 启用hive支持
.enableHiveSupport()
.getOrCreate();
// spark.sparkContext.setLogLevel("DEBUG")
//支持通配符路径,支持压缩文件读取
val inputPath = "hdfs://10.121.138.145:8020/test/put2/*"
writeToHive(spark,inputPath)
//停止spark
spark.stop()
}
/**
* 写入hive
* @param spark
* @param inputPath
*/
def writeToHive(spark:SparkSession, inputPath:String): Unit = {
val cols = List("lat", "lng", "value", "hash")
val rdd = spark.read.textFile(inputPath)
import spark.implicits._
val out = rdd.map(line => {
val columns = line.split(",")
// 计算geohash
val hash = GeoHash.encode(columns(0).toDouble, columns(1).toDouble, 8)
(columns(0), columns(1), columns(2), hash)
}).toDF(cols: _*)
out.printSchema()
// 创建临时表
out.createOrReplaceTempView("tmp_data")
// 把临时表数据写入hive
spark.sql(
"""
| insert overwrite table unicomm_poc.test_geohash select * from tmp_data
|""".stripMargin)
}
}