1 Azkaban安装
1.1 Azkaban下载地址
下载地址:http://azkaban.github.io/downloads.html
1.2 Azkaban安装部署
1.2.1 安装前准备
1) 将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行脚本及MySQL安装包拷贝到
hadoop102虚拟机/opt/software目录下
a) azkaban-web-server-2.5.0.tar.gz
b) azkaban-executor-server-2.5.0.tar.gz
c) azkaban-sql-script-2.5.0.tar.gz
d) mysql-libs.zip
2) 选择Mysql作为Azkaban数据库,因为Azkaban建立了一些Mysql连接增强功能,以方便Azkaban设
置,并增强服务可靠性。(参见**hive文档2.4)**
2 安装Azkaban
1) 在/opt/module/目录下创建azkaban目录
[root@hadoop102 module]$ mkdir azkaban
2) 解压azkaban-web-server-2.5.0.tar.gz、azkaban-executor-server-2.5.0.tar.gz、azkaban-sql�
script-2.5.0.tar.gz到/opt/module/azkaban目录下
[root@hadoop102 software]$ tar -zxvf azkaban-web-server-2.5.0.tar.gz -C /opt/module/azkaban/
[root@hadoop102 software]$ tar -zxvf azkaban-executor-server-2.5.0.tar.gz -C
/opt/module/azkaban/
[root@hadoop102 software]$ tar -zxvf azkaban-sql-script-2.5.0.tar.gz -C /opt/module/azkaban/
3) 对解压后的文件重新命名
[root@hadoop102 azkaban]$ mv azkaban-web-2.5.0/ server
[root@hadoop102 azkaban]$ mv azkaban-executor-2.5.0/ executor
4) azkaban脚本导入
进入mysql,创建azkaban数据库,并将解压的脚本导入到azkaban数据库。
[root@hadoop102 azkaban]$ mysql -uroot -p000000
mysql> create database azkaban;
mysql> use azkaban;
mysql> source /opt/module/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql注:source后跟.sql文件,用于批量处理.sql文件中的sql语句。
2.3 生成密钥库
Keytool是java数据证书的管理工具,使用户能够管理自己的公/私钥对及相关证书。
-keystore 指定密钥库的名称及位置(产生的各类信息将不在.keystore文件中)
-genkey 在用户主目录中创建一个默认文件".keystore"
-alias 对我们生成的.keystore 进行指认别名;如果没有默认是mykey
-keyalg 指定密钥的算法 RSA/DSA 默认是DSA
1)生成 keystore的密码及相应信息的密钥库
[root@hadoop102 azkaban]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
输入密钥库口令:
再次输入新口令:
您的名字与姓氏是什么?
[Unknown]:
您的组织单位名称是什么?
[Unknown]:
您的组织名称是什么?
[Unknown]:
您所在的城市或区域名称是什么?
[Unknown]:
您所在的省/市/自治区名称是什么?
[Unknown]:
该单位的双字母国家/地区代码是什么?
[Unknown]:
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown是否正确?
输入 的密钥口令
(如果和密钥库口令相同, 按回车):
再次输入新口令:
注意:
密钥库的密码至少必须6个字符,可以是纯数字或者字母或者数字和字母的组合等等
密钥库的密码最好和 的密钥相同,方便记忆
2)将keystore 拷贝到 azkaban web服务器根目录中
[root@hadoop102 azkaban]$ mv keystore /opt/module/azkaban/server/
2.4 时间同步配置先配置好服务器节点上的时区
1) 如果在/usr/share/zoneinfo/这个目录下不存在时区配置文件Asia/Shanghai,就要用 tzselect 生
成。
[root@hadoop102 azkaban]$ tzselect
Please identify a location so that time zone rules can be set correctly.
Please select a continent or ocean.
1) Africa
2) Americas
3) Antarctica
4) Arctic Ocean
5) Asia
6) Atlantic Ocean
7) Australia
8) Europe
9) Indian Ocean
10) Pacifific Ocean
11) none - I want to specify the time zone using the Posix TZ format.
#? 5
Please select a country.
1) Afghanistan 18) Israel 35) Palestine
2) Armenia 19) Japan 36) Philippines
3) Azerbaijan 20) Jordan 37) Qatar
4) Bahrain 21) Kazakhstan 38) Russia
5) Bangladesh 22) Korea (North) 39) Saudi Arabia
6) Bhutan 23) Korea (South) 40) Singapore
7) Brunei 24) Kuwait 41) Sri Lanka
8) Cambodia 25) Kyrgyzstan 42) Syria
9) China 26) Laos 43) Taiwan
10) Cyprus 27) Lebanon 44) Tajikistan
11) East Timor 28) Macau 45) Thailand
12) Georgia 29) Malaysia 46) Turkmenistan
13) Hong Kong 30) Mongolia 47) United Arab Emirates
14) India 31) Myanmar (Burma) 48) Uzbekistan
15) Indonesia 32) Nepal 49) Vietnam
16) Iran 33) Oman 50) Yemen17) Iraq 34) Pakistan
#? 9
Please select one of the following time zone regions.
1) Beijing Time
2) Xinjiang Time
#? 1
The following information has been given:
China
Beijing Time
Therefore TZ='Asia/Shanghai' will be used.
Local time is now: Thu Oct 18 16:24:23 CST 2018.
Universal Time is now: Thu Oct 18 08:24:23 UTC 2018.
Is the above information OK?
1) Yes
2) No
#? 1
You can make this change permanent for yourself by appending the line
TZ='Asia/Shanghai'; export TZ
to the fifile '.profifile' in your home directory; then log out and log in again.
Here is that TZ value again, this time on standard output so that you
can use the /usr/bin/tzselect command in shell scripts:
Asia/Shanghai
2)拷贝该时区文件,覆盖系统本地时区配置
[root@hadoop102 azkaban]$ cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
3)集群时间同步(同时发给三个窗口)
[root@hadoop102 azkaban]$ sudo date -s '2018-10-18 16:39:30'
2 配置文件
/opt/module/cdh/azkaban/executor/conf
如下是web服务器配置1)进入azkaban web服务器安装目录 conf目录,打开azkaban.properties文件
[root@hadoop102 conf]$ pwd
/opt/module/azkaban/server/conf
[root@hadoop102 conf]$ vim azkaban.properties
2)按照如下配置修改azkaban.properties文件。
#Azkaban Personalization Settings
#服务器UI名称,用于服务器上方显示的名字
azkaban.name=Test
#描述
azkaban.label=My Local Azkaban
#UI颜色
azkaban.color=#FF3601
azkaban.default.servlet.path=/index
#默认web server存放web文件的目录
web.resource.dir=/opt/module/azkaban/server/web/
#默认时区,已改为亚洲/上海 默认为美国
default.timezone.id=Asia/Shanghai
#Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager
#用户权限管理默认类(绝对路径)
user.manager.xml.fifile=/opt/module/azkaban/server/conf/azkaban-users.xml
#Loader for projects
#global配置文件所在位置(绝对路径)
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
azkaban.project.dir=projects
#数据库类型
database.type=mysql
#端口号
mysql.port=3306
#数据库连接IP
mysql.host=hadoop102#数据库实例名
mysql.database=Azkaban
#数据库用户名
mysql.user=root
#数据库密码
mysql.password=000000
#最大连接数
mysql.numconnections=100
# Velocity dev mode
velocity.dev.mode=false
# Azkaban Jetty server properties.
# Jetty服务器属性.
#最大线程数
jetty.maxThreads=25
#Jetty SSL端口
jetty.ssl.port=8443
#Jetty端口
jetty.port=8081
#SSL文件名(绝对路径)
jetty.keystore=/opt/module/azkaban/server/keystore
#SSL文件密码
jetty.password=000000
#Jetty主密码与keystore文件相同
jetty.keypassword=000000
#SSL文件名(绝对路径)
jetty.truststore=/opt/module/azkaban/server/keystore
#SSL文件密码
jetty.trustpassword=000000
# Azkaban Executor settings
executor.port=12321# mail settings
mail.sender=
mail.host=
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache
3)web服务器用户配置
在azkaban web服务器安装目录 conf目录,按照如下配置修改azkaban-users.xml 文件,增加管理员
用户。
[root@hadoop102 conf]$ vim azkaban-users.xml
2 执行服务器配置
1)进入执行服务器安装目录conf,打开azkaban.properties
[root@hadoop102 conf]$ pwd
/opt/module/azkaban/executor/conf
[root@hadoop102 conf]$ vim azkaban.properties
2) 按照如下配置修改azkaban.properties文件。
#Azkaban
#时区
default.timezone.id=Asia/Shanghai
# Azkaban JobTypes Plugins
#jobtype 插件所在位置
azkaban.jobtype.plugin.dir=plugins/jobtypes
#Loader for projects
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
azkaban.project.dir=projects
# 数据库也要按要求配置
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkabanmysql.user=root
mysql.password=000000
mysql.numconnections=100
# Azkaban Executor settings
#最大线程数
executor.maxThreads=50
#端口号(如修改,请与web服务中一致)
executor.port=12321
#线程数
executor.flflow.threads=30
启动服务器
在executor服务器目录下执行启动命令
[root@hadoop102 executor]$ pwd
/opt/module/azkaban/executor
[root@hadoop102 executor]$ bin/azkaban-executor-start.sh
启动web服务器
在azkaban web服务器目录下执行启动命令
[root@hadoop102 server]$ pwd
/opt/module/azkaban/server
[root@hadoop102 server]$ bin/azkaban-web-start.sh
注意:
先执行executor,再执行web,避免Web Server会因为找不到执行器启动失败。
jps查看进程
[root@hadoop102 server]$ jps
3601 AzkabanExecutorServer
5880 Jps
3661 AzkabanWebServer
启动完成后,在浏览器(建议使用谷歌浏览器)中输入https://**服务器IP地址:8443**,即可访问
azkaban服务了。
在登录中输入刚才在azkaban-users.xml文件中新添加的户用名及密码,点击 login。
2 案例
1)创建有依赖关系的多个job描述
第一个job:start.job
[root@hadoop102 jobs]$ vim start.job
#start.job
type=command
command=echo "this is start job"
第二个job:step1.job依赖start.job
[root@hadoop102 jobs]$ vim step1.job
第三个job:step2.job依赖start.job
[root@hadoop102 jobs]$ vim step2.job
#step2.job
type=command
dependencies=start
command=echo "this is step2 job"
第四个job:fifinish.job依赖step1.job和step2.job
[root@hadoop102 jobs]$ vim fifinish.job
#fifinish.jobtype=command
dependencies=step1,step2
command=echo "this is fifinish job"
2)将所有job资源文件打到一个zip包中
[root@hadoop102 jobs]$ zip jobs.zip start.job step1.job step2.job fifinish.job
updating: start.job (deflflated 16%)
adding: step1.job (deflflated 12%)
adding: step2.job (deflflated 12%)
adding: fifinish.job (deflflated 14%)
3)在azkaban的web管理界面创建工程并上传zip包
5)启动工作流flflow
6)查看结果
7) 单独跑一个任务,可以做任务的依赖来跑)
8)查看所有的执行记录
3 azkaban 其他任务类型的job
1、概述
原生的 Azkaban 支持的plugin类型有以下这些:
1. command:Linux shell命令行任务
2. gobblin:通用数据采集工具
3. hadoopJava:运行hadoopMR任务
4. java:原生java任务
5. hive:支持执行hiveSQL
6. pig:pig脚本任务
7. spark:spark任务
8. hdfsToTeradata:把数据从hdfs导入Teradata
9. teradataToHdfs:把数据从Teradata导入hdfs
其中最简单而且最常用的是command类型,我们在上一篇文章中已经描述了如何编写一个command
的job任务。如果使用command类型,效果其实跟在本地执行Linux shell命令一样,这样的话,还不如
把shell放到crontable 中运行。所以我们把重点放到Azkaban支持的比较常用的四种类型:java、
hadoopJava、hive、spark
2、java类型
1、代码编写:MyJavaJob.javapackage com.dataeye.java;
public class MyJavaJob {
public static void main(String[] args) {
System.out.println("#################################");
System.out.println("#### MyJavaJob class exec... ###");
System.out.println("#################################");
}
}
2、打包成jar文件:使用maven或者eclipse导出为jar文件
3、编写job文件:java.job
type=javaprocess
classpath=./lib/*,${azkaban.home}/lib/*
java.class=com.dataeye.java.MyJavaJob
4、组成一个完整的运行包
新建一个目录,在该目录下创建一个lib文件夹,把第二步打包的jar文件放到这里,把job文件放到和lib
文件夹同一级的目录下,如下所示:
完整的运行包
5、打包成zip文件
把lib目录和job文件打包成zip文件,如下的java.zip:
zip文件
6、提交运行,过程跟之前文章介绍的步骤一样,不再详述,执行结果如下:执行结果
从输出日志可以看出,代码已经正常执行。
以上是java类型的任务编写和执行的过程。接下来介绍其他任务编写的时候,只会介绍代码的编写和
job的编写,其他过程与上面的一致。
3、hadoopJava类型
1、数据准备
以下内容是运行wordcount任务时需要的输入文件input.txt:
1 Ross male 33 3674
2 Julie male 42 2019
3 Gloria female 45 3567
4 Carol female 36 2813
5 Malcolm male 42 2856
6 Joan female 22 2235
7 Niki female 27 3682
8 Betty female 20 3001
9 Linda male 21 2511
10 Whitney male 35 3075
11 Lily male 27 3645
12 Fred female 39 2202
13 Gary male 28 3925
14 William female 38 2056
15 Charles male 48 2981
16 Michael male 25 2606
17 Karl female 32 2260
18 Barbara male 39 2743
19 Elizabeth female 26 2726
20 Helen female 47 2457
21 Katharine male 45 3638
22 Lee female 43 305023 Ann male 35 2874
24 Diana male 37 3929
25 Fiona female 45 2955
26 Bob female 21 3382
27 John male 48 3677
28 Thomas female 22 2784
29 Dean male 38 2266
30 Paul female 31 2679
把input.txt文件拷贝到hdfs的 /data/yann/input 目录下
2、代码准备:
package azkaban.jobtype.examples.java;
import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;
public class WordCount extends AbstractHadoopJob
{
private static final Logger logger = Logger.getLogger(WordCount.class);
private final String inputPath;
private final String outputPath;
private boolean forceOutputOverrite;
public WordCount(String name, Props props)
{
super(name, props);
this.inputPath = props.getString("input.path");
this.outputPath = props.getString("output.path");
this.forceOutputOverrite = props.getBoolean("force.output.overwrite",
false);
}
public void run()
throws Exception
{logger.info(String.format("Starting %s", new Object[] {
getClass().getSimpleName() }));
JobConf jobconf = getJobConf();
jobconf.setJarByClass(WordCount.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(IntWritable.class);
jobconf.setMapperClass(Map.class);
jobconf.setReducerClass(Reduce.class);
jobconf.setInputFormat(TextInputFormat.class);
jobconf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));
if (this.forceOutputOverrite)
{
FileSystem fs =
FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
}
super.run();
}
public static class Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
private long numRecords = 0L;
public void map(LongWritable key, Text value, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
this.word.set(tokenizer.nextToken());
output.collect(this.word, one);
reporter.incrCounter(Counters.INPUT_WORDS, 1L);
}
if (++this.numRecords % 100L == 0L)
reporter.setStatus("Finished processing " + this.numRecords + " records
" + "from the input file");
}
static enum Counters
{
INPUT_WORDS;
}
}3、编写job文件
wordcount.job文件内容如下:
这样hadoopJava类型的任务已经完成,打包提交到Azkaban中执行即可
4、hive类型
1、编写 hive.sql文件
以上是标准的hive的sql脚本,首先切换到azkaban数据库,然后把user_table0 的数据插入到
user_table1 表的指定day_p分区。需要先准备好 user_table0 和 user_table1 表结构和数据。
编写完成后,把文件放入 res 文件夹中。
2、编写hive.job文件
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable)values.next()).get();
}
output.collect(key, new IntWritable(sum));
}
}
}
type=hadoopJava
job.extend=false
job.class=azkaban.jobtype.examples.java.WordCount
classpath=./lib/*,${azkaban.home}/lib/*
force.output.overwrite=true
input.path=/data/yann/input
output.path=/data/yann/output
use azkaban;
INSERT OVERWRITE TABLE
user_table1 PARTITION (day_p='2017-02-08')
SELECT appid,uid,country,province,city
FROM user_table0 where adType=1;关键的参数是 hive.script,该参数指定使用的sql脚本在 res目录下的hive.sql文件
5、spark类型
spark任务有两种运行方式,一种是command类型,另一种是spark类型
首先准备好spark任务的代码
然后准备数据,数据就使用前面hadoopJava中的数据即可。
最后打包成jar文件:spark-template-1.0-SNAPSHOT.jar
1、command类型
command类型的配置方式比较简单,spark.job文件如下:
type=hive
user.to.proxy=azkaban
classpath=./lib/*,${azkaban.home}/lib/*
azk.hive.action=execute.query
hive.script=res/hive.sql
package com.dataeye.template.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}
object WordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage:WordCount <hdfs_file>")
System.exit(1)
}
System.out.println("get first param ==> " + args(0))
System.out.println("get second param ==> " + args(1))
/** spark 2.0的方式
* val spark = SparkSession.builder().appName("WordCount").getOrCreate()
*/
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
val spark = new SQLContext(sc)
val file = spark.sparkContext.textFile(args(0))
val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word,
1)).reduceByKey(_ + _)
// 数据collect 到driver端打印
wordCounts.collect().foreach(println _)
}
}type=command
command=${spark.home}/bin/spark-submit --master yarn-cluster --class
com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar
hdfs://de-hdfs/data/yann/info.txt paramtest
2、spark类型
type=spark
master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt paramtest
以上就是Azkaban支持的几种常用的任务类型。