13面试问题(2)
是否存在i+1<i的数
存在,int最大值
redis的五种数据类型是指
value的类型,key只能是string类型
企业站内搜索技术选型
单独使用Lucene实现站内搜索需要开发的工作量较大,因此不建议采用。
solr是基于Lucene的全文搜索服务器,能满足搜索技术
esearch是更加新的技术
4、解释内存溢出和内存泄漏,不用的对象需要置为null吗?
内存溢出 out of memory,是指程序在申请内存时,没有足够的内存空间供其使用
内存泄露 memory leak,是指程序在申请内存后,无法释放已申请的内存空间
内存泄露会导致内存溢出
需要,能释放内存空间
定时任务
直接修改/etc/crontab文件
每星期日的6:30执行ls命令:
30 6 * * 0 ls
每15分钟执行一次ls命令:
*/15 * * * * ls
几种垃圾回收机制
标记-清除收集器
标记-压缩收集器
复制收集器
Fs命令与dfs命令有什么不同的地方
fs可以操作任何文件系统
dfs:只能操作HDFS文件系统
10大数据习题集
1.kafka集群的规模,消费速度是多少。
答:一般中小型公司是10个节点,每秒20M左右。
hadoop的shuffle过程
一、Map端的shuffle
Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。
在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。
最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。
二、Reduce端的shuffle
Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。
首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。
接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。
最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。
5.spark streming在实时处理时会发生什么故障,如何停止
StreamingContext.stop会把关联的SparkContext对象也停止,如果不想把SparkContext对象也停止的话可以把StremingContext.stop的可选参数stopSparkContext设为flase。
7.说一下你对hadoop生态圈的认识。
说一下大数据PPT
8.yarn的理解:
ResourceManager:负责整个集群的资源管理和调度 ApplicationMaster:负责应用程序相关事务,比如任务调度、任务监控和容错等。 目前可以支持多种计算框架运行在YARN上面,比如MapReduce、storm、Spark、Flink。
6、两个文件合并的问题:给定a、b两个文件,各存放50亿个url,每个url各占用64字节,内存限制是4G,如何找出a、b文件共同的url?
1)主要的思想是把文件分开进行计算,在对每个文件进行对比,得出相同的URL,因为以上说是含有相同的URL所以不用考虑数据倾斜的问题。详细的解题思路如下:
a、可以估计每个文件的大小为5G*64=300G,远大于4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
b、遍历文件a,对每个url求取hash(url)%1000,然后根据所得值将url分别存储到1000个小文件(设为a0,a1,...a999)当中。这样每个小文件的大小约为300M。
b、遍历文件b,采取和a相同的方法将url分别存储到1000个小文件(b0,b1....b999)中。这样处理后,所有可能相同的url都在对应的小文件(a0 vs b0, a1 vs b1....a999 vs b999)当中,不对应的小文件(比如a0 vs b99)不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。
c、比如对于a0 vs b0,我们可以遍历a0,将其中的url存储到hash_map当中。然后遍历b0,如果url在hash_map中,则说明此url在a和b中同时存在,保存到文件中即可。
d、如果分成的小文件不均匀,导致有些小文件太大(比如大于2G),可以考虑将这些太大的小文件再按类似的方法分成小小文件即可
7、按照需求使用spark编写一下程序?
A、当前文件a.text的格式如下,请统计每个单词出现的个数和第四列每个元素出现的个数
A,b,c,d
B,b,f,e
A,a,c,f
sc.textFile(“a.text”).flatMap(_.split(“,”)).map((_,1)).ReduceByKey(_+_).foreach(println)
sc.textFile(“a.text”).map(line=>{(line.split(",")(3),1)}).reduceByKey(_+_).foreach(println)
B、HDFS中有两个文件a.text与b.text,文件的格式为(ip,username),如:a.text,b.text
a.text
127.0.0.1 xiaozhang
127.0.0.1 xiaoli
127.0.0.2 wangwu
127.0.0.3 lisi
B.text
127.0.0.4 lixiaolu
127.0.0.5 lisi
每个文件至少有1000万行,请用程序完成以下工作,
1)出现在b.text而没有出现在a.text的IP
rdd22.filter(x=>{!x.contains(rdd11)}).foreach(println)
8.kafka 重启是否会导致数据丢失
不会 因为kafka会做持久化,并且offset会保存在zk中,重启后每次读取都会从kafka最新的offset读取(可以从zk里面指定kafka的offset实现从指定位置读取)
9.讲讲checkpoint
checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面),防止很长的计算过程出错时还要从头计算,并且为整合历史数据提供了可能
10.spark streaming 的优缺点
和storm与flink对比来说
11.kafka的message包括哪些信息
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
12.怎么查看kafka的offset
consumer.position()
13. rdd 怎么分区宽依赖和窄依赖
宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖
14.怎么解决kafka的数据丢失
producer端:
宏观上看保证数据的可靠安全性,肯定是依据分区数做好数据备份,设立副本数。
broker端:
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。
分区是kafka进行并行读写的单位,是提升kafka速度的关键。
Consumer端
consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:
enable.auto.commit=false 关闭自动提交位移
在消息被完整处理之后再手动提交位移
3.0请写出以下的shell命令
(1)杀死一个job
(2)删除hdfs上的 /tmp/aaa目录
答:(1)hadoop job –list 得到job的id,然后执 行 hadoop job -kill jobId就可以杀死一个指定jobId的job工作了。
(2)hadoopfs -rmr /tmp/aaa
12.0 请简述mapreduce中的combine和partition的作用
答:combiner是发生在map的最后一个阶段,其原理也是一个小型的reducer,主要作用是减少输出到reduce的数据量,缓解网络传输瓶颈,提高reducer的执行效率。
partition的主要作用将map阶段产生的所有kv对分配给不同的reducer task处理,可以将reduce阶段的处理负载进行分摊
31.数据的三范式
答:
第一范式()无重复的列
第二范式(2NF)属性完全依赖于主键 [消除部分子函数依赖]
第三范式(3NF)属性不依赖于其它非主属性 [消除传递依赖]
35.MapReduce优化经验(即hive调优、即hadoop调优)
答:(1.)设置合理的map和reduce的个数。合理设置blocksize
(2.)避免出现数据倾斜
(4.对数据进行压缩
(5.小文件处理优化:事先合并成大文件,用MR中的combineTextInputformat
103. hive 中的压缩格式 RCFile、TextFile、SequenceFile各有什么区别?
其余两个都进行了压缩,但是不可以直接打开,在创建hive表时需要指定文件格式才能使用
14、有可能使hadoop任务输出到多个目录中吗?如果可以,怎么做?
自定义outputformat或者用multioutputs工具
四、 你们数据库怎么导入hive 的,有没有出现问题
像blob或clob之类的数据,采取不抽取
增量全量导入配置的参数不同
空值问题、分隔符问题
infa导入时数据长度问题
sqoop导入时还要注意字段类型转换问题
五、hdfs-site.xml的3个主要属性?
dfs.name.dir决定的是元数据存储的路径以及DFS的存储方式(磁盘或是远端)
dfs.data.dir决定的是数据存储的路径
fs.checkpoint.dir用于第二Namenode
八、hadoop和spark都是并行计算,那么他们有什么相同和区别?
两者都使用mr模型来进行并行计算,hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
Spark用户提交的任务称为application,一个application对应一个SparkContext,app中存在多个job,没触发一个action操作就会产生一个job。
这些job可以并行或者串行执行,每个job有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和application一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算的。
Hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
Spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作join,groupby等,而且通过DAG图可以实现良好的容错。
十一、简单说一下hadoop和spark的shuffle过程
Hadoop:map端保存分片数据,通过网络收集到reduce端。
Spark:spark的shuffle实在DAGSchedular划分Stage的时候产生的,TaskSchedular要分发Stage到各个worker的executor。减少shuffle可以提高性能。
十六、请列出正常的hadoop集群中hadoop都分别需要启动 哪些进程,他们的作用分别都是什么,请尽量列的详细一些。
namenode:负责管理hdfs中文件块的元数据,响应客户端请求,管理datanode上文件block的均衡,维持副本数量
Secondname:主要负责做checkpoint操作;也可以做冷备,对一定范围内数据做快照性备份。
Datanode:存储数据块,负责客户端对数据块的io请求
Jobtracker :管理任务,并将任务分配给 tasktracker。
Tasktracker: 执行JobTracker分配的任务。
Resourcemanager、Nodemanager、Journalnode、Zookeeper、Zkfc
1.Kudu优势在于:
提供快速的全量数据分析与实时处理功能
结构化的数据模型,支持标准SQL语法,支持数据的更新操作。
集成Impala,利用Imapla SQL语法可操作Kudu数据。
与mapreduce、spark以及其它hadoop生态系统集成。
利用Cloudera Manager,方便管理和维护。
高可用。Tablet Servers and Masters利用Raft Consensus Algorithm.确保只要有一半的副本可用,则tablet可用(可读写)。
对数据顺序扫描(scan)和随机访问(random access)同时具有高性能,简化用户复杂的混合架构。
2.kudu的基本结构
Table
Table就是你在Kudu里存储数据的地方,可以理解为一个表
Tablet
Tablet是存储的最小单位,类似HDFS的block,HBase的region。
Tablet Server
类似HDFS的DataNode, tablet server上存了多个Tablet,每个Tablet有多个副本存放在不同的Table Server上,每个Tablet副本同时只有一个Leader,Leader对用户提供写操作,然后同步给其它follower,其它follower只提供读服务,不提供写服务。当Leader节点发生故障后,通过算法 Raft Consensus Algorithm来重新选举Leader节点。
Master Server
类似HDFS的NameNode, Master负责管理元数据。这些元数据包括talbet的基本信息,位置信息。Master还作为负载均衡服务器,监听Tablet Server的健康状态
Catalog Table
Catalog table存储着元数据信息:
3.Spark Streaming的三种运用场景
1、无状态操作
只关注当前新生成的小批次数据,所有计算都只是基于这个批次的数据进行处理。
2、状态操作
除了当前新生成的小批次数据,但还需要用到以前所生成的所有的历史数据
3、window操作
关注窗口内的批次数据
7. “jps”命令的用处?
解答:
这个命令可以检查Namenode、Datanode、Task Tracker、 Job Tracker是否正常工作。
12. 请列出你所知道的 hadoop 调度器,并简要说明其工作方法?
解答:
1.FIFO schedular:默认,先进先出的原则
2.Capacity schedular:计算能力调度器,选择占用最小,优先级高的先执行,以此类推。
3.Fair schedular:公平调度,所有的job具有相同的资源。
22. hadoop 的 namenode 宕机,怎么解决
解答:
先分析宕机后的损失,宕机后直接导致client无法访问,内存中的元数据丢失,但是硬盘中的元数据应该还存在,如果只是节点挂了,重启即可,如果是机器挂了,重启机器后看节点是否能重启,不能重启就要找到原因修复了。但是最终的解决方案应该是在设计集群的初期就考虑到这个问题,做namenode的HA。
27. 在hadoop中文件的压缩带来了两大好处:
解答:
(1)它减少了存储文件所需的空间;
(2)加快了数据在网络上或者从磁盘上或到磁盘上的传输速度;
28.java类型如何转化为hadoop基本类型?
调用hadoop类型的构造方法,或者调用set()方法。
new LongWritable(123L);
29.hadoop基本类型如何转化为java类型?
对于Text,需要调用toString()方法,其他类型调用get()方法。
3. 生产环境中为什么建议使用外部表?
解答:
1、因为外部表不会加载数据到hive,减少数据传输、数据还能共享。
2、hive不会修改数据,所以无需担心数据的损坏
3、删除表时,只删除表结构、不删除数据。
8. 假如一个分区的数据错误怎么通过hivesql删除
解答:
alter table ptable drop partition (daytime='20140911',city='bj');
元数据,数据文件都删除,但目录daytime= 20140911还在
9. Hive里面用什么代替in查询
解答:
提示:Hive中的left semi join替换sql中的in操作
7. HBase的检索支持3种方式:
解答:
(1) 通过单个Rowkey访问,即按照某个Rowkey键值进行get操作,这样获取唯一一条记录;
(2) 通过Rowkey的range进行scan,即通过设置startRowKey和endRowKey,在这个范围内进行扫描。这样可以按指定的条件获取一批记录;
(3) 全表扫描,即直接扫描整张表中所有行记录。
08hadoop生态圈复习笔记
Zookeeper概述
Zookeeper介绍
Zookeeper是分布式应用程序的协调服务框架,是Hadoop的重要组件。ZK要解决的问题:
1.分布式环境下的数据一致性。
2.分布式环境下的统一命名服务
3.分布式环境下的配置管理
4.分布式环境下的分布式锁
5.集群管理问题
分布式编程容易出现的问题
1.活锁:
多个线程争用一个资源,但是没有任何一个 线程能拿到这个资源。如此循环往复,就形成了活锁。活锁会耗尽Cpu资源(在做无意义的调度)
2.死锁:
有一个线程拿到资源,但相互等待互不释放造成死锁
3.需要考虑集群的管理问题:
需要有一套机制来检测到集群里节点的状态变化。zk是通过临时节点监控哪个服务器挂掉的!!!!!!
4.如果用一台机器做集群管理:
存在单点故障问题,所以针对集群管理,也需要形成一个集群(奇数台,至少3台)
5.管理集群里Leader的选举问题(要根据一定的算法和规则来选举),包括要考虑Leader挂掉之后,如何从剩余的follower里选出Leader
6.分布式锁的实现,用之前学的重入锁,同步代码块是做不了的(因为之前学的锁都是单机锁)
Zk数据结构
1.ZK有一个最开始的节点
2.每个znode节点都可存储数据
3.Znode树的维系实在内存中,目的是供用户快速的查询
4.每个znode节点都是一个路径(通过路径来定位这个节点)
5.每个路径名都是唯一的。
想要执行以下指令,需要先启动zk服务器端(bash zkServer.sh start),再启动zk客户端(bash zkCli.sh)
ZK指令
ls/create/delete/get/set
Zk集群搭建完毕后的数据一致性:
如果用java代码连接该集群中的任何一个服务器,对其做的节点上的修改,在其余服务器也会产生同样的修改(即:集群中的服务器保证数据完全一致)
Zookeeper事务概念
1.每一个创建节点、修改节点、删除节点操作都是一个事务,每一个事务都用一个事务id来代表,叫:zxid
2.zxid 是全局唯一,并且全局递增的。作用就是可以根据最大事务id,找到最新的事务
Zookeeper选举机制
Leader选举出来之后
Leader身上一般是有最新数据的(有最大事务id的),所以首先做的就是原子广播(通过原子广播端口发送事物给其余服务器)。
原子广播的目的:
一、是为了确保数据一致性(即客户端无论通过哪个zk服务器查看数据,数据都是一样的)
二、是为了防止leader挂掉之后,数据的丢失问题
节点类型及特点
persistent(持久节点):
ephemeral(临时节点):
如果和他绑定的服务器宕机了,他则消失
观察者模式
观察者模式的产生意义:
为了提高Zk选举性能,处理思想就是减少投票人数。(前提是满足过半机制),
由此,zk引出了观 察者模式 观察者特点:
1.不参与投票(选举的投票以及事务更新的投票)
2.观察者会跟随最后的投票结果
Zookeeper的特性
数据一致性(单一视图)
client不论连接到哪个Zookeeper,展示给它都是同一个视图,即查询的数据都是一样的。这是zookeeper最重要的性能。
原子性
对于事务决议的更新,要么都更新成功,要么都不更新。
可靠性
一旦服务端发生改变,那么这次变更将会一直保留下来,除非有另一个事务又对其进行了改变。
实时性
Zookeeper保证客户端将在非常短的时间间隔范围内获得服务器的更新信息,或者服务器失效 的信息,或者指定监听事件的变化信息。(前提条件是:网络状况良好)
过半性 zookeeper
集群必须有半数以上的机器存活才能正常工作。因为只有满足过半数,才能满足选 举机制选出Leader。因为只有过半,在做事务决议时,事务才能更新。 所以一般来说,zookeeper集群的数量最好是奇数个。
Zk的脑裂
脑裂的定义:
在管理集群里,出现两个Leader的状况,造成数据混乱,造成整个集群出现问题。脑裂问题是不可控和不可模拟的。出现脑裂问题的根源是选举机制自集群的选举。 并且普遍存在于Master-slave架构(主从架构)里。
Zk脑裂的预防 :
为选举的Leader分配递增id,根据id的大小去判断是否老Leader或新Leader,如果是老Leader, 就不接受其指令。
hadoop
什么是Hadoop?
Hadoop是将海量数据在分布式集群上储存(通过HDFS(分布式数据储存系统)),并运行分布式分析数据(基于mapreduce的一套数据处理框架)
Hadoop能储存和抽取非关系型数据,但并没有查询语言介入,因此不能说它是一个数据库,它更像一个数据仓库,只有通过mapreduce这样的工具才能进行真正的数据处理
HDFS的特点
HDFS概述(HDFS架构图):
HDFS中存在:
一个名字节点NameNode(对应一台服务器)和多个数据节点DataNode(对应多台服务器)
NameNode
存储元数据信息(元数据:描述数据的数据)
元数据保存在内存/磁盘中
保存文件、block、datanode之间的映射关系
DataNode
存储block内容
存储在磁盘中
HDFS优点
1.支持超大文件。
2.检测和快速应对硬件故障
3.使应用程序能以流的形式访问数据集,增大了数据的吞吐量
4.大部分hdfs操作文件时,需要一次写入,多次读取,不会修改,有利于提高吞吐量
5.高容错性(数据自动保存多个副本,副本丢失后自动恢复)
6.可构建在廉价机器上,提高集群存储能力
HDFS缺点
不适用于低延迟数据访问的场景(因为提高了数据吞吐量,而牺牲了获取数据的延迟)
不适用于大量的小文件储存的场景(namenode的内存大小,决定了hdfs文件系统可保存的文件数量。大量的小文件会在namenode上产生大量的记录,占用其空间)
不适用于多用户写入文件、修改文件的场景,只有这样数据的吞吐量才能大。
不支持超强的事务(没有像关系型数据库那样,对事务有强有力的支持)
HDFS细节说明
Block块概念
数据块(block)是HDFS为文件提供的切块后的数据储存的空间,其默认大小为128M,若文件小于128M,则块的大小为文件大小
NameNode
A.NameNode维护着HDFS中的元信息,数据格式参照如下:
/test/a.log(哪个文件),3(备份了几份),{b1,b2}(切了几块),[{b1:[h0,h1,h3]}(第一块的位置都在哪些datanode上),{b2:[h0,h2,h4]}(第二块的位置都在哪些datanode上)
B.NameNode中的元数据(metadata)信息存储在:
内存/文件中,内存中为实时信息,文件中为数据镜像作为持久化存储使用。
这些文件包括:
fsimage :元数据镜像文件。存储某NameNode元数据信息,但并不是实时同步内存中的数据。
edits :记录操作的日志文件
元数据具体储存流程如下:
NameNode会首先将元数据写到edits文件中,写入成功后才会修改内存中的元数据,并向客户端返回这才请求的结果。
此时不会更新fsimage中的元数据,所以,fsimage中的元数据并不是实时的元数据,只有在达到条件时再根据edits进行更新,更新过程需要SecondaryNameNode(相当于NN的助理,专门负责合并的工作)参与。
上图所示:达到条件后 snn会将nn中的fsimage和edits文件拷贝过来,同时nn中会创建一个新的edits.new文件,新的读写请求会写入到这个edits.new中,在snn中将拷贝过来的fsimage和edits合并为一个新的fsimage,最后snn将合并完成的fsimage文件拷贝回nn中替换之前的fsimage,nn再将edtis.new改为edits
问:ssn可以对元数据做一定程度的备份,但不是热备,对不对?那什么情况下可能造成NameNode元数据信息丢失?
snn并不是nn的热备,但是能保存大部分备份数据。原因就在于在合并过程中,如果nn发生故障,那edits.new中的数据丢失了就找不回来了,找回来的只能是用于合并的原edits,因此通常NameNode和SNN要放置到不同机器中以此提升性能,并提供一定的元数据安全性。
何时触发数据合并?:
根据配置文件设置的时间间隔:fs.checkpoint.period 默认3600秒;
根据配置文件设置的edits log大小 fs.checkpoint.size 默认64MB;
DataNode
DataNode节点会不断向NameNode节点发送心跳报告。
初始化时,每个数据节点将当前存储的数据块告知NameNode节点。
通过向NameNode主动发送心跳保持与其联系(3秒一次)
后续DataNode节点在工作的过程中,数据节点仍会不断的更新NameNode节点与之对应的元数据信息,并接受来自NameNode节点的指令,创建、移动或者删除本地磁盘上的数据块。
如果10分钟都没收到dn的心跳,则认为其已经lost,并copy其上的block到其他dn
Block三个副本放置策略:
第一个副本:如果上传文件的服务器本身就是DataNode,就放置在上传文件的DN;如果是外部客户端向集群上传,就随机选择一台磁盘不太满,cpu不太忙的节点
第二个副本:放置在第一个副本不同机架的节点上(会自动找到不同机架)
第三个副本:放置在与第二个副本相同机架的节点上(机架内通讯比机架间通讯块)
HDFS执行流程
HDFS读流程图
1.OpenFile:使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
Get Block: Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的三个三个DataNode地址;
客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode,每读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续。当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
当文件最后一个块也都读取完成后,datanode会连接namenode告知关闭文件。
HDFS写流程(存入文件)
1.使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录(账本(edits)上记录一条),否则会让客户端抛出异常;
当客户端开始写入文件的时候,开发库Client会将文件切分成多个packets(数据包,每个包是64k)(注意:文件会被切成数据包而非数据块,数据块是HDFS所提供的储存这些数据包的空间),并在内部以数据队列"data queue"的形式管理这些packets,并向Namenode申请新的blocks。
开发库把packet以流的方式写入第一个datanode中的block,该block把该packet存储之后,再将其传递给的下一个datanode中的block,直到最后一个datanode中的block,这种写数据的方式呈流水线的形式。
最后一个datanode成功存储之后会返回一个信息,向上传递至客户端,在客户端的开发库成功收到信息后,会从数据队列中移除相应的packet。
6当所有的块都存放完后,通知NameNode关闭文件。
HDFS的删除流程
当NameNode执行delete方法时,它只标记操作涉及的需要被删除的数据块,而不会主动联系这些数据块所在的DataNode节点。
当保存着这些数据块的DataNode节点向NameNode节点发送心跳时,在心跳应答里,NameNode节点会向DataNode发出指令,从而把数据删除掉。
所以在执行完delete方法后的一段时间内,数据块才能被真正的删除掉。
安全模式
何时会进入安全模式,此时的特点:
在启动HDFS后,会立即进入安全模式,此时不能操作hdfs中的文件,只能查看目录文件名等,读写操作都不能进行。
为什么要进入安全模式?安全模式中所进行的任务:
namenode启动时,需要载入fsimage文件到内存,同时执行edits文件中各项操作
一旦在内存中成功建立文件系统元数据的映射,则创建一个新的fsimage文件(这个步骤不需要SNN的参与)和一个空的编辑文件。
在此阶段NameNode收集各个DataNode的报告,当每个数据块都备份了三份以上时(不足的会自动复制到三份以上),再经过若干时间,安全模式结束
补充:数据块的具体存放位置是由谁来维护的?
Namenode只是告诉哪个datanode中要放哪些数据块,而具体放在datanode中的哪个位置,由datanode决定(其内部维护了一个数据块列表)
MapReduce
概述
MapReduce是一个分布式计算框架(HDFS是一个分布式文件储存系统),解决了海量数据的计算问题。
MapReduce框架的节点组成结构
1、 ResourceManager工作职能:
A.知道管理哪些机器,即管理哪些NodeManager。
B.要有检测机制,能够检测到NodeManager的状态变换,通过RPC心跳来实现。
C.任务的分配和调度
2、 NodeManager工作职能:
A.能够收到ResourceManager发过来的任务,并进行任务的处理。这里处理任务指的是Map任务或Reduce任务。
Map、Reduce的执行步骤
要能说清整个流程
Map和reduce个数的确定:
要处理的文件被分成了几块储存在HDFS上,当使用MR进行文件处理时,就会调用几个map任务来处理这几块的数据;对map的输出进行洗牌(shuffle),设置分区个数(相同key肯定在同一个区,但是不同的key也可能在同一个区)后,分了几个区就会创建几个reduce来处理这些分区中的数据
Mapreduce执行溢写原理
1.Mapper
每个MapperTask有一个环形内存缓冲区,用于存储map任务的输出。默认大小100MB,一旦达到阀值0.8,一个后台线程把内容写到磁盘的指定目录下的新建的一个溢出写文件。
写磁盘前,要partition,sort,Combiner(这些就是在shuffle中要做的事情)。如果有后续的数据,将会继续写入环形缓冲区中,最终写入下一个溢出文件中。
等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
2.Reducer
Reducer通过Http方式得到输出文件的分区。
NodeManager为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘(一但Map任务完成,Reduce就开始复制map输出)
小文件处理
小文件的定义
小文件指的是:
那些size比HDFS 的block size(128M)小的多的文件。如果在HDFS中存储海量的小文件,会产生很多问题。
大量小文件在HDFS中的问题
问题:太占namenode内存空间(1000个1M比1个1000M多占1000倍的namenode空间)
因此HDFS并不是为了有效的处理大量小文件而存在的。它主要是为了流式的访问大文件而设计的。
大量小文件在mapreduce中的问题
Map tasks通常是每次处理一个block的input,如果有大量小文件,就会有很多block,就会产生大量的map tasks。Hadoop里每个task任务(map任务或reduce任务)的执行都会启动JVM来运行。启动一个新的JVM将耗时1秒左右,对于运行时间较长(比如1分钟以上)的job影响不大,但如果都是时间很短的task,那么频繁启停JVM会有开销。
解决方法:
1.可以在一个JVM中允许task reuse,以支持在一个JVM中运行多个map task,以此来减少一些JVM的启动消耗(通过设置mapred.job.reuse.jvm.num.tasks属性,默认为1,-1为无限制)。
2.将多个小文件合成一个spilt,即用一个map任务来处理。
flume
1.flume概述
1.1.flume概念(日志收集)
1.1.1.flume概念
flume是分布式的,可靠的,高可用的,用于对不同来源的大量的日志数据进行有效收集、聚集和移动,并以集中式的数据存储的系统。
运行原理:
将日志信息的每一行,包装成json格式,再传入flume系统中进行管理
补:json格式:
{“header”:{“name”:“liming”,“age”:“18”},body:{日志中每一行字符串}}
2.flume中的概念、模型和特点
2.1.flume中的一些重要概念
2.1.1.flume Event:
flume 事件,被定义为一个具有有效荷载的字节数据流和可选的字符串属性集。
2.1.2.flume Agent:
flume 代理,是一个进程承载从外部源事件流到下一个目的地的过程。包含source channel 和 sink。
2.1.3.Source
数据源,消耗外部传递给他的事件,外部源将数据按照flume Source 能识别的格式将Flume 事件发送给flume Source。
2.1.4.Channel
数据通道,是一个被动的存储,用来保持事件,直到由一个flume Sink消耗。
2.1.5.Sink
数据汇聚点,代表外部数据存放位置。发送flume event到指定的外部目标。
各种不同Source(提供了不同类型的数据源,只需要在配置文件中更改source的配置)
1.Avro source:(用的最多)
监听Avro客户端,将监听到的事件流作为数据源
3.Spooling Directory Source:
监听自动收集目录,如果有文件传入,flume就会以日志的形式解析,并收集这些文件,这些传入的文件就相当于数据源
4.Netcat Source:
监听指定端口,并将接受到的数据每一行转换为一个事件作为数据源
各种不同的Sink(是指收到的外部数据存放位置,只需要在配置文件中更改sink的配置):
2.File Roll Sink:
每隔指定时长,生成文件,保存这段时间收到的外部数据
3.Avro Sink:
作为多级流动、扇入、扇出的数据储存中介使用,是多级流动、扇入、扇出的基础
4.HDFS Sink:
将收到的外部数据储存到HDFS中
各种不同的channel(是指收到的外部数据在中间过渡时的存放位置,只需要在配置文件中更改channel的配置):
1.Memory channel:
数据被临时保存在内存中的指定大小的队列
3.File Channel:
数据被持久保存在硬盘中
4.Spillable Memory Channel:
数据被保存在内存队列和硬盘中(相当于1和3的结合,最常用)
Selector:
选择器,用于扇出的两种模式下,指定数据发送给哪个channel
Interceptors:
拦截器,在数据发送给channel前,对数据进行修改或删除
1.Timestamp Interceptors:
修改数据,在event头中加入当前处理时间(即:系统时间)
6.Search and Replace Interceptor:
先检查event的body中数据,再替换(捡查和替换都是基于字符串的正则表达式的)
Processor:
用于实现负载均衡和失败恢复的工具
1.Default Sink Processor:
这是默认情况下的使用,即:一个sink对应一个channel
2.Failover Sink Processor:
这是用于失败恢复的情况(如:3个sink对应一个channel,分别设置优先级,优先级高的优先使用,如果高的坏了,第二高的可以顶替他继续工作,注:同时只能有一台在工作,其余相当于备用的)
3.Load Balancing Sink Processor:
这是用于负载均衡的情况(有轮询或随机两种方式)
hive
分区表的含义:
hive支持分区表,当文件中数据量很大,且常常出现按某一字段查询(如:...where country=’...’,这就叫按country字段进行查询),需要对数据进行分区,因为这样可以极大的提高查询时的效率!!!(没分区时需要在所有数据中先提取符合该字段的数据,分区以后可以直接在这个country分区中查询了!)
六、HIVE语法(相当于数据库的sql)
9.补充:
增加分区:
ALTER TABLE book add PARTITION (category = 'zazhi') location '/user/hive/warehouse/datax.db/book/category=zazhi';
删除分区:
ALTER TABLE table_name DROP partition_spec, partition_spec,...
重命名表:
ALTER TABLE table_name RENAME TO new_table_name
修改列名:
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]
增加/替换列:
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
查看表名,部分匹配
SHOW TABLES 'page.*';
SHOW TABLES '.*view';
查看某表的所有Partition,如果没有就报错:
SHOW PARTITIONS page_view;
查看某表结构:
DESCRIBE invites;
查看分区内容
SELECT a.foo FROM invites a WHERE a.ds='2008-08-15';
Hbase(了解)
一、HBASE概述
HBASE与HIVE的区别:
HBASE:是基于hadoop的数据库工具(把hadoop作为数据库)
HIVE:是基于hadoop的数据仓库工具(把hadoop作为数据仓库)
优点:
1.是一种 NoSQL 非关系型的数据库 不符合关系型数据库的范式 (关系型数据库的范式:可以将数据放在行列构成的很多表中,进行表的关联,从而阐明数据之间的关系)
2.适合存储:半结构化(如:json格式)与非结构化的数据(不能用行列表示的数据,如图片,视频,音频;语音识别技术、图像识别技术、机械学习技术实际上就是在处理非结构化数据)
3.适合存储稀疏的数据,hbase中空的数据不占用空间(mysql中空数据也占用空间)
为什么会出现空数据呢?
因为是非结构化的数据,因此势必有些格中没有数据
4.面向列(族)进行存储,方便对数据进行压缩 (mysql是以行进行存储)
5.提供实时增删改查的能力 是一种真正的数据库
6.可以存储海量数据,性能也很强大,可以实现上亿条记录的毫秒级别的查询
7.是一个高可靠性,高性能,面向列,可伸缩的分布式存储系统 利用hbase技术可以在廉价的PC上搭建起大规模结构化存储集群。
8.HBase利用HadoopHDFS作为其文件存储系统,利用Hadoop的MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具
缺点:
不能提供严格的事务控制,只能在行级别保证事务
(2)逻辑结构
hbase通过表来存储数据 但是表的结构和关系型数据库非常的不一样
行键(RowKey ):
列族(Column Family ):
列(Column):
单元格与时间戳(cell timestamp )
kafka
一、Kafka概述
分布式消息队列
按topic分类开放数据
Producer Consumer
Broker
使用zookeeper做为集群的协调工具
kafka的特点:
高吞吐量
Kafka 每秒可以生产约50 MB消息,每秒处理 110 MB消息
持久化数据存储
将消息持久化到磁盘
分布式系统易于扩展
所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
客户端consumer自己维护自己的状态(状态即:该读取什么数据了)
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护。当失败时能自动平衡。
二、Kafka中的基本概念
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息。
客户端和服务端通过TCP协议通信。Kafka提供了Java客户端,并且对多种语言都提供了支持。
三、Topics、Producers、Consumers
1.Topics(主题)
1.一个topic是对一组消息的归纳。(topic的作用:可以让多个不同的团队使用同一个kafka,而不会发生数据混乱)
2.对每个topic,Kafka 对它的日志进行了分区(类似于图书馆的图书分类摆放,实现了负载均衡,并且每个分区在Kafka集群的若干服务器中都有副本,这样这些持有副本的服务可以共同处理数据和请求(类似于leader和flower的关系),副本数量是可以配置的。)
3.分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
4.在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的,不论其是否被消费过,之后它将被丢弃以释放空间。
5.kafa的最大优点:
Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。(其性能由硬件决定)
6.将日志分区的目的:
首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。并且在kafka中通过分区实现了负载均衡和失败恢复
2.Producers
Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。
3.Consumers
**实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset由consumer来维护
**消费消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。
(1)队列模式
队列模式中,多个consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;
(2)发布订阅模式
发布-订阅模式中消息被广播到所有的consumer中。
如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。
如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。
为什么大数据环境下的消息队列常选择kafka?
分布式存储数据,提供了更好的性能 可靠性 可扩展能力,且按照主题、分区来分布式存放数据,持久化存储,提供海量数据存储能力,性能和磁盘的性能相关和数据量的大小无关
storm
一、Storm作用:
分布式实时计算系统
二、Storm组件
1.结构
storm结构称为topology(拓扑),由stream(数据流),spout(喷嘴-数据流的生成者),bolt(阀门-数据流运算者)组成(参考图:Storm组成结构)。
不同于Hadoop中的job,Storm中的topology会一直运行下去,除非进程被杀死或取消部署。
2.Stream
Storm的核心数据结构是tuple(元组),本质上是包含了一个或多个键值对的列表。Stream是由无限制的tuple组成的序列。
3.spout
spout连接到数据源,将数据转化为一个个的tuple,并将tuple作为数据流进行发射,通常不会用于处理业务逻辑,从而可以很方便的实现spout的复用。开发一个spout的主要工作就是利用API编写代码从数据源消费数据流。
spout的数据源可以有很多种来源:
Kafka
Hbase
Mysql
Redis
hdfs
4.bolt(真正进行业务处理的部分)
bolt主要负责数据的运算,将接收到的数据实施运算后,选择性的输出一个或多个数据流。
一个bolt可以接收多个由spout或其他bolt发射的数据流,从而可以组建出复杂的数据转换和处理的网络拓扑结构。
bolt常见的典型功能:
过滤
连接和聚合
计算
数据库的读写
Hadoop常见参数控制+调优策略
配置所在文件 参数 参数默认值 作用
hdfs-site.xml dfs.heartbeat.interval 3 DN的心跳间隔,秒
在集群网络通信状态不好的时候,适当调大
hdfs-site.xml dfs.blocksize 134217728 块大小,默认是128MB
必须得是1024的整数倍
mapred-site.xml mapreduce.task.io.sort.mb 100 任务内部排序缓冲区大小,默认是100MB
此参数调大,能够减少Spil溢写次数,减少磁盘I/O
mapred-site.xml mapreduce.reduce.shuffle.parallelcopies 5 Reduce Task 启动的并发拷贝数据的线程数
建议,尽可能等于或接近于Map任务数量,
但是不易过多。
mapred-site.xml io.sort.factor 10 merge文件合并因子,如果结果文件数量太多,可以适当调大,从而减少I/O次数