# █▀▀ ▄▀█   █▀▄▀█ █▀█ █▀▄ █▀ # █▀░ █▀█   █░▀░█ █▄█ █▄▀ ▄█ # https://t.me/famods # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # --------------------------------------------------------------------------------- # Name: Requirements # Description: Работа с pip пакетами в модуле # meta developer: @FAmods # meta banner: https://github.com/FajoX1/FAmods/blob/main/assets/banners/requirements.png?raw=true # requires: heta # --------------------------------------------------------------------------------- import re import os import heta import shlex import asyncio import logging import tempfile from .. import loader, utils logger = logging.getLogger(__name__) @loader.tds class Requirements(loader.Module): """Работа с pip пакетами в модуле""" strings = { "name": "Requirements", "no_dep": " В модуле нету зависимостей", "only_url_or_hash": " Только ссылка на модуль, или heta hash", "no_file_and_link": " Нужно ответить на файл или {}{} [ссылка или heta hash]", "search_deps": "🕓 Ищу зависимости...", "install_deps": "🕓 Установка зависимостей:\n\n{}", "uninstall_deps": "🕓 Удаление зависимостей:\n\n{}", "installed": " Успешно установил зависимости:\n\n{}\n\n{}", "uninstalled": " Успешно удалил зависимости:\n\n{}", "requirements": "⚙️ Зависимости:\n\n{}", } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "auto_dlm", False, lambda: "Автоматическая загрузка модуля после установки зависимостей", validator=loader.validators.Boolean() ), ) async def client_ready(self, client, db): self.db = db self._client = client @loader.command() async def dldeps(self, message): """Установить pip пакеты с модуля""" link = utils.get_args_raw(message) r = await message.get_reply_message() if not link and not r: return await utils.answer(message, self.strings['no_file_and_link'].format(self.get_prefix(), "dldeps")) await utils.answer(message, self.strings['search_deps']) _heta = False if link: try: code = heta.module.get_code(link) except: try: m = heta.decode_hash(link) code = heta.module.get_code(m['link']) _heta = True except: return await utils.answer(message, self.strings['only_url_or_hash']) else: with tempfile.TemporaryDirectory() as temp_dir: file_path = os.path.join(temp_dir, r.file.name) await r.download_media(file_path) with open(file_path, 'r', encoding='utf-8') as f: code = f.read() requires_comments = re.findall(r'#\s*requires:\s*(.*)', code) all_requires = ''.join(requires_comments).strip() if not all_requires: return await utils.answer(message, self.strings['no_dep']) await utils.answer(message, self.strings['install_deps'].format(all_requires.replace(" ", "\n"))) requirements_list = shlex.split(all_requires) process = await asyncio.create_subprocess_exec('pip', 'install', *requirements_list) await process.wait() await utils.answer(message, self.strings['installed'].format(all_requires.replace(" ", "\n"), ((f"⬇️ Установите модуль:\n{self.get_prefix()}{'dlh' if _heta else 'dlmod'} {link}" if link else f"⬇️ Установите модуль через {self.get_prefix()}lm (с ответом на файл модуля)") if not self.config['auto_dlm'] else ""))) if not self.config['auto_dlm']: return if _heta: return await self.invoke("dlh", link, message.peer_id) if link: return await self.invoke("dlmod", link, message.peer_id) else: rr = await r.reply("ㅤ") return await self.invoke("loadmod", message=rr, edit=True) @loader.command() async def uldeps(self, message): """Удалить pip пакеты с модуля""" link = utils.get_args_raw(message) r = await message.get_reply_message() if not link and not r: return await utils.answer(message, self.strings['no_file_and_link'].format(self.get_prefix(), "uldeps")) await utils.answer(message, self.strings['search_deps']) if link: try: code = heta.module.get_code(link) except: try: m = heta.decode_hash(link) code = heta.module.get_code(m['link']) except: return await utils.answer(message, self.strings['only_url_or_hash']) else: with tempfile.TemporaryDirectory() as temp_dir: file_path = os.path.join(temp_dir, r.file.name) await r.download_media(file_path) with open(file_path, 'r', encoding='utf-8') as f: code = f.read() requires_comments = re.findall(r'#\s*requires:\s*(.*)', code) all_requires = ''.join(requires_comments).strip() if not all_requires: return await utils.answer(message, self.strings['no_dep']) await utils.answer(message, self.strings['uninstall_deps'].format(all_requires.replace(" ", "\n"))) requirements_list = shlex.split(all_requires) process = await asyncio.create_subprocess_exec('pip', 'uninstall', *requirements_list, '-y') await process.wait() return await utils.answer(message, self.strings['uninstalled'].format(all_requires.replace(" ", "\n"))) @loader.command() async def deps(self, message): """Посмотреть pip пакеты с модуля""" link = utils.get_args_raw(message) r = await message.get_reply_message() if not link and not r: return await utils.answer(message, self.strings['no_file_and_link'].format(self.get_prefix(), "deps")) await utils.answer(message, self.strings['search_deps']) if link: try: code = heta.module.get_code(link) except: try: m = heta.decode_hash(link) code = heta.module.get_code(m['link']) except: return await utils.answer(message, self.strings['only_url_or_hash']) else: with tempfile.TemporaryDirectory() as temp_dir: file_path = os.path.join(temp_dir, r.file.name) await r.download_media(file_path) with open(file_path, 'r', encoding='utf-8') as f: code = f.read() requires_comments = re.findall(r'#\s*requires:\s*(.*)', code) all_requires = ''.join(requires_comments).strip() if not all_requires: return await utils.answer(message, self.strings['no_dep']) return await utils.answer(message, self.strings['requirements'].format(all_requires.replace(" ", "\n"), f"⬇️ Модуль: {link}"))