# █▀▀ ▄▀█ █▀▄▀█ █▀█ █▀▄ █▀
# █▀░ █▀█ █░▀░█ █▄█ █▄▀ ▄█
# https://t.me/famods
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# ---------------------------------------------------------------------------------
# Name: Cocoon [BETA]
# Description: Взаимодействие с Cocoon от HikkaHost
# meta developer: @FAmods & @vsecoder_m
# meta banner: https://github.com/FajoX1/FAmods/blob/main/assets/banners/cocoon.png?raw=true
# requires: openai httpx aiohttp bs4 markdown
# ---------------------------------------------------------------------------------
import re
import html
import uuid
import httpx
import asyncio
import logging
import markdown
from openai import AsyncOpenAI
from typing import Optional, Any
from dataclasses import dataclass
from bs4 import BeautifulSoup, NavigableString
from datetime import datetime, timezone, timedelta
from telethon.tl.types import User
from telethon.errors import MessageNotModifiedError
from aiogram.exceptions import TelegramBadRequest
from .. import loader, utils
from ..inline.types import InlineCall
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Tag names that md_to_tg_html looks up (after renaming through TAG_MAP) to
# decide whether an element is kept as a tag or replaced by its content.
TG_ALLOWED = {
    # bold / italic / underline
    "b", "strong", "i", "em", "u", "ins",
    # strikethrough
    "s", "strike", "del",
    # links, code and quotes
    "a", "code", "pre", "blockquote",
    # custom emoji
    "emoji", "tg-emoji",
}

# Longer tag names and the short tag name each one is rewritten to.
TAG_MAP = {
    "strong": "b",
    "em": "i",
    "del": "s",
    "strike": "s",
    "ins": "u",
}
@dataclass(frozen=True)
class Usage:
    """Immutable record of usage counters for one account."""

    # Amount spent as an integer; presumably in nano-units of TON — TODO confirm.
    spent_nano: int
    # Amount spent as text; presumably the same value expressed in TON — TODO confirm.
    spent_ton: str
    # Number of tokens consumed so far.
    tokens_spent: int
    # Number of free tokens still available.
    free_tokens_remaining: int
    # When the free-token allowance resets; None when unknown.
    # NOTE(review): looks like a Unix timestamp in seconds — confirm against the API.
    free_tokens_reset_at: Optional[int]
    # When these counters were last updated; None when unknown.
    # NOTE(review): presumably a Unix timestamp as well — confirm.
    updated_at: Optional[int]
def _now_utc() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
def _safe_int(v: Any, default: int = 0) -> int:
    """Convert ``v`` with ``int()``, falling back to ``default`` on any error.

    Args:
        v: Value to convert.
        default: Value returned when ``int(v)`` raises.

    Returns:
        ``int(v)`` on success, otherwise ``default``.
    """
    try:
        converted = int(v)
    except Exception:
        # Best-effort conversion: every failure maps to the fallback.
        return default
    else:
        return converted
def _escape_text(s: str) -> str:
    """HTML-escape ``&``, ``<`` and ``>`` in ``s``; quotes are left untouched.

    A falsy ``s`` (``None`` or ``""``) is treated as an empty string.
    """
    text = s if s else ""
    return html.escape(text, quote=False)
def _days_until_reset(reset_ts: Optional[int], cycle_days: int = 30) -> int:
    """Return the number of days, rounded up, until the next free-token reset.

    Cocoon may report ``free_tokens_reset_at`` as the moment the *current*
    period was reset, which may already have passed (even earlier today).
    In that case the next reset is ``reset_ts`` advanced by whole
    ``cycle_days`` periods until it lies in the future.

    Args:
        reset_ts: Reset moment as a Unix timestamp in seconds; falsy when
            unknown.
        cycle_days: Length of one reset period in days.

    Returns:
        Days until the next reset, rounded up; ``0`` when the reset is not in
        the future and cannot be rolled forward (non-positive ``cycle_days``);
        ``cycle_days`` when ``reset_ts`` is missing or cannot be interpreted.
    """
    if not reset_ts:
        return cycle_days
    try:
        now = _now_utc()
        target = datetime.fromtimestamp(int(reset_ts), tz=timezone.utc)
        if target <= now and cycle_days > 0:
            # Skip every period that has already fully elapsed, so a
            # timestamp older than one cycle still maps to the next reset.
            cycle = timedelta(days=cycle_days)
            elapsed_cycles = (now - target) // cycle
            target += cycle * (elapsed_cycles + 1)
        delta = target - now
        if delta <= timedelta(0):
            return 0
        # Ceiling division on timedeltas: any started day counts as a full
        # day, including a remainder smaller than one second.
        return -((-delta) // timedelta(days=1))
    except (TypeError, ValueError, OverflowError, OSError):
        # Unparseable or out-of-range timestamp: assume a full cycle remains.
        return cycle_days
def _normalize_reset_ts(reset_at: Optional[int]) -> Optional[int]:
    """Return ``reset_at`` as a positive integer, or ``None`` if it is unusable.

    Falsy input, values that ``_safe_int`` cannot convert, and non-positive
    results all yield ``None``.
    """
    parsed = _safe_int(reset_at, 0) if reset_at else 0
    return parsed if parsed > 0 else None
def _format_compact(n: int) -> str:
    """Format ``n`` compactly with a ``k`` / ``m`` / ``b`` suffix.

    The value is scaled to the largest unit it reaches, rendered with one
    decimal place, and trailing zeros are dropped (``1500`` -> ``"1.5k"``,
    ``1000000`` -> ``"1m"``). Values below 1000, including negatives, are
    returned as plain integers.

    When rounding pushes the scaled value up to 1000 (for example
    ``999_999`` -> ``1000.0``), the next larger unit is used, so the result is
    ``"1m"`` rather than ``"1000k"``.

    Args:
        n: Number to format; converted with ``int()``.

    Returns:
        The compact text representation.
    """
    n = int(n)
    # Largest unit first, so the first threshold reached is the right one.
    units = ((1_000_000_000, "b"), (1_000_000, "m"), (1_000, "k"))
    for idx, (threshold, suffix) in enumerate(units):
        if n < threshold:
            continue
        scaled = f"{n / threshold:.1f}"
        if idx > 0 and scaled == "1000.0":
            # Rounding crossed the unit boundary: promote to the next unit.
            return "1" + units[idx - 1][1]
        return scaled.rstrip("0").rstrip(".") + suffix
    return str(n)
def _percent_remaining(spent: int, total: int) -> float:
    """Return the share of ``total`` not yet spent, as a percentage.

    Overspending is floored at zero remaining, and a non-positive ``total``
    yields ``0.0``.
    """
    if total > 0:
        left = max(total - spent, 0)
        return (left / total) * 100.0
    return 0.0
def md_to_tg_html(text: str) -> str:
    """Convert Markdown text into the HTML subset used for Telegram messages.

    The text is rendered with ``markdown`` (``fenced_code``, ``tables`` and
    ``nl2br`` extensions), parsed with BeautifulSoup and walked recursively:

    * text nodes are HTML-escaped;
    * headings and paragraphs become their content followed by a blank line;
    * ``<br>`` becomes a newline, list items become ``• item`` lines, table
      rows become ``| cell | cell | `` lines;
    * tags found in ``TG_ALLOWED`` (after renaming through ``TAG_MAP``) are
      emitted as real tags, without attributes except a link's ``href`` and
      a code element's ``language-*`` class;
    * any other tag is replaced by its rendered content.

    Args:
        text: Markdown source; may be empty.

    Returns:
        The converted HTML with runs of three or more newlines collapsed to
        one blank line and surrounding whitespace stripped; ``""`` for empty
        input.
    """
    if not text:
        return ""

    raw_html = markdown.markdown(text, extensions=["fenced_code", "tables", "nl2br"])
    soup = BeautifulSoup(raw_html, "html.parser")

    def render_allowed(child, target_tag: str) -> str:
        """Render an element whose (mapped) tag name is in TG_ALLOWED."""
        inner_html = stringify(child)
        # Empty formatting tags are dropped; empty code/pre blocks are kept.
        if not inner_html.strip() and target_tag not in ("code", "pre"):
            return inner_html
        if target_tag == "a":
            href = child.get("href", "")
            if not href:
                return inner_html
            # The parser decodes entities in attributes, so re-escape here.
            return f'<a href="{html.escape(href, quote=True)}">{inner_html}</a>'
        if target_tag == "code":
            cls = child.get("class", [])
            if cls and cls[0].startswith("language-"):
                # Keep the language hint produced by fenced code blocks.
                return (
                    f'<code class="{html.escape(cls[0], quote=True)}">'
                    f"{inner_html}</code>"
                )
            return f"<code>{inner_html}</code>"
        if target_tag == "pre":
            return f"<pre>{inner_html}</pre>"
        return f"<{target_tag}>{inner_html}</{target_tag}>"

    def stringify(node) -> str:
        """Recursively render the children of ``node``."""
        parts = []
        for child in node.children:
            if isinstance(child, NavigableString):
                parts.append(html.escape(str(child)))
                continue
            tag_name = child.name
            if not tag_name:
                continue
            if tag_name in ("h1", "h2", "h3", "h4", "h5", "h6", "p"):
                parts.append(stringify(child) + "\n\n")
            elif tag_name == "br":
                parts.append("\n")
            elif tag_name == "li":
                parts.append(f"• {stringify(child)}\n")
            elif tag_name in ("ul", "ol"):
                parts.append(stringify(child) + "\n")
            elif tag_name == "tr":
                parts.append("| " + stringify(child) + "\n")
            elif tag_name in ("td", "th"):
                parts.append(stringify(child) + " | ")
            else:
                target_tag = TAG_MAP.get(tag_name, tag_name)
                if target_tag in TG_ALLOWED:
                    parts.append(render_allowed(child, target_tag))
                else:
                    # Unsupported tag: keep only its content.
                    parts.append(stringify(child))
        return "".join(parts)

    final_text = stringify(soup)
    final_text = re.sub(r"\n{3,}", "\n\n", final_text)
    return final_text.strip()
def repair_html_tags(html_chunk: str) -> str:
    """Round-trip an HTML fragment through BeautifulSoup's ``html.parser``.

    Newlines are swapped for a unique placeholder before parsing and restored
    afterwards (presumably so the round-trip cannot alter them). The parsed
    tree is serialized with ``formatter=None``.

    NOTE(review): the function name suggests the purpose is to balance
    unclosed or stray tags via the parser's lenient handling — confirm.

    Args:
        html_chunk: HTML fragment; may be empty.

    Returns:
        The re-serialized fragment, or ``""`` for empty input.
    """
    if not html_chunk:
        return ""

    marker = f"MARKER_{uuid.uuid4().hex}"
    parsed = BeautifulSoup(html_chunk.replace("\n", marker), "html.parser")
    return parsed.decode_contents(formatter=None).replace(marker, "\n")
@loader.tds
class Cocoon(loader.Module):
"""Взаимодействие с Cocoon от HikkaHost"""
strings = {
"name": "Cocoon [BETA]",
"try_again": "{}{} {}",
"no_token": (
"{}cfg cocoon\n\n"
"{thoughts}…" ), "answer": ( "
{thoughts}\n\n" "
{model}"
),
"usage": (
"