Persistence
回到本章开头的例子,我们可以把“年度-气温”的中间数据集缓存在内存中:
scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18
调用cache()不会立刻把RDD缓存到内存中,只是对这个RDD做一个标记,当Spark job运行的时候,实际的缓存行为才会发生。因此我们首先强制运行一个job:
scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
(1950,22)
(1949,111)
关于BlockManagerInfo的日志显示,作为job运行的一部分,RDD的分区会被保持在内存中。日志显示这个RDD的编号是4(在调用cache()方法之后的控制台输出中,也能看到这个信息),它包含两个分区,标签分别是0和1。如果在这个缓存的数据集上运行另一个job,我们会看到这个RDD将从内存中加载。这次我们计算最低气温:
scala> tuples.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_))
INFO BlockManager: Found block rdd_4_0 locally
INFO BlockManager: Found block rdd_4_1 locally
(1949,78)
(1950,-11)
这是在微小数据集上的简单示例,但是对于更大的job,节省的时间将很可观。在MapReduce中,为了执行另一个计算,输入数据集必须再次从磁盘加载。即使中间数据可以作为输入(比如一个清洗后的数据集,无效行和不必要的字段都已移除),也不能改变“数据必须从磁盘加载”的事实,这是很慢的。Spark会把数据集缓存在一个跨集群的内存高速缓存中,这就意味着任何基于此数据集的计算都会执行的非常快。
在对数据进行交互式探索时,这种效率是极其有用的。这也自然适合某些类型的算法,比如迭代算法,一次迭代计算的结果可以缓存在内存中,成为下次迭代计算的输入。这种算法也可以用MapReduce实现,每次迭代都是一个单独的MapReduce job,因此每次迭代的结果必须写入磁盘,然后在下次迭代时再读回来。
缓存的RDD只能被同一个application中的job获取。要在不同的application之间共享数据集,第一个application必须使用某个saveAs*()方法(saveAsTextFile(),saveAsHadoopFile()等等)来写到外部存储中,然后第二个application使用SparkContext中的对应方法(textFile(),hadoopFile()等等)再次加载。同样的,当一个application终止时,它缓存的所有RDD都被销毁,除非显式的保存下来,否则不能再次访问。
Persistence levels
调用cache()会把RDD的每个分区持久化到执行器(executor)的内存中。如果执行器没有足够的内存来存储这个RDD分区,计算不会失败,相反该分区将会根据需要进行重算。对于带有很多trsansformation的复杂程序,这是很昂贵的。因此Spark提供了不同类型的持久化行为供用户选择,在调用persist()时指定StorageLevel参数即可。
默认的持久化级别是MEMORY_ONLY,这种方式使用对象的常规内存表示。要使用更紧凑的表现形式,可以把分区中的元素序列化为字节数组(byte array)。这种级别是MEMORY_ONLY_SER,相比MEMORY_ONLY,这种级别会导致CPU的压力,如果序列化之后的RDD分区能够适应内存,而常规的内存表示不适合,那么这种压力就是值得的。MEMORY_ONLY_SER还会减轻垃圾回收的压力,因为每个RDD都以字节数组的形式存储,而不是很多的对象。
在driver程序的日志文件中,检查BlockManager相关的信息,可以看到一个RDD分区是否不适合内存。另外,每个driver的SparkContext会在4040端口启动一个HTTP服务,提供关于运行环境以及正在运行的job的有用信息,包括缓存的RDD分区的信息。
默认情况下,使用常规的Java序列化框架来序列化RDD分区,不过Kryo序列化框架(下节讨论)通常是更好的选择,在大小和速度两方面都更优秀。如果把序列化后的分区进行压缩,可以节省更多的空间(再一次付出CPU的代价),设置spark.rdd.compress属性为true来启用压缩,属性spark.io.compression.codec是可选设置。
如果重算一个数据集非常昂贵,那么MEMORY_AND_DISK(如果数据集在内存中放不下,就写到磁盘上)或者MEMORY_AND_DISK_SER(如果序列化后的数据集在内存中放不下,就写到磁盘上)是合适的。
还有一些更高级的和实验中的持久化级别,用来在集群中的多个节点上复制分区,或者使用off-heap内存——更多细节,查看Spark文档。
Serialization
在Spark中需要考虑序列化的两个方面:序列化数据和序列化函数(或闭包)。
Data
首先来看数据的序列化。默认情况下,Spark使用Java序列化框架在执行器之间的网络上传输数据,或者以序列化的形式来缓存数据。对程序员来说,Java的序列化很好理解,只需确定你使用的类实现了java.io.Serializable接口或者java.io.Externalizable接口,但从性能和大小的角度来看,这种方式的效率不高。
对于大多数的Spark程序,更好的选择是Kryo序列化框架。Kryo是一个高效的通用的Java序列化库。要使用Kryo,在driver程序的SparkConf上设置spark.serializer属性如下:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Kryo不要求你的类实现特定接口,因此简单的Java对象不需要任何改动即可在RDD中使用。话虽如此,如果在使用一个类之前把它注册到Kryo会更加高效。这是因为Kryo会创建一个引用,指向那个序列化对象的类(一个对象对应一个引用),如果类已注册,该引用是个整数ID,如果类没有注册,该引用是类的全名。这个引导仅仅适用于你自己的类,Scala类和许多其他的框架类(比如Avro Generic或者Thrift类)已经由Spark注册了。
向Kryo注册类也很简单。创建一个KryoRegistrator的子类,覆盖registerClasses()方法:
class CustomKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[WeatherRecord])
}
}
最后,在driver程序中,把属性spark.kryo.registrator设置为你的KryoRegistrator实现类的完整类名:
conf.set("spark.kryo.registrator", "CustomKryoRegistrator")
Functions
通常,函数的序列化将"刚好工作":在Scala中,函数都是可序列化的,使用标准Java序列化机制。这也是Spark向远程执行器节点发送函数时使用的方式。即使在本地模式下运行,Spark也会序列化函数。如果你在无意中引入了不可序列化的函数(比如,从一个非序列化类的方法转换过来的函数),你会在开发过程的早期阶段发现它。
Shared Variables
Spark程序经常需要访问一些数据,这些数据不是一个RDD的一部分。例如,下面的程序在一个map()操作中使用了一个查找表(lookup table):
val lookup = Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u")
val result = sc.parallelize(Array(2, 1, 3)).map(lookup(_))
assert(result.collect().toSet === Set("a", "e", "i"))
这段程序会正确工作(变量lookup被序列化为闭包的一部分,传递给map()),但是还有一个更高效的方式来达到同样的目的:使用广播变量。
Broadcast Variables
广播变量在序列化之后发送给每一个执行器,在那里缓存起来,因此后续的任务可以在需要时访问。这与普通的变量不同。普通的变量会序列化为闭包的一部分,然后在网络上传输,一个任务一次传输。广播变量的角色,与MapReduce中的分布式缓存相似,不过Spark内部的实现是把数据存储在内存中,仅当内存被耗尽时才写到磁盘。
广播变量的创建方法是,把需要广播的变量传递给SparkContext的broadcast()方法。T类型的变量被包装进Broadcast[T],然后返回:
val lookup: Broadcast[Map[Int, String]] =
sc.broadcast(Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u"))
val result = sc.parallelize(Array(2, 1, 3)).map(lookup.value(_))
assert(result.collect().toSet === Set("a", "e", "i"))
在RDD的map()操作中,调用这个广播变量的value来访问它。
顾名思义,广播变量是单向传送的,从driver到task——没有办法更新一个广播变量,然后回传给driver。为此,我们需要一个累加器。
Accumulators
累加器是一个共享变量,和MapReduce中的计数器一样,任务只能对其增加。在job完成以后,累加器的最终值可以在driver程序中获取。下面的例子中,使用累加器计算一个整数RDD中的元素数量,同时使用reduce()操作对RDD中的值求和:
val count: Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1, 2, 3))
.map(i => { count += 1; i })
.reduce((x, y) => x + y)
assert(count.value === 3)
assert(result === 6)
第一行代码使用SparkContext的accumulator()方法,创建了一个累加器变量count。map()操作是一个恒等函数,副作用是增加count。当Spark job的结果计算出来之后,累加器的值通过调用value来访问。
在这个例子中,我们使用一个Int作为累加器,但任何的数值类型都是可以的。Spark还提供了两种方法,一是使用累加器的结果类型与“被增量”的类型不同(参见SparkContext的accumulable()方法),二是可以累加可变集合中的值(通过accumulableCollection())。