原文:USING TRIGGERS ON SCHEMALESS, UBER ENGINEERING’S DATASTORE USING MYSQL
译者:杰微刊兼职翻译汪健
本文主要介绍Schemaless triggers的细节及相关案例,从2014年10月开始,通过这种方式提供给Uber在数据存储方面伸缩的能力。这是本系列文章的第三部分,第一部分是关于Schemaless总体的设计,第二部分是关于其架构的讨论。
Schemaless triggers是一种具有可伸缩性、容错性和无损性的技术,它可以监听Schemaless实例的变化。它是隐藏在行程背后的流程管理引擎,从司机的领航员按下“结束行程”并支付了行程费用开始,所有相关的数据都进去我们的数据仓库等待我们分析。在Schemaless系列的最后一篇中,我们将深入探讨Schemaless triggers的相关功能,以及我们如何让这个系统具备可伸缩性和容错性。
总的来说,在Schemaless中最基本的单元数据被称为单元(cell),它是不可变的,一旦写入就不能再重写(当然在特殊情况下,我们可能会删除旧记录)。一个单元可能被行键(row key)、列名称(column name)及引用键(ref key)所引用,单元的内容通过写入一个更高版本的引用键来进行更新,其中行键和列名称保持不变。Schemaless不对存储在其内部的数据执行任何的类数据库的schema操作,所以这也正是为什么叫Schemaless的原因,从Schemaless的观点来看,它仅仅存储JSON对象。
Schemaless Triggers案例
让我们看一下实际中Schemaless trigger是如何工作的,下面的代码展示了我们是如何异步计费的一个简化版本,大写表示Schemaless 列名称。案例中使用的是python语言:
我们通过添加一个注解符@trigger定义一个trigger,这个注解可以加在一个函数上,也可以添加在Schemaless 指定的列上。这样的做法让Schemaless triggers去调用指定的函数,在本例中是指bill_rider,当一个单元被写入一个指定的列时将被触发调用。这里的BASE是一个列,当一个新的单元写入到BASE时表明行程已经结束。然后就会触发trigger,这时行键(这里是行程的UUID)就会被传递到函数中,如果需要更多的数据,程序员必须从Schemaless 实例中获取其他的实时数据,本例是从行程存储系统Mezzanine中获取数据。
下面的图片展示了bill_rider的trigger相关的信息流(乘客结账部分),箭头指向表明了调用方和被调用方,旁边的数字表示流程的顺序:
首先行程进状态写入到Mezzanine,这会使 Schemaless Trigger框架调用bill_rider,在调用时,函数要求行程存储获取STATUS列的最新版本信息,在本例中is_completed字段不存在,也就是意味着乘客还未结账,接着再BASE列的行程信息将被获取并通过函数调用信用卡provider进行结账。在本例中,我们成功地使用信用卡进行付款,所以我们将成功状态写回到Mezzanine中,然后将STATUS列中的is_completed字段设置为true。
Trigger框架能保证bill_rider被每个Schemaless 实例的每个单元至少调用一次。一般而言trigger函数只会被触发一次,但在某些出错的情况下可能会被调用若干次,这个错误可能是trigger函数本身的错误也可能是trigger函数外部的错误。这也就意味着trigger函数需要被设计成具备幂等性,在本例中,幂等性可以通过检查单元是否已经被处理完毕来实现,函数检测到如果已经处理完毕了则可以直接返回。
当你在查看Schemaless 如何支持类似本案例的流程时,请记住这个案例。我们将会解释Schemaless 如何被当做变更日志来使用的,并且讨论Schemaless 相关的一些API,最后还会分享我们是通过什么技术让流程变得可伸缩和具备容错能力。
把Schemaless 当做日志
Schemaless 包含所有单元,这也就意味着它包含了指定的行键、列键对的所有版本。也真是因为它拥有单元所有的历史版本,Schemaless 除了可以作为随机访问的key-value存储外,它还可以作为变更日志。事实上,它是一个分区日志,每个切片都是自己的日志,如下面图所示:
每个单元都通过指定的行键(这里是指UUID)进行切片映射后写入特定的切片,在每个切片中,所有单元都会被赋予一个唯一的标识符,这个标识符叫已添加ID。已添加ID是一个自动递增的字段,它代表单元插入的顺序,越是新的单元就会有一个越新的已添加ID。除了刚刚提到的已添加ID外,每个单元还会有单元写入时间字段。在所有分片副本中已添加ID具备唯一性,这个特性对于提供failover能力是非常重要的。
Schemaless 的API既支持随机访问也支持日志类型访问,随机访问API是相对于单元而言的,它由row_key、column_key和ref_key三者共同标识。
put_cell (row_key, column_key, ref_key, cell):
// 通过给定的row key、column key和ref key插入一个单元
get_cell(row_key, column_key, ref_key):
//通过指定的row key、column key和ref key获取指定单元
get_cell_latest(row_key, column_key):
// 通过指定的row key和column key获取具有最高版本号ref key的单元
Schemaless 还包含这些API终端的批处理版本,这里我们省略它。早前说过的trigger函数bill_rider就是使用这些函数去获取和操作一个单元的。
对于日志类型访问API,我们主要关心单元的切片数字、时间戳和已添加ID,这三者合起来称定位的位置。
get_cells_for_shard(shard_no, location, limit):
// 从“shard_no”切片返回在“location”后的不多于“limit”个单元
与随机访问API类似,日志访问API拥有更多方法让我们一次性从不同的切片中去批量获取多个单元。其中的location可以使时间戳timestamp或已添加ID added_id。调用get_cells_for_shard除了返回指定的单元外还会返回下一个已添加ID。例如,如果你通过指定location为1000去调用get_cells_for_shards请求了10个单元,那么返回回来的下一个location的位置偏移量就是1010。
追踪日志
通过日志类型访问API你可以追踪Schemaless 实例,这个看起来就像在你的操作系统中通过tail -f命令去追踪一个文件,或者像kafka这样最新的变更会被轮询的事件通知队列。客户端然后通过维护保持跟踪位置偏移量进而使用它们去轮询。想要开启一个跟踪你要指定起始入口,例如location为0,或者任意的时间戳,或者某位置偏移量。
Schemaless triggers通过使用日志类型访问API实现了相同机制的跟踪,它保持跟踪位置偏移量,通过轮询该API方式的最直接的好处就是Schemaless triggers使处理过程具有容错性和可扩展性。通过配置Schemaless实例及配置哪些列去轮询数据,然后就可以通过客户端程序去连接Schemaless triggers框架。所有的函数和回调函数都被绑定到框架的数字流上,在适当的时刻将被Schemaless triggers调用,或者说被触发,而这个适当的时刻就是当一个新的单元被插入到Schemaless实例时。作为回报,框架会将增加运行在主机上的程序需要的工作进程编号。框架优雅地通过可用进程和不可用进程进行分工,将失败的进程上面的工作传播到健康进程去处理。这种分工模式意味着程序员只需编写好处理者即可,例如trigger函数,只要保证这个函数是幂等性的就可以了,剩下的就交给Schemaless triggers来处理。
架构
在这部分中,我们将讨论Schemaless triggers是如何做到可扩展,如何做到让故障影响最小化。下面的图从一个高层次的角度展示了其架构,拿了前面结账服务的例子:
结算服务使用了运行在三个不同主机上的Schemaless triggers,为简单起见,我们假设每个主机上有一个工作进程,Schemaless triggers框架将切片从多个工作进程中分开,所以每个工作进程负责一个特定的切片。注意到,工作进程1从切片1拉取数据,而工作进程2则则负责切片2和切片5,最后工作进程3负责切片3和切片4。一个工作进程只处理指定切片的单元,通过获取新的单元去调用这些切片上注册上来的回调函数。其中一个工作进程被指定为leader,它负责分配切片给各个工作进程。如果有一个工作进程挂了,leader将失败进程的切片重新分配给其他工作进程。
在一个切片中,单元都是按照写入的顺序被进行触发的,这也就意味着如果某个触发单元总是因为程序错误而总是失败,那它就会在相应的切片上阻碍单元处理。为了避免这种延迟,你可以配置Schemaless triggers标记多次失败的单元,并将他们放到单独的队列中。这样做以后Schemaless triggers就会跳过出错的单元接着处理下一个单元。如果被标记的单元超过了某一阀值,trigger就会停止,这通常表明是系统错误,需要人工进行修复。
Schemaless triggers通过保存每个切片最新成功被触发的单元的已添加ID去跟踪整个触发过程,框架将这些位置偏移保存在一个共享存储中,例如zookeeper或者Schemaless 实例本身。这也就意味着如果程序被重启了,trigger将会从公共存储中读取获取位置偏移量后继续运行,公共存储同样用于保存一些meta信息,例如协调leader选举的工作,工作进程的发现及移除。
可扩展性和容错性
Schemaless triggers在刚开始设计时就充分考虑其可扩展性,对于任意客户端程序,我们可以在被追踪的Schemaless 实例中添加最多与切片数量相等的工作进程,通常这个数量为4096。除此之外,我们可以在线添加或移除工作进程来处理Schemaless 实例中其他trigger客户端变化的负载。仅仅通过跟踪框架里面的进度,我们就可以给发送数据的Schemaless 实例添加尽可能多的客户端。在服务器端没有跟踪客户端并推送状态给他们的逻辑。
Schemaless triggers同样也具备容错性的,任何一个进程发生故障都不会影响到系统的运行。
1、如果一个客户端进程发生错误了,leader会将失败进程相关的工作重新分配给其他监控进程,确保所有切片都会分配到处理进程。
2、如果Schemaless triggers节点上的leader发生故障了,一个新的节点将会被选举作为leader,在leader选举的过程中,单元仍然会被执行,但是工作不能被重新分配,而且不能添加或移除工作进程。
3、如果公共存储(例如zookeeper)发生故障了,单元仍然会被处理,但是这种情况也像leader选举期间,不能重新分配工作,工作进程也不能添加或移除。
4、最后,Schemaless triggers框架与Schemaless 实例里面的故障是互相隔离的,任意数据库节点宕了都没问题,因为Schemaless triggers可以从他们的备份节点上读取。
总结
从运维的角度来看,Schemaless triggers是一个非常好的伙伴。Schemaless 是一个实时数据源理想的存储,因为这里的数据可以通过随机访问API或者通过日志类型访问API去访问。另外使用Schemaless triggers的日志类型访问API可以将数据从生产者和消费者中解耦出来,让开发者只要关注逻辑处理而不必关心如何保证其扩展性和容错性。最后,我们可以在运行时添加更多的存储服务器去提升我们的性能和内存。如今,Schemaless triggers框架是整个行程处理流中的核心驱动,包括将数据收进我们的分析数据仓库和跨数据中心的复制。我们对2016及以后未来的前景充满期待。
更多内容: