squbs-17. 持久化缓冲区(PersistentBuffer)

PersistentBuffer 是一系列 Akka Streams 流组建的第一个。它像 Akka Streams缓冲区一样工作,不同的是,缓存的内容存储于一系列内存映射的文件中,由 PersistentBuffer构造提供。这可以让缓冲区大小实际无上限,不使用JVM堆来存储,在同时处理百万级/每秒时性能优异。

依赖

以下依赖在Persistent Buffer工作时需要:

"org.squbs" %% "squbs-pattern" % squbsVersion,
"net.openhft" % "chronicle-queue" % "4.5.13"

例子

以下的例子显示了 PersistentBuffer在流中的用法:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val countFuture = source.via(buffer.async).runWith(counter)

此版本在GraphDSL显示相同

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    source ~> buffer.async ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()

背压(Back-Pressure)

PersistentBuffer不背压上游流量。它将获取给它的所有的流元素,并且通过增长或旋转队列文件的数量来增长存储。它没有任何方法确定缓冲区的限制或存储大小。下游流量背压按照每个Akka Streams和Reactive Streams的要求进行。

如果PersistentBuffer stage被下游流量混淆, PersistentBuffer不会缓存并且它实际上会背压。为了保证PersistentBuffer确实运行在它自己的空间,在这之后加入一个async 边界。

失败 & 恢复

由于它的持久特性, PersistentBuffer可以从突然的流关闭,故障,JVM故障甚至潜在的系统故障中恢复。在同一个目录通过 PersistentBuffer重启流将启动发出存贮在缓存中的元素,在新的元素加入进来之前不会消费。在先前的流故障或关闭时缓存中正在消费的元素(并未消费完成)将会丢失。

因为缓存通过本地存储、心轴或SSD,因此这个缓存的性能和耐久性同样取决于存储的耐久性。所以,理解和推断缓存的耐久性非常重要,与那些数据库和其他热离线持久存储不是同一个级别,以换取更高的性能。

Akka Streams stage批处理请求并在内部缓存记录。 PersistentBuffer保证回复和记录的持久化到达onPush, Akka Stream stage内部缓存的记录未到达onPush 将会在故障中丢失。

提交保证(Commit Guarantee)

在一个不可预知的故障情况中,从 PersistentBuffer stage发出的元素却未抵达sink将会丢失。有些情况下,它可能需要避免此类数据丢失。在 sink之前使用commit stage对这类情况有帮助。加入 commit stage,使用 PersistentBufferAtLeastOnce 。请参考下面commit stage 用法的例子:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val tempPath = new File("/tmp/myqueue")
val config = ConfigFactory.parseMap {
    Map(
      "persist-dir" -> s"${tempPath.getAbsolutePath}"
    )
  }
val buffer = new PersistentBufferAtLeastOnce[ByteString](config)
val commit = buffer.commit[ByteString]
val flowSink = // do some transformation or a sink flow with expected failure
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    // ensures that records are reprocessed when something fails at tranform flow
    source ~> buffer ~> flowSink ~> commit ~> sink 
    ClosedShape
})
val countFuture = streamGraph.run()

请注意,commit 无法防止在 sink(或者其他commit之后的stage)内部缓存中的丢失。

提交订单(Commit Order)

commit stage应该正常按照顺序接收元素。然而,流中的一个潜在的bug可能引起一个元素丢弃或不按顺序抵达 commit stage。默认的commit-order-policy设置为 lenient,使流继续运行在这个场景中。你可以设置为 strict,以便抛出CommitOrderException 异常,并让Supervision.Decider确定要执行的操作。

空间管理

一个典型的持久队列目录查看如下:

$ ls -l
-rw-r--r--  1 squbs_user     110054053  83886080 May 17 20:00 20160518.cq4
-rw-r--r--  1 squbs_user     110054053      8192 May 17 20:00 tailer.idx

当所有的读者成功处理读取queue,队列文件自动删除。

配置

队列通过传递一个保存了所有默认配置的持久化目录的地址创建。所有的例子可以在上面看到。或者,它可以通过传递在构建时一个 Config对象创建。 Config 对象是一个标准的HOCON 配置。下面的例子展示了使用Config构建PersistentBuffer

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
  """.stripMargin
val config = ConfigFactory.parseString(configText)

//使用Config构建缓存
val buffer = new PersistentBuffer[ByteString](config)

下面的配置属性用于 PersistentBuffer

persist-dir = /tmp/myQueue # Required
roll-cycle = daily         # Optional, defaults to daily
wire-type = binary         # Optional, defaults to binary
block-size = 80m           # Optional, defaults to 64m
index-spacing = 16k        # Optional, defaults to roll-cycle's spacing 
index-count = 16           # Optional, defaults to roll-cycle's count
commit-order-policy = lenient # Optional, default to lenient

Roll-cycle可以用大写或小写指定。roll-cycle支持以下值:

Roll Cycle 容量(Capacity)
MINUTELY 64 million entries per minute
HOURLY 256 million entries per hour
SMALL_DAILY 512 million entries per day
DAILY 4 billion entries per day
LARGE_DAILY 32 billion entries per day
XLARGE_DAILY 2 trillion entries per day
HUGE_DAILY 256 trillion entries per day

Wire-type可以通过大写或者小写指定。wire-type支持以下值:

  • TEXT
  • BINARY
  • FIELDLESS_BINARY
  • COMPRESSED_BINARY
  • JSON
  • RAW
  • CSV

内存大小诸如 block-sizeindex-spacing 依据memory size format defined in the HOCON specification指定。

序列化(Serialization)

QueueSerializer[T] 需要被隐式的提供给PersistentBuffer[T],如上面的例子所示:

implicit val serializer = QueueSerializer[ByteString]()

QueueSerializer[T]() 为你的目标类型调用生产一个序列化器(Serializer)。它基于基础设施的序列化和反序列化。

实现Serializer

控制队列中细粒度的持久化格式,你可能需要实现你自己的序列化器(serializer)如下:

case class Person(name: String, age: Int)

class PersonSerializer extends QueueSerializer[Person] {

  override def readElement(wire: WireIn): Option[Person] = {
    for {
      name <- Option(wire.read().`object`(classOf[String]))
      age <- Option(wire.read().int32)
    } yield { Person(name, age) }
  }

  override def writeElement(element: Person, wire: WireOut): Unit = {
    wire.write().`object`(classOf[String], element.name)
    wire.write().int32(element.age)
  }
}

使用这个序列化器(serializer),只需要在构建PersistentBuffer之前声明它为隐式的,如下:

implicit val serializer = new PersonSerializer()
val buffer = new PersistentBuffer[Person](new File("/tmp/myqueue")

广播缓存(Broadcast Buffer)

BroadcastBuffer是持久化缓存的一个变种。这个工作与PersistentBuffer相似,流元素广播至多个输出端口。因此它是缓存和广播stage的组合。这个配置采用一个名为output-ports的附加参数,用于指定输出端口的数量。

当流元素从每个输出端口发出(以独立的速度,取决于下游的速度要求)时,特别需要广播缓存。

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
    | output-ports = 3
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val bcBuffer = new BroadcastBuffer[ByteString](config)

例子

implicit val serializer = QueueSerializer[ByteString]()

val in = Source(1 to 100000)
val flowCounter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(flowCounter) { implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        val buffer = new BroadcastBufferAtLeastOnce[ByteString](config)
        val commit = buffer.commit[ByteString]
        val bcBuffer = builder.add(buffer.async)
        val mr = builder.add(merge)
        in ~> transform ~> bcBuffer ~> commit ~> mr ~> sink
                           bcBuffer ~> commit ~> mr
                           bcBuffer ~> commit ~> mr
        ClosedShape
    })
    
val countFuture = streamGraph.run()

积分(Credits)

PersistentBuffer 利用Chronicle-Queue 4.x作为高性能内存映射队列持久化。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容