11. mapreduce 的 shuffle 调优参数
Map 端优化参数
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
io.sort.mb | int | 100 | 缓存map中间结果的buffer大小(MB) |
io.sort.record.percent | float | 0.05 | io.sort.mb中用来保存map output记录边界的百分比,其他缓存用来保存数据 |
io.sort.spill.percent | float | 0.80 | map开始做spill操作的阈值 |
io.sort.factor | int | 10 | 做merge操作时同时操作的stream数上限 |
min.num.spill.for.combine | int | 3 | combiner函数运行的最小spill数 |
mapred.compress.map.output | boolean | false | map中间结果是否采用压缩 |
mapred.map.output.compression.codec | class name | org.apache.hadoop.io.compress.DefaultCodec | map中间结果的压缩格式 |
Reduce 端优化参数
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.reduce.parallel.copies | int | 5 | 每个reduce并行下载map结果的最大线程数 |
mapred.reduce.copy.backoff | int | 300 | reduce下载线程最大等待时间(in sec) |
io.sort.factor | int | 10 | 同上 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | 用来缓存shuffle数据的reduce task heap百分比 |
mapred.job.shuffle.merge.percent | float | 0.66 | 缓存的内存中多少百分比后开始做merge操作 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | sort完成后reduce计算阶段用来缓存数据的百分比 |
12. hive 都在 mysql 中建了哪些表,分别用来干什么
详情参考一下连接:
重要的表有:
存储Hive版本的元数据表(VERSION)
Hive数据库相关的元数据表(DBS、DATABASE_PARAMS)
Hive 表和视图相关的元数据表(TBLS、TABLE_PARAMS、TBL_PRIVS,这三张表通过TBL_ID关联)
Hive 表分区相关的元数据表(主要涉及 PARTITIONS、PARTITION_KEYS、PARTITION_KEY_VALS、PARTITION_PARAMS)
Hive 文件存储信息相关的元数据表(主要涉及SDS、SD_PARAMS、SERDES、SERDE_PARAMS)
13. hive 面试用神 sql,级联求和
需求:现有用户的每月消费金额情况如下,希望输出的结果如下,请用一个 hive sql 实现:
用户 | 时间 | 金额 |
---|---|---|
A | 2015-01 | 5 |
A | 2015-01 | 15 |
B | 2015-01 | 5 |
A | 2015-01 | 8 |
B | 2015-01 | 25 |
A | 2015-01 | 5 |
A | 2015-02 | 4 |
A | 2015-02 | 6 |
B | 2015-02 | 10 |
B | 2015-02 | 5 |
要求输出结果:
+---------+----------+--------+-------------+--+
| a.name | a.time | count | accumulate |
+---------+----------+--------+-------------+--+
| A | 2015-01 | 33 | 33 |
| A | 2015-02 | 10 | 43 |
| B | 2015-01 | 30 | 30 |
| B | 2015-02 | 15 | 45 |
+---------+----------+--------+-------------+--+
实现思路:
- 第一步,先求个用户的月总金额
select username,month,sum(salary) as salary from t_access_times group by username,month
+-----------+----------+---------+--+
| username | month | salary |
+-----------+----------+---------+--+
| A | 2015-01 | 33 |
| A | 2015-02 | 10 |
| B | 2015-01 | 30 |
| B | 2015-02 | 15 |
+-----------+----------+---------+--+
- 第二步,将月总金额表 自己连接 自己连接
select *
from
(select username,month,sum(salary) as salary from t_access_times group by username,month) A
inner join
(select username,month,sum(salary) as salary from t_access_times group by username,month) B
on
A.username=B.username
+-------------+----------+-----------+-------------+----------+-----------+--+
| a.username | a.month | a.salary | b.username | b.month | b.salary |
+-------------+----------+-----------+-------------+----------+-----------+--+
| A | 2015-01 | 33 | A | 2015-01 | 33 |
| A | 2015-01 | 33 | A | 2015-02 | 10 |
| A | 2015-02 | 10 | A | 2015-01 | 33 |
| A | 2015-02 | 10 | A | 2015-02 | 10 |
| B | 2015-01 | 30 | B | 2015-01 | 30 |
| B | 2015-01 | 30 | B | 2015-02 | 15 |
| B | 2015-02 | 15 | B | 2015-01 | 30 |
| B | 2015-02 | 15 | B | 2015-02 | 15 |
+-------------+----------+-----------+-------------+----------+-----------+--+
- 第三步,从上一步的结果中进行分组查询,分组的字段是 a.username a.month。求月累计值:将 b.month <= a.month 的所有 b.salary 求和即可
select A.username,A.month,max(A.salary) as salary,sum(B.salary) as accumulate
from
(select username,month,sum(salary) as salary from t_access_times group by username,month) A
inner join
(select username,month,sum(salary) as salary from t_access_times group by username,month) B
on
A.username=B.username
where B.month <= A.month
group by A.username,A.month
order by A.username,A.month;
+---------+----------+--------+-------------+--+
| a.name | a.time | count | accumulate |
+---------+----------+--------+-------------+--+
| A | 2015-01 | 33 | 33 |
| A | 2015-02 | 10 | 43 |
| B | 2015-01 | 30 | 30 |
| B | 2015-02 | 15 | 45 |
+---------+----------+--------+-------------+--+
14. sqoop 在导入数据到 mysql 中,如何让数据不重复导入?如果存在数据问题 sqoop如何处理?
--update-key ID --update-mode allowinsert,如何数据存在就更新,不存在就插入。
15. 使用 Hive 或者自定义 MapReduce 实现如下逻辑:
product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
字段解释:
product_no:用户手机号
lac_id:用户所在基站
start_time:用户在此基站的开始时间
staytime:用户在此基站的逗留时间
需求描述:
根据 lac_id 和 start_time 知道用户当时的位置,根据 staytime 知道用户各个基站的逗留时长。根据轨迹合并连续基站的 staytime。最终得到每一个用户按时间排序在每一个基站驻留时长
期望输出举例:
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
考虑用 mapreduce 实现
实现参考MR 基站
问题分析:针对每个product_no按照start_time进行排序(本例降序),如果相邻两项的lac_id相同,则将staytime进行相加保存到后一项中,并将前一项移除。
自己已经做出来了。
请随意使用各种类型的脚本语言实现:批量将指定目录下的所有文件中的 $HADOOP_HOME$替换成/home/ocetl/app/hadoop
Python(没有验证)
import string,os,sys dir=’/var’
files=os.listdir(dir) for f in files: tmpContents = ‘’; for s in f.readlines(): tmpContents += s.replace(“$HADOOP_HOME$”,”/home/ocetl/app/hadoop”)
f.write(tmpContents) f.close()
shell(已验证)
#!/bin/bash
ls $1 | while read line
do
sed -i 's,\$HADOOP_HOME\$,\/home\/hadoop,g' $1$line
echo $1$line
done
16. mapreduce 的 shuffle 执行流程
maptask中的 OutputCollector 收集我们的 map() 方法输出的 kv 对,放到内存缓冲区(环形缓冲区)中(默认 100M)
当缓冲区的存储的数据超过了阈值(默认80%)时,会调用 spiller 线程从内存缓冲区溢出到本地磁盘文件,可能会溢出多个文件
多个溢出文件会被合并成大的溢出文件
在溢出过程中,及合并的过程中,都要调用 partitoner 进行分区和针对 key 进行排序(默认是快速排序)
reducetask 根据自己的分区号,去各个 maptask 机器上取相应的结果分区数据,将这些文件再进行合并和排序(归并排序)。
reducetask 合并完成后,shuffle 的过程也就结束了,后面进入 reducetask 的逻辑运算过程。
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,==原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快==
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M
17. hdfs 的 checkpoint 过程描述
HA 模式下的元数据合并
在HA模式下checkpoint过程由StandBy NameNode来进行,以下简称为SBNN,Active NameNode简称为ANN。
主要由4个步骤:
SBNN 检查是否达到 checkpoint 条件:离上一次 checkpoint 操作是否已经有一个小时,或者 HDFS 已经进行了100万次操作。
SBNN 检查达到 checkpoint 条件后,将 ANN 的最后一次操作日志读取到本地,将磁盘中的 fsimage 文件和 edits 文件加载到内存,合并成一份新的 fsimage 文件
然后 SBNN 通过 HTTP 联系 ANN。
ANN 通过 HTTP 方式从 SBNN 获取最新的 fsimage 文件并将旧的 fsimage 文件删除。
由于HA机制,会使得Standby NameNode和Active NameNode都拥有最新的fsimage和edits文件。
非 HA 模式的元数据合并
内存中有一份完整的元数据
磁盘有一个“准完整”的元数据镜像
当客户端对 hdfs 中的文件进行新增或者修改操作,相应的记录首先被记入 edits 这种 log 日志中,当客户端操作成功后,相应的元数据会更新到内存中
每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(合并)(这个过程称为checkpoint)
secondary namenode将合并好之后的新的fsimage镜像文件回传给namenode,namenode将旧的fsimage删除使用新的fsimage和新的edits.inprogress文件
namenode 和 secondary namenode 的工作目录存储结构完全相同,所以,当 namenode 故障退出需要重新恢复时,可以从 secondary namenode 的工作目录中将 fsimage 拷贝到 namenode 的工作目录,以恢复 namenode 的元数据
这样做的缺点就是:secondary namenode 的 fsimage 不是最新的。
什么时候进行 checkpoint?
由两个参数
- dfs.namenode.checkpoint.preiod (两次checkpoint之间的时间间隔,默认值是3600秒,即 1 小时)
- dfs.namenode.checkpoint.txns (两次checkpoint之间最大的操作记录,默认值是 100万次 )来决定。
period 参数表示,经过 1 小时就进行一次 checkpoint,txns 参数表示,hdfs 经过 100 万次操作后就要进行 checkpoint 了。这两个参数任意一个得到满足,都会触发 checkpoint 过程。进行 checkpoint 的节点每隔 dfs.namenode.checkpoint.check.period (默认值是 60 )秒就会去统计一次 hdfs 的操作次数。
18. hdfs 的读数据流程
client跟namenode通信查询元数据,找到文件块所在的datanode服务器
分别连接目标datanode,并发读取文件块,并在合并读取到的文件
datanode 以 packet 为单位向客户端发送。客户端接收后先存在本地缓存,然后再写入文件。
19. hdfs 的写数据流程
client 跟 namenode 通信请求上传文件,namenode 检查目标文件是否已存在(文件已存在则报错不能上传),父目录是否存在(目录不存在报错无法上传),以及客户端是否有新建文件的权限。
namenode 返回是否可以上传,如果可以上传,namenode 会返回这个文件是否被分块以及分块的具体信息。
client 请求第一个 block 该传输到哪些 datanode 服务器上
namenode 返回 3 个 datanode 服务器 ABC
client 请求 3 台 dn 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline,连接离自己最近的 datanode),A收到请求会继续调用 B,然后 B 调用 C,将整个 pipeline 建立完成,逐级返回客户端
client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
当一个block传输完成之后,client再次请求namenode上传第二个block的服务器,以此类推。
如果传输过程中,有某个 datanode 出现了故障,那么当前的 pipeline 会被关闭,出现故障的 datanode 会从当前的 pipeline 中移除,剩余的 block 会继续剩下的 datanode 中继续以 pipeline 的形式传输,同时 Namenode 会分配一个新的 datanode,保持 replicas 设定的数量。
20. Hive sql 优化解决数据倾斜问题
Hive 中出现数据倾斜的情况,一般发生在 sql 中的 group by 和 join 上,且和数据逻辑绑定比较深。个人认为解决数据倾斜问题的重点应该放在对数据的设计和业务的理解上。
优化方案:
使用 map side join,参考 hive使用技巧(四)——巧用MapJoin解决数据倾斜问题
针对 count(distinct) 操作,先转换成 group,然后再外面加上 count
Hive 的万能钥匙:hive.groupby.skewindata=true
left semi join 使用,实现 in 操作
尽量尽早的过滤数据,减少每个阶段的数据量,把 where 条件写到 join 的里面,减少 join 的数据量。
针对大量 key 为空的情况,将空的 key 和非空 key 做区分,空的 key 不做 join 操作。
避免使用 order by,原因如下:order by 会对输入做全局排序,因此只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间。
尽量避免一个 sql 包含复杂的逻辑,可以将整个步骤拆分成多个中间表来完成。
join 操作,数据量小的表要放在 join 的左边
union all 的部分大于 2,或每个 union 的部分数据量很大,那么应该拆分成多个 insert into 语句。
具体参考 hive的查询注意事项以及优化总结
9. hbase 的 rowkey 设计原则
rowkey长度原则
==建议越短越好,不要超过16个字节==,原因如下:
- 数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率。
- MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。
- 目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
rowkey散列原则
如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
rowkey唯一原则
必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。
10. hbase 常见的避免热点的方法
加盐
这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。
哈希
哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据
反转
第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。
反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题
时间戳反转
一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key][reverse_timestamp] , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。
比如需要保存一个用户的操作记录,按照操作时间倒序排序,在设计rowkey的时候,可以这样设计
[userId反转][Long.Max_Value - timestamp],在查询用户的所有操作记录数据的时候,直接指定反转后的userId,startRow是[userId反转][000000000000],stopRow是[userId反转][Long.Max_Value - timestamp]
如果需要查询某段时间的操作记录,startRow是[user反转][Long.Max_Value - 起始时间],stopRow是[userId反转][Long.Max_Value - 结束时间]
11. hbase 的列族如何设计
Hbase官方文档中写明,目前列族数量最优不超过3个。原因如下:
每个 RegionServer 包含多个 Region,每个 Region 包含多个Store,每个 Store 包含一个 MemStore 和多个 StoreFile。
在 Hbase 的表中,每个列族对应 Region 中的一个Store,Region的大小达到阈值时会分裂,因此如果表中有多个列族,则可能出现以下现象:
一个Region中有多个Store,如果每个CF的数据量分布不均匀时,比如CF1为100万,CF2为1万,则 Region分裂时导致CF2在每个Region中的数据量太少,查询CF2时会横跨多个Region导致效率降低。
如果每个CF的数据分布均匀,比如CF1有50万,CF2有50万,CF3有50万,则Region分裂时导致每个 CF 在Region的数据量偏少,查询某个CF时会导致横跨多个Region的概率增大。
多个CF代表有多个Store,也就是说有多个MemStore,也就导致内存的消耗量增大,使用效率下降。
Region 中的 缓存刷新 和 压缩 是基本操作,即一个CF出现缓存刷新或压缩操作,其它CF也会同时做一样的操作,当列族太多时就会导致IO频繁的问题。
12. hbase 的整体架构
- HMaster
HMaster用于协调多个 HRegionServer,侦测各个RegionServer之间的状态,并平衡RegionServer之间的负载。HMaster还有一个职责就是负责分配Region给RegionServer。==如果无Master过程中,数据读取仍照常进行,但是,region切分、负载均衡等无法进行==
- HRegionServer
对于一个HRegionServer而言,其包括了多个HRegion。HRegionServer的作用只是管理表格,以及实现读写操作。Client直接连接HRegionServer,并通信获取HBase中的数据。对于Region而言,则是真实存放HBase数据的地方,也就说Region是HBase可用性和分布式的基本单位.==每个 RegionServer 包含多个 Region,每个 Region 包含多个Store,每个 Store 包含一个 MemStore 和多个 StoreFile。每个 HStore 对应 Table 中的一个 Column Familiy 的存储。==
- Zookeeper
对于 HBase 而言,Zookeeper的作用是至关重要的。首先Zookeeper是作为HBase Master的HA解决方案。也就是说,是Zookeeper保证了至少有一个HBase Master 处于运行状态。并且Zookeeper负责Region和Region Server的注册。
22. mapreduce 中的 MRAppMaster 和 YarnChild 的作用分别是什么?
MRAppMaster
在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。
具体参考:YARN MapReduce MRAppMaster-剖析
YarnChild
YarnChild:运行具体的 map/reduce task。
RunJar:完成job的初始化,包括获取jobID,将jar包上传至hdfs等。
job启动过程:
ResourceManager,NodeManager->RunJar->MRAppMaster->YarnChild
job退出过程:
YarnChild->MRAppMaster->RunJar