ps: Maxcompute数据仓库建设的分享。
Maxcompute
大致介绍下Maxcompute, Maxcompute是阿里的一个大数据工具,基于Maxcompute阿里搭建了一个Datawork的数据平台。可以很“方便”的从各种数据源导入数据,做数据分析、机器学习等。
“方便"之所以加个引号,是因为某些方面的确很方便,当然业务实在复杂了,很多时候也存在用的很难受的地方。
更多介绍去阿里官网了解吧。
回到正题,这里主要分享下批量删除分区的一个小技巧。
分区
介绍下分区的概念,Table是一个数据表,也是一个分区的数组。分区把Table的数据分成了一个个的区块。
Maxcompute是个不支持某条数据修改删除的数据仓库。而分区是可以删除和新增的。引入分区,就可以做到在小颗粒度上做到修改和删除的功能。
背景
首先描述下为何会有大量的分区需要删除的场景。
- 数据从datahub归档,比如按频率最高的归档方式 15分钟一次,每次一个分区,可以想象几个月后将会有多少分区。
- 数据从mysql分库增量读取,以每天一次增量,乘以分库的数量,也会产生大量的分区。
分区数量多了之后,文件将会变多。一个是Maxcompute对于表的分区是一个上限数量,另一个是分区数量多了之后计算将会很慢。
这些源数据将会经过清洗产生对应的中间表或者结果集,供BI或者数据分析使用。而源数据为了方便管理,可以通过sql聚合成一个大分区来存放。而原来的很多分区就可以删除,以便腾出空间。
方式
一般正常删除分区是通过sql来删除。
alter table 表A drop if EXISTS PARTITION(分区名='123')
这种方式只能一次删除一个分区,当分区有上万个的时候就不适用了。
这种情况可以通过pyodps来轻松的批量删除分区。
下面是一个删除datahub归档分区的实例
# 清除历史分区,防止分区数达到上限
from datetime import date, timedelta
# 删除十天前分区
end_date = date.today() - timedelta(days=10)
end = end_date.strftime('%Y%m%d')
def drop_pars(tableName):
t = o.get_table(tableName)
s = list(filter(lambda x: x.name < "ds='{}'".format(end), t.partitions))
for par in s:
par.drop()
drop_pars("dhub_trade")
drop_pars("dhub_ft_trade")
drop_pars("dhub_dn_trade")
drop_pars("dhub_wms_trade")
drop_pars("dhub_trade_send")
drop_pars("dhub_wms_trade_send")
可以在dataworks里面新建个pyodps的节点来每天运行,一劳永逸。
如果是放在python本地运行的话建议使用ipython。具体还需要配置下odps的环境。可以参考下官方文档。
后语
Dataworks使用中分库分表很多,怎么配置大量的同步任务。有经验的可以一起研究下。