设计原因
1.由于客户存在多个商业级运维系统,每个运维系统都完全分开,需要每天登入各个运维系统查看是否存在告警。
2.商业级的运维系统毕竟是通用化,无法完美匹配客户的业务需求,需要自己对商业的运维系统做扩展。
3.商业级的运维系统有时候存在一些bug,对于少数设备无法进行监控和采集。
采集信息详情
- 采集存储数据
- 主机wwn,主机组,LUN组,LUN,端口组,端口等等信息
- 采集光交数据
- 光交硬件信息,光交wwn信息,光交端口错误,性能信息
- 采集vmware虚拟化数据
- 采集vmware虚拟机,宿主机,存储信息
- 采集snmp数据
- 待定
- 接收 snmp trap数据
- 通过trap服务接口,采集以上设备的告警信息,并写入告警系统
- 分布式存储采集
- 采集分布式存储的容量信息,卷组,LUN等等信息
- 其他采集
- 直接通过商业运维系统的数据库进行采集,采集需要的数据进行处理后进行存储。
celery的安装及设置
安装celery
1.直接pip install celery
进行安装。安装redis数据库,配置redis数据库密码。
2.创建项目目录,在目录下创建 conf文件夹。在conf内创建conf.py文件,存储一些配置信息
3.创建celery_run.py
4.创建其他目录,每个目录对应一套业务系统。
from __future__ import absolute_import,unicode_literals
from celery import Celery,platforms
from celery.schedules import crontab
from conf.conf import logger,redis_ip,redis_port,redis_pass,redis_broker,redis_backend
import sys
from os import path
# Allow the worker to be started by the root user.
platforms.C_FORCE_ROOT = True
# Number of tasks a worker child process executes before it is replaced by a
# fresh process. This is a recycling threshold, not a concurrency limit.
CELERYD_MAX_TASKS_PER_CHILD = 40
# Scheduling is meant to follow local (Asia/Shanghai) wall-clock time.
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
# NOTE: the three constants above are plain module variables; Celery only
# honours them because they are passed to app.conf.update() below.

# Redis serves as both the message broker and the result backend.
app = Celery(
    'manager',
    # Queue that tasks are published to.
    broker='redis://:%s@%s:%s/%s' % (redis_pass, redis_ip, redis_port, redis_broker),
    # Store that task results are written to.
    backend='redis://:%s@%s:%s/%s' % (redis_pass, redis_ip, redis_port, redis_backend),
    # Every application directory provides a tasks.py; list each one here.
    include=['Brocade.tasks', 'fst.tasks', 'alert_system.tasks'])

# Celery configuration, expressed only with the new lowercase setting names
# (old uppercase and new lowercase names must not be mixed).
app.conf.update(
    # Keep task results for one hour.
    result_expires=3600,
    # NOTE(security): pickle can execute arbitrary code while deserializing.
    # Only acceptable when the broker is reachable by trusted producers only.
    task_serializer='pickle',
    # NOTE(review): results are still serialized with the default serializer
    # while only pickle is accepted - confirm that reading results works.
    accept_content=['pickle'],
    worker_max_tasks_per_child=CELERYD_MAX_TASKS_PER_CHILD,
    # Local time for the beat scheduler. The previous trailing
    # `app.conf.timezone = "GMT"` silently overrode this and was removed.
    timezone=CELERY_TIMEZONE,
    enable_utc=CELERY_ENABLE_UTC,
)

# Periodic (beat) tasks.
app.conf.beat_schedule = {
    'multiply-at-some-time': {
        'task': 'alert_system.tasks.printtest',
        # Runs once a day at 12:00 in the configured timezone.
        'schedule': crontab(hour=12, minute=0),
        # Positional arguments passed to the task.
        'args': (),
    }
}

if __name__ == '__main__':
    try:
        app.start()
    except Exception as e:
        logger.warning("启动失败%s"%(e))
创建完成后可以直接进行启动测试。celery -A celery_run worker -l info,启动成功正常。
mongodb数据库的设置。
在conf下创建mongodb.py,将数据库的操作全部放置到这里面,其他文件专注业务逻辑。需要先安装mongodb,并通过pip install pymongo安装Python驱动。
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
from urllib.parse import quote_plus
import sys,datetime
from pymongo import MongoClient
try:
from collect_manager.conf.conf import mong_ip,mong_user,mong_password,mong_app
except Exception:
from .conf.conf import mong_ip, mong_user, mong_password, mong_app
# Receives application log messages and stores them in MongoDB.
class Logsystem():
    """Write graded log records into the collection returned by
    ``monogdb().use_logsystem()``.

    Every record has the keys ``grade``, ``messages``, ``datetime`` (local
    time of the call) and ``confirm``. Records of grade ``info`` are stored
    with ``confirm=True``; all other grades are stored with ``confirm=False``.
    """

    def __init__(self):
        self.obj = monogdb().use_logsystem()

    def _write(self, grade, info, confirm):
        """Insert a single log record with the given grade and confirm flag."""
        # insert_one() replaces Collection.insert(), which is deprecated
        # since PyMongo 3.0 and no longer exists in PyMongo 4.
        self.obj.insert_one({
            "grade": grade,
            "messages": info,
            "datetime": datetime.datetime.now(),
            "confirm": confirm,
        })

    def warn(self, info):
        """Store ``info`` as an unconfirmed ``warn`` record."""
        self._write("warn", info, False)

    def error(self, info):
        """Store ``info`` as an unconfirmed ``error`` record."""
        self._write("error", info, False)

    def info(self, info):
        """Store ``info`` as an already confirmed ``info`` record."""
        self._write("info", info, True)

    def fail(self, info):
        """Store ``info`` as an unconfirmed ``fail`` record."""
        self._write("fail", info, False)
# Connection helper used to log in to MongoDB.
class monogdb():
    """Thin accessor around one MongoDB database and its collections."""

    def __init__(self):
        """Build the connection URL from the configured credentials.

        User name and password are URL-quoted. The client is created with
        ``connect=False``.
        """
        credentials = (quote_plus(mong_user), quote_plus(mong_password), mong_ip)
        self.url = "mongodb://%s:%s@%s" % credentials
        self.client = MongoClient(self.url, connect=False)
        self.app = mong_app

    def Logindatabase(self):
        """Return the database named by ``self.app``."""
        return self.client[self.app]

    def use_bro_san_tables(self):
        """Return the ``bro_san`` collection."""
        return self.Logindatabase()["bro_san"]

    def use_soft_log_tables(self):
        """Return the ``soft_log`` collection."""
        return self.Logindatabase()["soft_log"]

    def use_test_tables(self):
        """Return the ``test`` collection."""
        return self.Logindatabase()["test"]

    def use_virtual_machine_tables(self):
        """Return the ``virtual_machine`` collection."""
        return self.Logindatabase()["virtual_machine"]

    def use_vmwarestorage_tables(self):
        """Return the ``vmware_storage`` collection."""
        return self.Logindatabase()["vmware_storage"]

    def use_vmwarehost_tables(self):
        """Return the ``vmware_host`` collection."""
        return self.Logindatabase()["vmware_host"]

    def use_logsystem(self):
        """Return the ``log_system`` collection.

        Record fields as listed by the original author (not enforced here):
            messages    message text
            grade       severity: debug / info / warn / error / fail
            is_confirm  whether the record has been acknowledged
            is_delete   whether the record has been deleted
            label       marker used to detect duplicates (MySQL id)
            datetime    timestamp
        """
        return self.Logindatabase()["log_system"]
snmp trap的接收服务设置
负责接收trap的信息,由于 设备种类太多,没必要对MIB做解析,直接写入设备IP到日志系统内。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
sys.path.append(".")
from pysnmp.entity import engine, config
from pysnmp.proto import api
from pyasn1.codec.ber import decoder
from pysnmp.carrier.asynsock.dgram import udp, udp6
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.entity.rfc3413 import ntfrcv
try:
from monogdb_api import Logsystem
except:
from .monogdb_api import Logsystem
# Rebind the imported class name to a single shared instance: from this line
# on, ``Logsystem`` is the instance whose error() method cbFun calls, and the
# class itself is no longer reachable under this name.
Logsystem = Logsystem()
# SNMP engine object; it is not referenced again in the code visible here.
snmpEngine = engine.SnmpEngine()
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
    """Receive callback: record every SNMP trap found in one datagram.

    Each message contained in ``wholeMsg`` is decoded in turn. For every trap
    PDU an error-grade record naming the sender address is written through
    the module-level ``Logsystem`` instance. Variable bindings are
    deliberately not parsed; only the SNMPv1 trap header fields are added to
    the record.

    :param transportDispatcher: dispatcher invoking the callback (unused)
    :param transportDomain: transport domain the datagram arrived on
    :param transportAddress: sender address; element 0 is used as the IP
    :param wholeMsg: raw datagram payload
    :return: the undecoded remainder of ``wholeMsg``, or None when the
        payload is malformed or uses an unsupported SNMP version
    """
    while wholeMsg:
        # The payload is untrusted network input. A decoding error must not
        # escape the callback: it would propagate out of runDispatcher() and
        # stop the whole trap listener.
        try:
            msgVer = int(api.decodeMessageVersion(wholeMsg))
        except Exception as e:
            print('Malformed SNMP message from %s: %s' % (transportAddress, e))
            return
        if msgVer not in api.protoModules:
            print('Unsupported SNMP version %s' % msgVer)
            return
        pMod = api.protoModules[msgVer]
        try:
            reqMsg, wholeMsg = decoder.decode(
                wholeMsg, asn1Spec=pMod.Message(),
            )
        except Exception as e:
            print('Malformed SNMP message from %s: %s' % (transportAddress, e))
            return
        print('Notification message from %s:%s: ' % (
            transportDomain, transportAddress
            )
        )
        sender_ip = transportAddress[0]
        reqPDU = pMod.apiMessage.getPDU(reqMsg)
        if not reqPDU.isSameTypeWith(pMod.TrapPDU()):
            # Not a trap PDU: nothing to record for this message.
            continue
        if msgVer == api.protoVersion1:
            # Only SNMPv1 traps carry these header fields.
            try:
                strlist = 'Enterprise: {},Agent Address: {},Generic Trap: {},Specific Trap: {},Uptime: {}'.format(
                    pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint(),
                    pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint(),
                    pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint(),
                    pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint(),
                    pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()
                )
                print(strlist)
            except Exception as e:
                strlist = ("messages error,error({})").format(e)
        else:
            strlist = "error messages is not"
        Logsystem.error((u"ip ({}) is receive trap error, Please check the device details immediately.\n {}").format(sender_ip, strlist))
    return wholeMsg
if __name__ == '__main__':
    # Script entry point: listen for SNMP traps on UDP port 162 and hand
    # every received datagram to cbFun.
    transportDispatcher = AsynsockDispatcher()
    transportDispatcher.registerRecvCbFun(cbFun)

    # UDP/IPv4 listener on all interfaces.
    ipv4_listener = udp.UdpSocketTransport().openServerMode(('0.0.0.0', 162))
    transportDispatcher.registerTransport(udp.domainName, ipv4_listener)

    # UDP/IPv6 listener, bound to the loopback address only.
    ipv6_listener = udp6.Udp6SocketTransport().openServerMode(('::1', 162))
    transportDispatcher.registerTransport(udp6.domainName, ipv6_listener)

    # Job #1 never finishes, so the dispatcher keeps running indefinitely.
    transportDispatcher.jobStarted(1)
    try:
        transportDispatcher.runDispatcher()
    except BaseException:
        # Close the dispatcher on any failure or interrupt, then let the
        # exception continue to propagate.
        transportDispatcher.closeDispatcher()
        raise
存储数据的采集
在目录下创建应用目录storage,并在下面创建对应的子目录,比如华为存储设置huawei,富士通存储设置fst.并且在storage下创建tasks.py。存储数据通过web主动点击的方式进行采集。
设备品牌 | 采集方式 | 备注 |
---|---|---|
富士通 | ssh | 通过ssh进行数据采集 |
华为 | ssh | 通过ssh进行数据采集 |
日立 | web | 手工采集 |
EMC | web | 手工采集 |
这个代码相对简单,直接ssh到设备进行采集就行,对于一些低端的设备通过登入web进行采集。
光交数据的采集
每天采集一次,通过ssh的方式进行采集,采集后进行清洗存储。
设备品牌 | 采集方式 | 备注 |
---|---|---|
博科 | ssh | 通过ssh进行数据采集 |
思科 | ssh | 通过ssh进行数据采集 |
vmware虚拟化数据的采集
采集vmware的数据进行存储,对接客户的备份系统,明确查看虚拟化是否成功备份。
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import atexit
import requests
from pyVmomi import vim
from pyVim import connect
import json
from conf.conf import logger
def sizeof_fmt(num):
    """Format a byte count as a human readable size string.

    The value is divided by 1024 until it drops below 1024 or the unit
    reaches TB, and is rendered with one decimal place followed directly by
    the unit name (``bytes``, ``KB``, ``MB``, ``GB`` or ``TB``).

    :param num: size in bytes
    :return: formatted string such as ``"1.5KB"``
    """
    value = num
    for unit in ('bytes', 'KB', 'MB', 'GB'):
        if value < 1024.0:
            break
        value /= 1024.0
    else:
        # Still 1024 or more after GB: everything larger is reported in TB.
        unit = 'TB'
    return "%3.1f%s" % (value, unit)
class vsphere:
    """Inventory reader for one vCenter endpoint, built on pyVmomi."""

    def __init__(self, host, user, password, port=443):
        """Connect to vCenter and cache the service content.

        :param host: vCenter host name
        :param user: vCenter user
        :param password: vCenter password
        :param port: vCenter port, 443 by default

        A connection failure is logged, not raised. In that case
        ``self.content`` is never set and every later method call fails with
        AttributeError.
        """
        try:
            self.service_instance = connect.SmartConnectNoSSL(host=host, user=user, pwd=password, port=int(port))
            atexit.register(connect.Disconnect, self.service_instance)
            self.content = self.service_instance.RetrieveContent()
        except Exception as e:
            logger.error("to vsphere fail - %s"%(e))

    def _list_objects(self, vim_type):
        """Return every managed object of ``vim_type`` below the root folder.

        The container view is only needed to enumerate the objects, so it is
        always destroyed before returning instead of being left behind.
        """
        container_view = self.content.viewManager.CreateContainerView(
            self.content.rootFolder, [vim_type], True)
        try:
            return list(container_view.view)
        finally:
            container_view.Destroy()

    #v1.0
    def getvm(self):
        """Collect summary data for every virtual machine.

        Each record contains the keys Name, vm_tag, Template, Path, Guest,
        Instance UUID, Bios UUID, Annotation, State, VMware-tools, IP,
        memorySizeMB, numCpu, numdisk, bootTime, exsi_tag, exsi_ip and the
        three quickStats_* values. The three storage_* keys are included only
        when the storage summary can be read.

        :return: list[dict]
        """
        data = []
        for child in self._list_objects(vim.VirtualMachine):
            summary = child.summary
            # Storage usage is optional: when it cannot be read the record is
            # still produced, just without the storage_* keys.
            try:
                storage = {
                    "storage_committed": summary.storage.committed,
                    "storage_uncommitted": summary.storage.uncommitted,
                    "storage_unshared": summary.storage.unshared,
                }
            except Exception:
                storage = {}
            tempdata = {
                "Name": summary.config.name,
                "vm_tag": str(summary.vm),
                "Template": summary.config.template,
                "Path": summary.config.vmPathName,
                "Guest": summary.config.guestFullName,
                "Instance UUID": summary.config.instanceUuid,
                "Bios UUID": summary.config.uuid,
                "Annotation": summary.config.annotation,
                "State": summary.runtime.powerState,
                "VMware-tools": summary.guest.toolsStatus,
                "IP": summary.guest.ipAddress,
                "memorySizeMB": summary.config.memorySizeMB,
                "numCpu": summary.config.numCpu,
                "numdisk": summary.config.numVirtualDisks,
                "bootTime": summary.runtime.bootTime,
                "exsi_tag": str(summary.runtime.host),
                "exsi_ip": summary.runtime.host.name,
            }
            # Storage keys go in before the quickStats keys, matching the key
            # order of the original record layout.
            tempdata.update(storage)
            tempdata.update({
                "quickStats_status": summary.quickStats.guestHeartbeatStatus,
                "quickStats_uptimeSeconds": summary.quickStats.uptimeSeconds,
                "quickStats_hostMemoryUsage": summary.quickStats.hostMemoryUsage,
            })
            data.append(tempdata)
        return data

    def getvm_uuid(self):
        """Return one ``{"Instance UUID": ...}`` dict per virtual machine.

        :return: list[dict]
        """
        return [
            {"Instance UUID": child.summary.config.instanceUuid}
            for child in self._list_objects(vim.VirtualMachine)
        ]

    # Deprecated.
    def getexsihost_storage(self):
        """Collect the VMFS volumes mounted on every ESXi host.

        Each record contains uuid, capacity, vmfs_version, local, ssd,
        extents (list of disk names), storagename and exsi_host.

        :return: list[dict]
        """
        datastores = []
        for esxi_host in self._list_objects(vim.HostSystem):
            storage_system = esxi_host.configManager.storageSystem
            # Walk all file systems mounted on this host.
            for host_mount_info in storage_system.fileSystemVolumeInfo.mountInfo:
                volume = host_mount_info.volume
                # Only VMFS volumes are reported.
                if volume.type != "VMFS":
                    continue
                # Fall back to an empty uuid rather than reusing the value
                # left over from a previous volume (or failing on an unbound
                # name) when the uuid cannot be read.
                try:
                    uuid = volume.uuid
                except Exception:
                    uuid = ""
                datastores.append({
                    'uuid': uuid,
                    'capacity': volume.capacity,
                    'vmfs_version': volume.version,
                    'local': volume.local,
                    'ssd': volume.ssd,
                    'extents': [extent.diskName for extent in volume.extent],
                    "storagename": volume.name,
                    "exsi_host": esxi_host.name,
                })
        return datastores

    def getstorage01(self):
        """Collect capacity and membership data for every datastore.

        Each record contains Name, URL, Capacity and "free cap" (both
        formatted by sizeof_fmt), hostNum and VmNum (counts as strings),
        esxiip (list of {"esxiip_tag", "esxiip"}) and vmip (list of
        {"vmip_tag", "vmipname"}).

        :return: list[dict]
        """
        datastore = []
        for ds in self._list_objects(vim.Datastore):
            summary = ds.summary
            # Read each collection once and reuse it for both the count and
            # the per-item details.
            hosts = ds.host
            vms = ds.vm
            datastore.append({
                "Name": str(summary.name),
                "URL": str(summary.url),
                "Capacity": str(sizeof_fmt(summary.capacity)),
                "free cap": str(sizeof_fmt(summary.freeSpace)),
                "hostNum": str(len(hosts)),
                "VmNum": str(len(vms)),
                "esxiip": [
                    {"esxiip_tag": str(mount.key), "esxiip": mount.key.name}
                    for mount in hosts
                ],
                "vmip": [
                    {"vmip_tag": str(vm), "vmipname": vm.name}
                    for vm in vms
                ],
            })
        return datastore

    #v1.0
    def gethost(self):
        """Collect hardware data for every ESXi host.

        Each record contains esxi_host, esxi_tag, vendor, model, host_uuid,
        numCpuPackages, numCpuCores, numCpuThreads, hz and memorySize.

        :return: list[dict]
        """
        datastores = []
        for esxi_host in self._list_objects(vim.HostSystem):
            systemInfo = esxi_host.hardware.systemInfo
            cpuInfo = esxi_host.hardware.cpuInfo
            datastores.append({
                "esxi_host": esxi_host.name,
                "esxi_tag": str(esxi_host),
                "vendor": systemInfo.vendor,
                "model": systemInfo.model,
                "host_uuid": systemInfo.uuid,
                "numCpuPackages": cpuInfo.numCpuPackages,
                "numCpuCores": cpuInfo.numCpuCores,
                "numCpuThreads": cpuInfo.numCpuThreads,
                "hz": cpuInfo.hz,
                "memorySize": esxi_host.hardware.memorySize,
            })
        return datastores

    #v1.0
    def getcluster(self):
        """Map every cluster name to the hosts it contains.

        :return: dict of cluster name -> list of {str(host): host name}
        """
        datavcenter = {}
        for cluster in self._list_objects(vim.ClusterComputeResource):
            datavcenter[cluster.name] = [{str(member): member.name} for member in cluster.host]
        return datavcenter
利用supervisor进行进程管理
supervisor安装
1.apt-get install supervisor直接进行安装 。
2.echo_supervisord_conf > /etc/supervisord.conf,生成配置文件
3.配置:
在配置文件底部添加以下内容(若本地无/etc/supervisor目录,直接创建;注意注释不能写在配置项同一行的末尾):
[include]
files=/etc/supervisor/*.conf
4.开机自启动
配置进程开机自启动:
chmod +x /etc/init.d/supervisord
# in debian based:
sudo update-rc.d supervisord defaults
# in redhat
chkconfig --add supervisord
设置开机启动:chkconfig supervisord on
查看是否成功:chkconfig --list | grep supervisord
5.配置程序,在/etc/supervisor内创建3个conf配置文件,并分别写入以下3段配置。
[program:sanmanager]
directory=/home/collect_manager
command= celery -A celery_run worker -l info
stdout_logfile=/home/collect_manager/run.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
killasgroup=true
startretries=1
redirect_stderr=true
[program:sanmanagerbreat]
directory=/home/collect_manager
command= celery -A celery_run beat -l info
stdout_logfile=/home/collect_manager/breat.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
killasgroup=true
startretries=1
redirect_stderr=true
[program:trapserver]
directory=/home/collect_manager
command= python3 trapserver.py
stdout_logfile=/home/collect_manager/trapserver.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
killasgroup=true
startretries=1
redirect_stderr=true
6.用法
supervisorctl的用法
supervisord : 启动supervisor
supervisorctl reload :修改完配置文件后重新启动supervisor
supervisorctl status :查看supervisor监管的进程状态
supervisorctl start 进程名 :启动XXX进程
supervisorctl stop 进程名 :停止XXX进程
supervisorctl stop all:停止全部进程,注:start、restart、stop都不会载入最新的配置文件。
supervisorctl update:根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启