1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| import importlib.util import threading import types from pathlib import Path from typing import Dict
from loguru import logger from watchgod import watch, PythonWatcher
# Logger used by PluginManager when the caller does not pass its own `log`.
_default = logger
def load_module_from_path(path: str) -> types.ModuleType:
    """Load and execute a Python source file as a fresh module object.

    The module is named after the file's stem (``foo`` for ``foo.py``). It is
    not inserted into ``sys.modules`` here, so every call builds and executes
    a new module object.

    :param path: filesystem path of the source file to load
    :return: the executed module
    :raises ImportError: if no import spec or loader can be built for
        ``path`` (for example, the suffix is not a recognised source suffix)

    Any exception raised while executing the file propagates unchanged; in
    particular a missing file surfaces as ``FileNotFoundError``.
    """
    # Use the stem, not the file name: "foo.py" used as a module name reads
    # like submodule "py" of a package "foo".
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    if spec is None or spec.loader is None:
        # spec_from_file_location returns None when it cannot pick a loader;
        # fail with a clear error instead of an AttributeError on None.
        raise ImportError(f"cannot build an import spec for {path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
class PluginMeta:
    """Metadata record for a single plugin.

    Stores four fields: ``name``, ``path``, ``version`` and ``call`` (the
    module object of the plugin).
    """

    # Fixed attribute layout; instances carry no per-instance __dict__.
    __slots__ = ('name', 'path', 'version', 'call')

    def __init__(
        self,
        name: str,
        path: str,
        version: str,
        call: types.ModuleType,
    ):
        """Store the given values on the corresponding slots."""
        self.name, self.path = name, path
        self.version, self.call = version, call
class PluginManager:
    """Registry of plugins loaded from the ``*.py`` files of one directory.

    A plugin is a module that declares ``_plugin_name``, ``_plugin_version``
    and a callable ``run``. Plugins are keyed by their ``_plugin_name``. When
    reloading is enabled, a daemon thread watches the directory and
    re-registers or removes plugins as their files change.
    """

    def __init__(
        self,
        plugin_path: str,
        is_reload=True,
        watch_change_sleep_ms: int = 1000,
        log=_default,
    ):
        """Load the plugins found in ``plugin_path`` and optionally watch it.

        :param plugin_path: directory containing the plugin ``*.py`` files
        :param is_reload: whether to reload plugins automatically on change
        :param watch_change_sleep_ms: interval handed to the watcher, in
            milliseconds
        :param log: logger object used for all messages of this manager
        """
        self._plugins: Dict[str, PluginMeta] = {}
        self._plugin_path = Path(plugin_path)
        self._log = log
        self.init_plugin(is_reload, watch_change_sleep_ms)

    @staticmethod
    def check_plugin(module: types.ModuleType, log=_default) -> bool:
        """Validate that a module has the shape required of a plugin.

        The module must declare ``_plugin_name`` and ``_plugin_version`` and
        define a callable ``run``. Every problem found is logged, not just
        the first one.

        :param module: candidate plugin module
        :param log: logger that receives the error messages; defaults to the
            module-level default logger
        :return: ``True`` when all requirements are met, otherwise ``False``
        """
        status = True
        for attr in ('_plugin_name', '_plugin_version'):
            if not hasattr(module, attr):
                log.error(f'{module.__name__} 没有声明 {attr} 变量')
                status = False
        # A missing `run` and a non-callable `run` are reported the same way.
        if not callable(getattr(module, 'run', None)):
            log.error(f'{module.__name__} 没有定义 run 方法')
            status = False
        return status

    def init_plugin(self, is_reload: bool, watch_change_sleep_ms: int):
        """Load every ``*.py`` file of the plugin directory as a plugin.

        A file that fails to load is logged and skipped so that one broken
        plugin does not prevent the others from loading.

        :param is_reload: whether to start the background reload thread
        :param watch_change_sleep_ms: watcher interval in milliseconds
        """
        for path in self._plugin_path.glob('*.py'):
            try:
                mod = load_module_from_path(str(path))
                self.set_plugin(mod, path)
            except Exception as e:
                self._log.error(f"plugin {path} load err:{e}")
        if is_reload:
            # Daemon thread: the watcher must not keep the process alive.
            th = threading.Thread(
                target=self._auto_reload,
                args=(watch_change_sleep_ms,),
                daemon=True,
            )
            th.start()

    def _auto_reload(self, watch_change_sleep_ms: int = 1000):
        """Watch the plugin directory and reload plugins as files change.

        Runs until the watcher stops yielding; intended to be the target of
        the background thread started by ``init_plugin``.

        :param watch_change_sleep_ms: watcher interval in milliseconds
        """
        for changes in watch(
            str(self._plugin_path),
            watcher_cls=PythonWatcher,
            normal_sleep=watch_change_sleep_ms,
        ):
            # Each item is a pair; the first element (the change kind
            # reported by the watcher) is not needed here.
            for _change_kind, path in changes:
                try:
                    mod = load_module_from_path(path)
                    self.set_plugin(mod, path)
                except FileNotFoundError as e:
                    if Path(path).exists():
                        # The file is still there, so the error came from
                        # the plugin's own code: report it, keep the
                        # previously registered version.
                        self._log.error(f"plugin {path} load err:{e}")
                    else:
                        # The file was removed: unregister what it provided.
                        self.pop_plugin(path)
                except Exception as e:
                    self._log.error(f"plugin {path} load err:{e}")

    def set_plugin(self, mod: types.ModuleType, path: Path):
        """Register a module as a plugin if it passes ``check_plugin``.

        Registering under an existing name replaces the previous entry. If
        the same file was previously registered under a different name (the
        file changed its ``_plugin_name``), the old entry is removed so it
        does not linger.

        :param mod: loaded plugin module
        :param path: file the module was loaded from; stored as ``str(path)``,
            so both ``Path`` and ``str`` values work
        """
        if not self.check_plugin(mod, self._log):
            return
        name = mod._plugin_name
        path_str = str(path)
        stale_names = [
            known
            for known, meta in self._plugins.items()
            if meta.path == path_str and known != name
        ]
        for stale in stale_names:
            del self._plugins[stale]
            self._log.info(f"plugin: {stale} unload done")
        self._plugins[name] = PluginMeta(
            name=name,
            path=path_str,
            version=mod._plugin_version,
            call=mod,
        )
        self._log.info(
            f"plugin: {name} loaded version:{mod._plugin_version}"
        )

    def get_plugin(self, name: str, default=None) -> PluginMeta:
        """Return the metadata registered under ``name``.

        :param name: plugin name to look up
        :param default: value returned when no such plugin is registered
        :return: the ``PluginMeta`` for ``name``, or ``default``
        """
        return self._plugins.get(name, default)

    def pop_plugin(self, path: str):
        """Unregister every plugin that was loaded from ``path``.

        :param path: plugin file path; compared as ``str(path)`` against the
            stored paths
        """
        target = str(path)
        # Iterate over a snapshot because entries are removed in the loop.
        for name, meta in list(self._plugins.items()):
            if meta.path == target:
                self._plugins.pop(name)
                self._log.info(f"plugin: {name} unload done")

    def get_plugin_name_list(self):
        """Return a new list with the names of all registered plugins."""
        return list(self._plugins)
if __name__ == "__main__":
    # Demo driver: load ./plugins with auto-reload, then call every
    # registered plugin's run() once per second, forever.
    import time

    manager = PluginManager(
        './plugins',
        is_reload=True,
        watch_change_sleep_ms=1000,
    )

    while True:
        for plugin_name in manager.get_plugin_name_list():
            meta = manager.get_plugin(plugin_name)
            # The plugin may have been unloaded since the names were listed.
            if not meta:
                continue
            meta.call.run()
        time.sleep(1)
|