==22[doc]Cascalog:基于Clojure的Hadoop查询语言

Introducing Cascalog: a Clojure-based query language for Hadoop
http://cascalog.org/articles/marz_intro_1.html


Cascalog:基于Clojure的Hadoop查询语言 | Ji ZHANG's Blog
http://shzhangji.com/blog/2013/05/01/introducing-cascalog-a-clojure-based-query-language-for-hado/

原文:http://nathanmarz.com/blog/introducing-cascalog-a-clojure-based-query-language-for-hado.html
我非常兴奋地告诉大家,Cascalog开源了!Cascalog受Datalog启发,是一种基于Clojure、运行于Hadoop平台上的查询语言。
特点
简单 - 使用相同的语法编写函数、过滤规则、聚合运算;数据联合(join)变得简单而自然。
表达能力强 - 强大的逻辑组合条件,你可以在查询语句中任意编写Clojure函数。
交互性 - 可以在Clojure REPL中执行查询语句。
可扩展 - Cascalog的查询语句是一组MapReduce脚本。
任意数据源 - HDFS、数据库、本地数据、以及任何能够使用Cascading的Tap
读取的数据。
正确处理空值 - 空值往往让事情变得棘手。Cascalog提供了内置的“非空变量”来自动过滤空值。
与Cascading结合 - 使用Cascalog定义的流程可以在Cascading中直接使用,反之亦然。
与Clojure结合 - 能够使用普通的Clojure函数来编写操作流程、过滤规则,又因为Cascalog是一种Clojure DSL,因此也能在其他Clojure代码中使用。

好,下面就让我们开始Cascalog的学习之旅!我会用一系列的示例来介绍Cascalog。这些示例会使用到项目本身提供的“试验场”数据集。我建议你立刻下载Cascalog,一边阅读本文一边在REPL中操作。(安装启动过程只有几分钟,README中有步骤)
基本查询
首先让我们启动REPL,并加载“试验场”数据集:
1
2

lein repl
user=> (use 'cascalog.playground) (bootstrap)

以上语句会加载本文用到的所有模块和数据。你可以阅读项目中的playground.clj
文件来查看这些数据。下面让我们执行第一个查询语句,找出年龄为25岁的人:
1

user=> (?<- (stdout) [?person] (age ?person 25))

这条查询语句可以这样阅读:找出所有age
等于25的?person
。执行过程中你可以看到Hadoop输出的日志信息,几秒钟后就能看到查询结果。
好,让我们尝试稍复杂的例子。我们来做一个范围查询,找出年龄小于30的人:
1

user=> (?<- (stdout) [?person] (age ?person ?age) (< ?age 30))

看起来也不复杂。这条语句中,我们将人的年龄绑定到了?age
变量中,并对该变量做出了“小于30”的限定。
我们重新执行这条语句,只是这次会将人的年龄也输出出来:
1
2

user=> (?<- (stdout) [?person ?age] (age ?person ?age)
(< ?age 30))

我们要做的仅仅是将?age
添加到向量中去。
让我们执行另一条查询,找出艾米丽关注的所有男性:
1
2

user=> (?<- (stdout) [?person] (follows "emily" ?person)
(gender ?person "m"))

可能你没有注意到,这条语句使用了联合查询。各个数据集中的?person
值都必须对应,而follows
和gender
分属于不同的数据集,Cascalog便会使用联合查询。
查询语句的结构
让我们分析一下查询语句的结构,以下面这条语句为例:
1
2

user=> (?<- [stdout] [?person ?a2] (age ?person ?age)
(< ?age 30) (* 2 ?age :> ?a2))

?<-
操作符出现的频率很高,它能同时定义并执行一条查询。?<-
实际上是对<-
和?-
的包装。我们之后会看到如何使用这些操作符编写更为复杂的查询语句。
首先,我们指定了查询结果的输出目的地,就是这里的(stdout)
。(stdout)
会创建一个Cascading的tap
组件,它会在查询结束后将结果打印到标准输出中。我们可以使用任意一种Cascading的tap
组件,也就是说输出结果的格式可以是序列文件(Sequence file)、文本文件等等;也可以输出到任何地方,如本地磁盘、HDFS、数据库等。
在定义了输出目的地后,我们使用Clojure的向量结构来定义输出结果所包含的内容。本例中,我们定义的是?person
和?a2

接下来,我们定义了一系列的约束条件。Cascalog有三种约束条件:
生成器(Generator):表示一个数据源,可以是以下两种类型:Cascading Tap:如HDFS上某个路径中的文件;
一个已经使用<-
定义的查询。

操作器(Operation):引入预定义的变量,将其绑定至新的变量,或是设定一个过滤条件。
集合器(Aggregator):计数、求和、最小值、最大值等等。

约束条件由名称、一组输入变量、以及一组输出变量构成。上述查询中的约束条件有:
(age ?person ?age)
(< ?age 30)
(* 2 ?age :> ?a2)

其中,:>
关键字用于将输入变量和输出变量隔开。如果没有这个关键字,那么该变量在操作器中就会被识别为输入变量,在生成器和集合器中会被认为是输出变量。
age
约束指向playground.clj
中定义的一个tap
,所以它是一个生成器,会输出?person
和?age
这两个数据。
<
约束是一个Clojure函数,因为没有指定输出变量,所以这条约束会构成一个过滤器,将?age
小于30的记录筛选出来。如果我们这样写:
1

(< ?age 30 :> ?young)

那么<
约束会将“年龄是否小于30”作为一个布尔值传递给?young
变量。
约束之间的顺序不重要,因为Cascalog是声明式语言。
变量替换为常量
变量是以?
或!
起始的标识。有时你不在意变量的值,可以直接用_
代替。其他的变量则会在解析时替换成常量。我们已经在很多示例中用到这一特性了。下面这个示例中,我们将输出变量作为一种过滤条件:
1

(* 4 ?v2 :> 100)

这里使用了两个常量:4和100。4是一个输入变量,100则是作为一个过滤条件,只有满足?v2
乘以4等于100的记录才会被筛选出来。字符串、数字、以及其他基本类型和对象类型,只要在Hadoop有对应的序列化操作,都可以被作为常量使用。
让我们回到示例中。找出所有关注了比自己年龄小的用户的列表:
1
2
3

user=> (?<- (stdout) [?person1 ?person2]
(age ?person1 ?age1) (follows ?person1 ?person2)
(age ?person2 ?age2) (< ?age2 ?age1))

同时,我们将年龄差异也输出出来:
1
2
3
4

user=> (?<- (stdout) [?person1 ?person2 ?delta]
(age ?person1 ?age1) (follows ?person1 ?person2)
(age ?person2 ?age2) (- ?age2 ?age1 :> ?delta)
(< ?delta 0))

聚合
下面让我们看看聚合查询的使用方法。统计所有年龄小于30的用户人数:
1
2

user=> (?<- (stdout) [?count] (age _ ?a) (< ?a 30)
(c/count ?count))

这条查询会统计所有的记录。我们也可以只聚合部分记录。比如,让我们找出每个人所关注的用户的数量:
1
2

user=> (?<- (stdout) [?person ?count] (follows ?person _)
(c/count ?count))

因为我们在输出结果中指定了?person
这个变量,所以Cascalog会将数据记录按照用户来分组,然后使用c/count
进行聚合运算。
你可以在单个查询中使用多个聚合条件,它们的分组方式是一致的。例如,我们可以计算每个国家的用户的平均年龄,使用计数和求和这两种聚合方式:
1
2
3
4

user=> (?<- (stdout) [?country ?avg]
(location ?person ?country _ _) (age ?person ?age)
(c/count ?count) (c/sum ?age :> ?sum)
(div ?sum ?count :> ?avg))

可以看到,我们对?sum
和?count
这两个聚合结果执行了div
操作,该操作会在聚合过程结束后进行。
自定义操作
下面我们来编写一个查询,统计几句话中每个单词的出现次数。首先,我们编写一个自定义操作:
1
2
3
4

user=> (defmapcatop split [sentence]
(seq (.split sentence "\s+")))
user=> (?<- (stdout) [?word ?count] (sentence ?s)
(split ?s :> ?word) (c/count ?count))

defmapcatop split
定义了一个方法,这个方法接收一个参数sentence
,并会输出0个或多个元组(tuple)。deffilterop
可以用来定义一个返回布尔型的方法,用来筛选记录;defmapop
定义的函数会返回一个元组;defaggregateop
定义一个聚合函数。这些函数都能在Cascalog工作流API中使用,我会在另一篇博客中叙述。
在上述查询中,如果单词字母大小写不一致,会被分别统计。我们用以下方法来修复这个问题:
1
2
3
4

user=> (defn lowercase [w] (.toLowerCase w))
user=> (?<- (stdout) [?word ?count]
(sentence ?s) (split ?s :> ?word1)
(lowercase ?word1 :> ?word) (c/count ?count))

可以看到,这里直接使用了纯Clojure编写的函数。当这个函数不包含输出变量时,会被作为过滤条件来执行;当包含一个返回值时,则会作为defmapop
来解析。而对于返回0个或多个元组的函数,则必须使用defmapcatop
来定义。
下面这个查询会按照性别和年龄范围来统计用户数量:
1
2
3
4
5

user=> (defn agebucket [age]
(find-first (partial <= age) [17 25 35 45 55 65 100 200]))
user=> (?<- (stdout) [?bucket ?gender ?count]
(age ?person ?age) (gender ?person ?gender)
(agebucket ?age :> ?bucket) (c/count ?count))

非空变量
Cascalog提供了“非空变量”这样的机制来帮助用户处理空值的情况。其实我们每个示例中都在使用这一特性。以?
开头的变量都是非空变量,而以!
开头的则是可空变量。Cascalog会在执行过程中将空值排除在外。
为了体验非空变量的效果,让我们对比下面这两条查询语句:
1
2

user=> (?<- (stdout) [?person ?city] (location ?person _ _ ?city)
user=> (?<- (stdout) [?person !city] (location ?person _ _ !city)

第二组查询结果中会包含空值。
子查询
最后,我们来看看更为复杂的查询,我们会用到子查询这一特性。让我们找出关注了两人以上的用户列表,并找出这些用户之间的关注关系:
1
2
3
4

user=> (let [many-follows (<- [?person] (follows ?person _)
(c/count ?c) (> ?c 2))]
(?<- (stdout) [?person1 ?person2] (many-follows ?person1)
(many-follows ?person2) (follows ?person1 ?person2)))

这里,我们使用let
来定义了一个子查询many-follows
。这个子查询是用<-
定义的。之后,我们便可以在后续查询中使用这个子查询了。
我们还可以在一个查询中指定多个输出目的地。比如我们想要同时得到many-follows
的查询结果:
1
2
3
4
5

user=> (let [many-follows (<- [?person] (follows ?person _)
(c/count ?c) (> ?c 2))
active-follows (<- [?p1 ?p2] (many-follows ?p1)
(many-follows ?p2) (follows ?p1 ?p2))]
(?- (stdout) many-follows (stdout) active-follows))

这里我们分别定义了两个查询,没有立刻执行它们,而是在后续的?-
中将两个查询分别绑定到了两个tap
上,并同时执行。
小结
Cascalog目前在还不断的改进中,未来会增加更多查询特性,以及对查询过程的优化。
我非常希望能够得到你对Cascalog的反馈,如果你有任何评论、问题、或是顾虑,请留言,或者在Twitter上联系我,给我发送邮件nathan.marz@gmail.com,或是在freenode的#cascading频道和我聊天。
下一篇博客会介绍Cascalog的外联合、排序、组合等特性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容