# █▀▀ ▄▀█   █▀▄▀█ █▀█ █▀▄ █▀ # █▀░ █▀█   █░▀░█ █▄█ █▄▀ ▄█ # https://t.me/famods # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # --------------------------------------------------------------------------------- # Name: Cocoon [BETA] # Description: Взаимодействие с Cocoon от HikkaHost # meta developer: @FAmods & @vsecoder_m # meta banner: https://github.com/FajoX1/FAmods/blob/main/assets/banners/cocoon.png?raw=true # requires: openai httpx aiohttp bs4 markdown # --------------------------------------------------------------------------------- import re import html import uuid import httpx import asyncio import logging import markdown from openai import AsyncOpenAI from typing import Optional, Any from dataclasses import dataclass from bs4 import BeautifulSoup, NavigableString from datetime import datetime, timezone, timedelta from telethon.tl.types import User from telethon.errors import MessageNotModifiedError from aiogram.exceptions import TelegramBadRequest from .. import loader, utils from ..inline.types import InlineCall logger = logging.getLogger(__name__) TG_ALLOWED = { "b", "strong", "i", "em", "u", "ins", "s", "strike", "del", "a", "code", "pre", "blockquote", "emoji", "tg-emoji", } TAG_MAP = {"strong": "b", "em": "i", "del": "s", "strike": "s", "ins": "u"} @dataclass(frozen=True) class Usage: spent_nano: int spent_ton: str tokens_spent: int free_tokens_remaining: int free_tokens_reset_at: Optional[int] updated_at: Optional[int] def _now_utc() -> datetime: return datetime.now(timezone.utc) def _safe_int(v: Any, default: int = 0) -> int: try: return int(v) except Exception: return default def _escape_text(s: str) -> str: return html.escape(s or "", quote=False) def _days_until_reset(reset_ts: Optional[int], cycle_days: int = 30) -> int: """ Cocoon может отдавать free_tokens_reset_at как момент сброса текущего периода, который уже мог произойти сегодня. Тогда следующий сброс = reset_ts + cycle_days. """ if not reset_ts: return cycle_days try: now = _now_utc() target = datetime.fromtimestamp(int(reset_ts), tz=timezone.utc) if target <= now: target = target + timedelta(days=cycle_days) delta = target - now if delta.total_seconds() <= 0: return 0 return max(delta.days + (1 if delta.seconds > 0 else 0), 0) except Exception: return cycle_days def _normalize_reset_ts(reset_at: Optional[int]) -> Optional[int]: if not reset_at: return None reset_at = _safe_int(reset_at, 0) if reset_at <= 0: return None return reset_at def _format_compact(n: int) -> str: n = int(n) if n >= 1_000_000_000: return f"{n / 1_000_000_000:.1f}".rstrip("0").rstrip(".") + "b" if n >= 1_000_000: return f"{n / 1_000_000:.1f}".rstrip("0").rstrip(".") + "m" if n >= 1_000: return f"{n / 1_000:.1f}".rstrip("0").rstrip(".") + "k" return str(n) def _percent_remaining(spent: int, total: int) -> float: if total <= 0: return 0.0 remaining = max(total - spent, 0) return (remaining / total) * 100.0 def md_to_tg_html(text: str) -> str: if not text: return "" raw_html = markdown.markdown(text, extensions=["fenced_code", "tables", "nl2br"]) soup = BeautifulSoup(raw_html, "html.parser") def stringify(node, lang=None): res = "" for child in node.children: if isinstance(child, NavigableString): res += html.escape(str(child)) elif child.name: tag_name = child.name if tag_name in ["h1", "h2", "h3", "h4", "h5", "h6"]: content = stringify(child) res += f"{content}\n\n" elif tag_name == "p": res += stringify(child) + "\n\n" elif tag_name == "br": res += "\n" elif tag_name == "li": res += f"• {stringify(child)}\n" elif tag_name in ["ul", "ol"]: res += stringify(child) + "\n" elif tag_name == "tr": res += "| " + stringify(child) + "\n" elif tag_name in ["td", "th"]: res += stringify(child) + " | " else: target_tag = TAG_MAP.get(tag_name, tag_name) if target_tag in TG_ALLOWED: inner_html = stringify(child) if not inner_html.strip() and target_tag not in ["code", "pre"]: res += inner_html continue if target_tag == "a": href = child.get("href", "") if href: res += f'{inner_html}' else: res += inner_html elif target_tag == "code": cls = child.get("class", []) if cls and cls[0].startswith("language-"): res += f'{inner_html}' else: res += f"{inner_html}" elif target_tag == "pre": res += f"
{inner_html}
" else: res += f"<{target_tag}>{inner_html}" else: res += stringify(child) return res final_text = stringify(soup) final_text = re.sub(r"\n{3,}", "\n\n", final_text) return final_text.strip() def repair_html_tags(html_chunk: str) -> str: if not html_chunk: return "" newline_placeholder = f"MARKER_{uuid.uuid4().hex}" protected_html = html_chunk.replace("\n", newline_placeholder) soup = BeautifulSoup(protected_html, "html.parser") repaired_html = soup.decode_contents(formatter=None) final_html = repaired_html.replace(newline_placeholder, "\n") return final_html @loader.tds class Cocoon(loader.Module): """Взаимодействие с Cocoon от HikkaHost""" strings = { "name": "Cocoon [BETA]", "try_again": " Что-то пошло не так. Попробуйте снова.", "no_args": " Нужно {}{} {}", "no_token": ( " Нету токена! Вставь его в {}cfg cocoon\n\n" "🌘 Получить токен: @hikkahost_bot → 🥚 Cocoon" ), "invalid_token_or_no_sub": ( "Неверный токен или у вас нет подписки 🌘 HikkaHost.\n\n" "🌘 Получить токен: @hikkahost_bot → 🥚 Cocoon" ), "sending_request_to_cocoon": "🐣 Обрабатываю запрос в Cocoon...", "thinking": ( "🐣 Думаю...\n\n" "
{thoughts}…
" ), "answer": ( "🌘 Вопрос: {question}\n\n" "🐣 Размышления:\n" "
{thoughts}
\n\n" "🥚 {answer}\n\n" "🚀 Модель: {model}" ), "usage": ( "🥚 Cocoon API\n\n" "💡 Использовано:\n" "• {current}/{total} ({percent}% осталось)\n\n" " Лимит сбросится через {days} день(-ей)." ), "again_kb": "🔄 Сгенерировать ещё раз", } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "token", None, lambda: "Токен HikkaHost API. Получить токен: @hikkahost_bot -> 🥚 Cocoon", validator=loader.validators.Hidden(loader.validators.String()), ), loader.ConfigValue( "model", "Qwen/Qwen3-32B", lambda: "Модель ИИ. Список: https://cocoon.hikka.host/v1/models", ), loader.ConfigValue( "role", "user", lambda: "Роль user-сообщения (обычно user).", validator=loader.validators.Choice( ["system", "developer", "user", "assistant", "tool"] ), ), loader.ConfigValue( "system_prompt", "", lambda: "System prompt (инструкция для модели, role=system).", ), loader.ConfigValue( "max_tokens", 3900, lambda: "Максимальное количество токенов для ответа модели.", validator=loader.validators.Integer(minimum=1), ), loader.ConfigValue( "temperature", 0.2, lambda: "Температура генерации (0.0–1.0).", ), ) async def client_ready(self, client, db): self.db = db self._client = client self.api_url = "https://cocoon.hikka.host/v1" self._openai: Optional[AsyncOpenAI] = None def _rebuild_openai_client(self) -> None: token = self.config.get("token") or "" self._openai = AsyncOpenAI( api_key=token, base_url=self.api_url, timeout=60.0, max_retries=2 ) def _ensure_client(self) -> None: if not self._openai or ( self._openai.api_key != (self.config.get("token") or "") ): self._rebuild_openai_client() async def _answer(self, message, text, *args, **kwargs): try: if len(text) > 4096: text = text[:4090] + "..." return await utils.answer(message, repair_html_tags(text), *args, **kwargs) except (MessageNotModifiedError, TelegramBadRequest): return message async def _fetch_usage(self) -> Optional[Usage]: token = self.config.get("token") if not token: return None headers = { "accept": "application/json", "X-API-Key": token, } try: async with httpx.AsyncClient(timeout=20.0) as client: r = await client.get(f"{self.api_url}/usage", headers=headers) r.raise_for_status() data = r.json() except Exception as e: logger.exception("Usage request failed: %s", e) return None if isinstance(data, dict) and data.get("detail") == "API key not recognized": return None if not isinstance(data, dict): return None return Usage( spent_nano=_safe_int(data.get("spent_nano"), 0), spent_ton=str(data.get("spent_ton", "0")), tokens_spent=_safe_int(data.get("tokens_spent"), 0), free_tokens_remaining=_safe_int(data.get("free_tokens_remaining"), 0), free_tokens_reset_at=( _safe_int(data.get("free_tokens_reset_at"), 0) or None ), updated_at=(_safe_int(data.get("updated_at"), 0) or None), ) async def _regenerate(self, call: InlineCall, arg1, arg2): await self.cocoon(arg1, inline_message=arg2) @loader.command() async def ccusage(self, message): """Статистика использования Cocoon""" if not self.config.get("token"): return await self._answer( message, self.strings["no_token"].format(self.get_prefix()) ) usage = await self._fetch_usage() if not usage: return await self._answer(message, self.strings["invalid_token_or_no_sub"]) reset_ts = _normalize_reset_ts(usage.free_tokens_reset_at) days = _days_until_reset(reset_ts, cycle_days=30) spent = usage.tokens_spent total = spent + usage.free_tokens_remaining if total <= 0: total = 1_000_000 percent = _percent_remaining(spent, total) percent_fmt = f"{percent:.1f}".rstrip("0").rstrip(".") return await self._answer( message, self.strings["usage"].format( current=_format_compact(spent), total=_format_compact(total), percent=percent_fmt, days=days, ), ) @loader.command() async def cocoon(self, message, inline_message=None): """Задать вопрос к ИИ (поддерживает ответ на сообщение)""" q = utils.get_args_raw(message) if not q: return await utils.answer( message, self.strings["no_args"].format(self.get_prefix(), "cocoon", "[вопрос]"), ) if not self.config["token"]: return await utils.answer( message, self.strings["no_token"].format(self.get_prefix()) ) usage = await self._fetch_usage() if not usage: return await utils.answer(message, self.strings["invalid_token_or_no_sub"]) user_message = message if not inline_message: message = await self.inline.form( text="...", message=message, force_me=False, ) else: message = inline_message await utils.answer(message, self.strings["sending_request_to_cocoon"]) self._ensure_client() system_prompt = (self.config.get("system_prompt") or "").strip() messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) if user_message.reply_to: reply = await user_message.get_reply_message() entity_id = None if hasattr(reply, "from_id") and reply.from_id: if hasattr(reply.from_id, "user_id") and reply.from_id.user_id: entity_id = reply.from_id.user_id elif hasattr(reply.from_id, "channel_id") and reply.from_id.channel_id: entity_id = reply.from_id.channel_id if entity_id is None: if hasattr(reply.peer_id, "user_id") and reply.peer_id.user_id: entity_id = reply.peer_id.user_id else: entity_id = reply.peer_id.channel_id entity = await self.client.get_entity(entity_id) date = reply.date.strftime("%H:%M %d.%m.%Y UTC") messages.append( { "role": "user", "content": ( ( f"{entity.first_name} {entity.last_name or ''} (user id: {entity.id}) " if isinstance(entity, User) else f"Channel {entity.title} (channel id: {entity.id}) " ) + f"msg id: {reply.id}, date: {date}: " + reply.message ), } ) messages.append({"role": self.config.get("role", "user"), "content": q}) try: response = await self._openai.chat.completions.create( messages=messages, stream=True, max_tokens=self.config.get("max_tokens", 3900), model=self.config.get("model", "Qwen/Qwen3-32B"), temperature=self.config.get("temperature", 0.2), ) response_text = "" chunk_buffer = "" async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: chunk_buffer += chunk.choices[0].delta.content if len(chunk_buffer) >= 100: response_text += chunk_buffer chunk_buffer = "" thoughts = response_text.split("", 1)[0].replace( "", "" ) if "" in response_text: after_think = response_text.split("", 1)[1].strip() await self._answer( message, self.strings["answer"].format( thoughts=thoughts[:500], question=q, answer=md_to_tg_html(_escape_text(after_think) + "…"), model=self.config["model"], ), ) else: await self._answer( message, self.strings["thinking"].format( thoughts=md_to_tg_html(_escape_text(thoughts) + "…") ), ) await asyncio.sleep(0.2) if chunk_buffer: response_text += chunk_buffer responses_data = response_text.split("", 1) thoughts = responses_data[0].strip().replace("", "") after_think = responses_data[1].strip() await self._answer( message, self.strings["answer"].format( question=q, thoughts=md_to_tg_html(_escape_text(thoughts[:500])), answer=md_to_tg_html(_escape_text(after_think)), model=self.config["model"], ), reply_markup=[ { "text": self.strings["again_kb"], "callback": self._regenerate, "args": [user_message, message], } ], ) except httpx.RemoteProtocolError: return await self._answer(message, self.strings["try_again"])