此程序用于监控ClickHouse和Hive中的数据:ClickHouse数据的监控通过clickhouse_driver客户端连接实现;Hive数据的监控通过subprocess模块执行shell命令(hive -e)实现。当发现数据不符合设定条件时,程序会向企业微信群发送告警信息。
#coding=utf-8
from __future__ import division
import subprocess
import socket
import traceback
import sys
# Path of the INI file loaded by getConfig(); ding_send_text() reads the
# "token" option of its [dingding] section when no token is passed in.
configuraion_file="/data/apps/public/conf.ini"
def getConfig():
    """Parse the INI file named by ``configuraion_file`` and return the parser.

    The path is echoed to stdout before it is read. ``ConfigParser.read``
    ignores files it cannot open, so a missing file yields an empty parser
    rather than an exception.

    Returns:
        configparser.ConfigParser: parser holding whatever the file defined.
    """
    print(configuraion_file)
    import configparser

    parser = configparser.ConfigParser()
    parser.read(configuraion_file)
    return parser
def getLocalHostname():
    """Return the local machine's hostname, or "" when it cannot be determined.

    Lookup failures are reported via ``traceback.print_exc()`` and otherwise
    ignored, so callers always receive a string.
    """
    try:
        return socket.gethostname()
    except Exception:
        # Best effort only: the hostname merely decorates alert messages.
        # ``Exception`` (not a bare ``except``) lets Ctrl-C / SystemExit through.
        traceback.print_exc()
        return ""
def getLocalIp():
    """Return the IP address the local hostname resolves to, or "" on failure.

    Resolution errors are reported via ``traceback.print_exc()`` and otherwise
    ignored, so callers always receive a string.
    """
    try:
        return socket.gethostbyname(socket.gethostname())
    except Exception:
        # Best effort only: the address merely decorates alert messages.
        # ``Exception`` (not a bare ``except``) lets Ctrl-C / SystemExit through.
        traceback.print_exc()
        return ""
def ding_send_text(message,token=None,mobiles=None,is_at_all=False):
    """Send a text alert to a chat-robot webhook, prefixed with this host's identity.

    The message is prefixed with "hostname[ip]" on its own line and posted
    through ``DingtalkChatbot`` to the qyapi.weixin.qq.com webhook endpoint.

    Args:
        message: Alert text.
        token: Webhook key. When falsy, the ``token`` option of the
            ``[dingding]`` section returned by ``getConfig()`` is used.
        mobiles: Phone numbers to @-mention; ``None`` or empty means nobody.
        is_at_all: When true, mention everyone and ignore ``mobiles``.
    """
    from dingtalkchatbot.chatbot import DingtalkChatbot

    # None instead of a shared mutable [] default; normalised so the debug
    # output below is unchanged for callers that omit the argument.
    if mobiles is None:
        mobiles = []
    print(message)
    # The webhook key is deliberately not printed: it is a credential and
    # would otherwise end up in cron/stdout logs.
    print(mobiles)

    server_info = "%s[%s]" % (getLocalHostname(), getLocalIp())
    message = "%s\n%s" % (server_info, message)
    if not token:
        token = getConfig().get("dingding", "token")

    # Webhook address.
    # NOTE(review): a DingTalk client is pointed at a WeChat Work URL here;
    # presumably the text payload is compatible — confirm @-mentions work.
    webhook = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=' + token
    # Initialise the chat robot.
    bot = DingtalkChatbot(webhook)
    if is_at_all:
        bot.send_text(msg=message, is_at_all=True)
    elif mobiles:
        bot.send_text(msg=message, is_at_all=False, at_mobiles=mobiles)
    else:
        bot.send_text(msg=message, is_at_all=False)
#!/usr/bin/env python
#ecoding=utf8
from datetime import datetime
from datetime import timedelta
from sys import argv
def check_clickhouse_data(sql,phone,threshhold,remark):
    """Run a counting query on ClickHouse and alert when the count reaches a threshold.

    The first column of the first result row is compared with ``threshhold``:
    a value strictly below it is considered fine, anything else triggers an
    alert through ``ding_send_text``.

    Args:
        sql: Query whose first row / first column is the number to check.
        phone: Phone number to @-mention, or a list/tuple of numbers.
        threshhold: Alert when the count is greater than or equal to this.
        remark: Text appended to the alert message.
    """
    from clickhouse_driver import Client

    # NOTE(review): host='ip' looks like a placeholder — replace with the real
    # server address before use.
    client = Client(host='ip', database='app', user='default', password='')
    try:
        progress = client.execute_with_progress("%s" % sql)
        rows = progress.get_result()
    finally:
        # Release the connection even when the query raises.
        client.disconnect()

    count = rows[0][0]
    print("%s the query data count is %s" % (sql, count))
    # NOTE(review): "ok" means count < threshhold. Remarks such as "table is
    # empty" imply the opposite direction — confirm the intended comparison.
    if count < threshhold:
        print("the data count is ok")
        return

    # Accept one number or a collection; wrapping a list again would produce
    # a nested list that cannot be used for @-mentions.
    mobiles = list(phone) if isinstance(phone, (list, tuple)) else [phone]
    ding_send_text("ClickHouse中,%s "%(remark) ,'629b85****************',mobiles=mobiles)
# Execute a shell command and return its result.
def execute_command(cmd):
    """Run ``cmd`` through the shell and return ``(status, output, lines)``.

    Delegates to ``execute_command3``; see that function for the meaning of
    the returned tuple.

    Raises:
        RuntimeError: when running under Python 2, for which no
            implementation exists in this file.
    """
    if sys.version_info[0] == 2:
        # Previously this branch fell through and returned None, which made
        # callers fail later with an unrelated tuple-unpacking TypeError.
        raise RuntimeError("execute_command requires Python 3")
    return execute_command3(cmd)
# Python 3 implementation.
def execute_command3(cmd):
    """Run ``cmd`` with ``subprocess.getstatusoutput`` and summarise the result.

    Args:
        cmd: Shell command line.

    Returns:
        tuple: ``(status, output, lines)`` where ``status`` is True when the
        exit code is 0, ``output`` is the raw text returned by
        ``getstatusoutput``, and ``lines`` holds the stripped, non-empty
        lines of ``output``.
    """
    exit_code, output = subprocess.getstatusoutput(cmd)
    stripped = (line.strip() for line in output.split("\n"))
    lines = [line for line in stripped if line]
    return (exit_code == 0, output, lines)
def check_hive_data(sql,remark="",threshold=0):
    """Run a counting query through the ``hive`` CLI and alert on a bad result.

    The count is read from the second-to-last non-empty output line
    (presumably the last line is the CLI's timing summary — confirm for the
    Hive version in use). An alert is sent through ``ding_send_text`` when
    the count exceeds ``threshold``, or when the command fails or its output
    cannot be parsed.

    Args:
        sql: Query expected to yield a single integer.
        remark: Free text placed at the start of the alert message.
        threshold: Largest count still considered fine.
    """
    import shlex

    # Quote the SQL so quotes, '$' or backticks inside it reach hive intact
    # instead of being interpreted by the shell.
    cmd = "hive -e %s" % shlex.quote(sql)
    (status, output, lines) = execute_command(cmd)

    count = None
    if status and len(lines) >= 2:
        try:
            count = int(lines[-2])
        except ValueError:
            count = None

    if count is None:
        # A failed or unparsable run must raise an alert rather than crash
        # with IndexError/ValueError and go unnoticed.
        ding_send_text("%s %s Hive查询执行失败或结果无法解析,请及时查看 " % (remark,sql),'629b85****************', mobiles=["888888"])
    elif count <= threshold:
        # "<=" matches the alert text below ("greater than threshold"); for
        # the default threshold of 0 this equals the former "==" test.
        print("ok")
    else:
        ding_send_text("%s %s 统计数据量大于%s,请及时查看 " % (remark,sql,threshold),'629b85****************', mobiles=["888888"])
if __name__ == "__main__":
    # Entry point: the first argument selects which store to check.
    if len(argv) < 2 or argv[1] not in ("clickhouse", "hive"):
        sys.exit("usage: python3 %s clickhouse|hive" % argv[0])
    tableType = argv[1]

    # Date strings kept for date-filtered queries; put a %s placeholder in
    # the SQL before interpolating one of them.
    today = datetime.now()
    today_str = today.strftime("%Y-%m-%d")
    one_day_ago_str = (today + timedelta(days=-1)).strftime("%Y-%m-%d")

    if tableType == "clickhouse":
        # No placeholder in this query, so it is used as-is (applying "%" to
        # it raises TypeError).
        sql1 = " select count(1)\nfrom app.test "
        # NOTE(review): check_clickhouse_data alerts when the count is >= the
        # threshold (1 here), which does not match a "table is empty" remark —
        # confirm the intended direction.
        check_clickhouse_data(sql1, "88888888", 1, "test表数据量为空,请及时查看")
    elif tableType == "hive":
        sql = "select count(1) from dw.test"
        check_hive_data(sql)
代码依赖配置文件conf.ini(默认路径为/data/apps/public/conf.ini),其格式如下:
[dingding]
token=8a2a1327-ac40-44df-9097-d7527916868c
另外,请将程序中的"629b85****************"替换为自己的企业微信机器人webhook地址中的key。
程序执行方式,假设命名上述监控程序为warn.py:
#监控hive仓库数据
python3 warn.py hive
#监控Clickhouse仓库数据
python3 warn.py clickhouse