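Introduction
Python has a library for monitoring file system changes: watchdog. It detects files or directories being created or deleted, file contents being modified, and files or directories being renamed. Each of these is an event, and you can define your own methods to run when an event arrives.
Basic usage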
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


# Custom handler class
class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print("File modified: %s" % event.src_path)


if __name__ == "__main__":
    path = "."
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
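As in the example above, whenever a file under the current directory changes, the on_modified method is triggered.
Next we analyze how this works behind the scenes.
watchdog workflow analysis
The following analysis is based on the Windows operating system!
In the example above, our custom handler class inherits from FileSystemEventHandler. Here is the source of FileSystemEventHandler: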
class FileSystemEventHandler(object):
    """
    Base file system event handler that you can override methods from.
    """

    def dispatch(self, event):
        """Dispatches events to the appropriate methods.

        :param event:
            The event object representing the file system event.
        :type event:
            :class:`FileSystemEvent`
        """
        self.on_any_event(event)
        _method_map = {
            EVENT_TYPE_MODIFIED: self.on_modified,
            EVENT_TYPE_MOVED: self.on_moved,
            EVENT_TYPE_CREATED: self.on_created,
            EVENT_TYPE_DELETED: self.on_deleted,
        }
        event_type = event.event_type
        _method_map[event_type](event)

    def on_any_event(self, event):
        """Catch-all event handler.

        :param event:
            The event object representing the file system event.
        :type event:
            :class:`FileSystemEvent`
        """

    def on_moved(self, event):
        """Called when a file or a directory is moved or renamed.

        :param event:
            Event representing file/directory movement.
        :type event:
            :class:`DirMovedEvent` or :class:`FileMovedEvent`
        """

    def on_created(self, event):
        """Called when a file or directory is created.

        :param event:
            Event representing file/directory creation.
        :type event:
            :class:`DirCreatedEvent` or :class:`FileCreatedEvent`
        """

    def on_deleted(self, event):
        """Called when a file or directory is deleted.

        :param event:
            Event representing file/directory deletion.
        :type event:
            :class:`DirDeletedEvent` or :class:`FileDeletedEvent`
        """

    def on_modified(self, event):
        """Called when a file or directory is modified.

        :param event:
            Event representing file/directory modification.
        :type event:
            :class:`DirModifiedEvent` or :class:`FileModifiedEvent`
        """
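- The dispatch method routes an incoming event to the matching handler method; each of the remaining methods handles one kind of event.
- A custom handler does not have to inherit from FileSystemEventHandler directly. It can also inherit from other classes such as RegexMatchingEventHandler(FileSystemEventHandler), which lets you supply regular expressions to select the files and directories to watch. There are other handler classes as well; see the source file events.py.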
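To make the dispatch idea concrete, here is a minimal standard-library sketch of the same pattern: a lookup table from event type to handler method, plus a subclass that filters paths with regular expressions. Every name in it (MiniEvent, MiniHandler, MiniRegexHandler, RecordingHandler, demo) is hypothetical and written only for illustration; it is not watchdog's code, and watchdog's real regex handler may behave differently in its details.

```python
import re

# Hypothetical event type constants, for illustration only.
EVENT_TYPE_CREATED = "created"
EVENT_TYPE_MODIFIED = "modified"


class MiniEvent:
    """Stand-in for a file system event (not watchdog's class)."""

    def __init__(self, event_type, src_path):
        self.event_type = event_type
        self.src_path = src_path


class MiniHandler:
    """Base handler: dispatch() routes an event to the matching on_* hook."""

    def dispatch(self, event):
        self.on_any_event(event)
        method_map = {
            EVENT_TYPE_CREATED: self.on_created,
            EVENT_TYPE_MODIFIED: self.on_modified,
        }
        method_map[event.event_type](event)

    def on_any_event(self, event):
        pass

    def on_created(self, event):
        pass

    def on_modified(self, event):
        pass


class MiniRegexHandler(MiniHandler):
    """Dispatches only events whose path matches one of the regexes."""

    def __init__(self, regexes):
        self._regexes = [re.compile(r) for r in regexes]

    def dispatch(self, event):
        if any(r.match(event.src_path) for r in self._regexes):
            super().dispatch(event)


class RecordingHandler(MiniRegexHandler):
    """Example subclass that records which hooks were called."""

    def __init__(self, regexes):
        super().__init__(regexes)
        self.calls = []

    def on_created(self, event):
        self.calls.append(("created", event.src_path))

    def on_modified(self, event):
        self.calls.append(("modified", event.src_path))


def demo():
    handler = RecordingHandler([r".*\.py$"])
    handler.dispatch(MiniEvent(EVENT_TYPE_MODIFIED, "app.py"))
    # This path does not match the regex, so no hook is called for it.
    handler.dispatch(MiniEvent(EVENT_TYPE_CREATED, "notes.txt"))
    return handler.calls
```

A subclass overrides only the hooks it cares about. The base class owns the routing, which is why the custom handler in the first example needs nothing more than on_modified.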
After instantiating the MyHandler class, the example instantiates observer = Observer(). On Windows, Observer() is WindowsApiObserver. The source of the WindowsApiObserver class is as follows:
class WindowsApiObserver(BaseObserver):
    """
    Observer thread that schedules watching directories and dispatches
    calls to event handlers.
    """

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseObserver.__init__(self, emitter_class=WindowsApiEmitter,
                              timeout=timeout)
- Note the parameter emitter_class=WindowsApiEmitter here. Keep it in mind; we will come back to it later.
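Passing a class as a constructor argument is what allows one base observer to work with different platform back ends. The following is a rough sketch of that idea only. All class names (EmitterSketch, FakeWindowsEmitter, FakeLinuxEmitter, ObserverSketch, FakeWindowsObserver, FakeLinuxObserver) are invented for illustration, and the emitters here do no real monitoring.

```python
class EmitterSketch:
    """Hypothetical emitter base: remembers the queue and the watched path."""

    platform = "generic"

    def __init__(self, event_queue, watch):
        self.event_queue = event_queue
        self.watch = watch


class FakeWindowsEmitter(EmitterSketch):
    platform = "windows"


class FakeLinuxEmitter(EmitterSketch):
    platform = "linux"


class ObserverSketch:
    """Hypothetical base observer: it only knows *some* emitter class."""

    def __init__(self, emitter_class):
        self._emitter_class = emitter_class
        self.event_queue = []  # a plain list stands in for the real queue
        self.emitters = []

    def schedule(self, path):
        # The base class instantiates whatever emitter class it was given.
        emitter = self._emitter_class(event_queue=self.event_queue, watch=path)
        self.emitters.append(emitter)
        return emitter


class FakeWindowsObserver(ObserverSketch):
    def __init__(self):
        super().__init__(emitter_class=FakeWindowsEmitter)


class FakeLinuxObserver(ObserverSketch):
    def __init__(self):
        super().__init__(emitter_class=FakeLinuxEmitter)
```

Under this arrangement, the platform-specific subclass is just a few lines that choose the emitter. All scheduling logic stays in the base class.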
As for BaseObserver, only the key code is shown here to save space:
class BaseThread(threading.Thread):
    """ Convenience class for creating stoppable threads. """

    def __init__(self):
        threading.Thread.__init__(self)
        if has_attribute(self, 'daemon'):
            self.daemon = True
        else:
            self.setDaemon(True)
        self._stopped_event = Event()

        if not has_attribute(self._stopped_event, 'is_set'):
            self._stopped_event.is_set = self._stopped_event.isSet

    def on_thread_start(self):
        pass

    def start(self):
        self.on_thread_start()
        threading.Thread.start(self)


class EventDispatcher(BaseThread):

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseThread.__init__(self)
        self._event_queue = EventQueue()
        self._timeout = timeout

    @property
    def timeout(self):
        """Event queue block timeout."""
        return self._timeout

    @property
    def event_queue(self):
        """The event queue which is populated with file system events
        by emitters and from which events are dispatched by a dispatcher
        thread."""
        return self._event_queue

    def dispatch_events(self, event_queue, timeout):
        pass

    def run(self):
        while self.should_keep_running():
            try:
                self.dispatch_events(self.event_queue, self.timeout)
            except queue.Empty:
                continue


class BaseObserver(EventDispatcher):
    """Base observer."""

    def __init__(self, emitter_class, timeout=DEFAULT_OBSERVER_TIMEOUT):
        EventDispatcher.__init__(self, timeout)
        self._emitter_class = emitter_class
        self._lock = threading.RLock()
        self._watches = set()
        self._handlers = dict()
        self._emitters = set()
        self._emitter_for_watch = dict()

    def _add_emitter(self, emitter):
        self._emitter_for_watch[emitter.watch] = emitter
        self._emitters.add(emitter)

    def _add_handler_for_watch(self, event_handler, watch):
        if watch not in self._handlers:
            self._handlers[watch] = set()
        self._handlers[watch].add(event_handler)

    @property
    def emitters(self):
        """Returns event emitter created by this observer."""
        return self._emitters

    def start(self):
        for emitter in self._emitters.copy():
            try:
                emitter.start()
            except Exception:
                self._remove_emitter(emitter)
                raise
        super(BaseObserver, self).start()

    def schedule(self, event_handler, path, recursive=False):
        with self._lock:
            watch = ObservedWatch(path, recursive)
            self._add_handler_for_watch(event_handler, watch)

            # If we don't have an emitter for this watch already, create it.
            if self._emitter_for_watch.get(watch) is None:
                emitter = self._emitter_class(event_queue=self.event_queue,
                                              watch=watch,
                                              timeout=self.timeout)
                self._add_emitter(emitter)
                if self.is_alive():
                    emitter.start()
            self._watches.add(watch)
        return watch

    def add_handler_for_watch(self, event_handler, watch):
        with self._lock:
            self._add_handler_for_watch(event_handler, watch)

    def dispatch_events(self, event_queue, timeout):
        event, watch = event_queue.get(block=True, timeout=timeout)

        with self._lock:
            # To allow unschedule/stop and safe removal of event handlers
            # within event handlers itself, check if the handler is still
            # registered after every dispatch.
            for handler in list(self._handlers.get(watch, [])):
                if handler in self._handlers.get(watch, []):
                    handler.dispatch(event)
        event_queue.task_done()
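- The inheritance chain here is WindowsApiObserver -> BaseObserver -> EventDispatcher -> BaseThread, so the observer is ultimately a thread class.
- Instantiating WindowsApiObserver runs each class's __init__ in turn, including BaseThread's, which marks the thread as a daemon but does not start it. The thread starts when observer.start() is called. It then executes EventDispatcher's run method, which loops over self.dispatch_events(self.event_queue, self.timeout). As the source shows, that method takes an event off the queue and calls handler.dispatch(event), the dispatch method of our custom handler object. This loop keeps running for the life of the program, so every event that arrives gets dispatched.
- In the example, observer.schedule(event_handler, path, recursive=True) creates the emitter, which is the emitter_class=WindowsApiEmitter mentioned above. schedule calls emitter.start() immediately only if the observer is already running; otherwise, as in the example, the emitter is started by observer.start(). The emitter runs in its own thread and repeatedly executes its queue_events method, which watches the given directory and puts each event onto the event queue it was handed (the queue described above).
- Once an event is on the queue, it can be dispatched as described in the second step.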
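The flow described above is a producer/consumer pipeline built on a thread-safe queue. Below is a simplified standard-library sketch of it, not watchdog's implementation. SketchEmitter, SketchObserver, StoppableThread and demo are hypothetical names. The emitter replays a pre-canned list of "changes" instead of watching a real directory, handlers are plain callables rather than objects with a dispatch method, and locking is omitted for brevity.

```python
import queue
import threading


class StoppableThread(threading.Thread):
    """Daemon thread with a stop flag, similar in spirit to BaseThread."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stopped_event = threading.Event()

    def should_keep_running(self):
        return not self._stopped_event.is_set()

    def stop(self):
        self._stopped_event.set()


class SketchEmitter(StoppableThread):
    """Producer: puts (event, watch) pairs onto the shared queue."""

    def __init__(self, event_queue, watch, pending):
        super().__init__()
        self._event_queue = event_queue
        self._watch = watch
        self._pending = list(pending)  # hypothetical pre-canned "changes"

    def run(self):
        while self.should_keep_running() and self._pending:
            self._event_queue.put((self._pending.pop(0), self._watch))


class SketchObserver(StoppableThread):
    """Consumer: takes events off the queue and calls the handlers."""

    def __init__(self, timeout=0.05):
        super().__init__()
        self.event_queue = queue.Queue()
        self._timeout = timeout
        self._handlers = {}
        self.emitters = []

    def schedule(self, handler, watch, pending):
        self._handlers.setdefault(watch, []).append(handler)
        emitter = SketchEmitter(self.event_queue, watch, pending)
        self.emitters.append(emitter)
        if self.is_alive():  # already running: start the emitter right away
            emitter.start()

    def start(self):
        for emitter in self.emitters:  # start emitters scheduled earlier
            emitter.start()
        super().start()

    def run(self):
        while self.should_keep_running():
            try:
                event, watch = self.event_queue.get(block=True,
                                                    timeout=self._timeout)
            except queue.Empty:
                continue  # nothing yet; re-check the stop flag
            for handler in self._handlers.get(watch, []):
                handler(event)
            self.event_queue.task_done()


def demo():
    seen = []
    observer = SketchObserver()
    observer.schedule(seen.append, ".", ["a.txt modified", "b.txt created"])
    observer.start()
    for emitter in observer.emitters:
        emitter.join()           # wait until everything has been queued
    observer.event_queue.join()  # wait until everything has been handled
    observer.stop()
    observer.join()
    return seen
```

The timeout on get is what lets the consumer loop notice a stop request: without it, the thread would block indefinitely on an empty queue.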
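Summary
- The WindowsApiEmitter class loops, watching for file changes and other events, and puts each event it detects onto the event queue.
- The observer class loops over the event queue and, when an event arrives, calls the handler class to dispatch it. Its schedule method creates the WindowsApiEmitter thread, which is started by schedule itself if the observer is already running, or by observer.start() otherwise. WindowsApiEmitter is the emitter for Windows; other systems have their own.
- Each observer is paired with its own Emitter.
- This design pattern is worth studying and putting to use.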