# █▀▀ ▄▀█ █▀▄▀█ █▀█ █▀▄ █▀
# █▀░ █▀█ █░▀░█ █▄█ █▄▀ ▄█
# https://t.me/famods
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# ---------------------------------------------------------------------------------
# Name: Spotify4ik
# Description: Слушай музыку в Spotify
# meta developer: @FAmods
# meta banner: https://github.com/FajoX1/FAmods/blob/main/assets/banners/spotify4ik.png?raw=true
# requires: spotipy yt-dlp aiohttp
# ---------------------------------------------------------------------------------
import os
import asyncio
import logging
import tempfile
import aiohttp
import yt_dlp
import spotipy
from telethon import types
from telethon.tl.types import ChatAdminRights
from telethon.tl.functions.channels import EditTitleRequest, EditAdminRequest
from telethon.tl.functions.account import UpdateProfileRequest
from aiogram.types import InputFile
from aiogram.types.inline_keyboard import InlineKeyboardMarkup, InlineKeyboardButton
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class Spotify4ik(loader.Module):
"""Слушай музыку в Spotify"""
strings = {
"name": "Spotify4ik",
"go_auth_link": """🔗 Ссылка для авторизации создана!
🔐 Перейди по этой ссылке.
✏️ Потом введи: {}spcode свой_auth_token""",
"need_client_tokens": """🔐 Создай приложение по этой ссылке
‼️ Важно: redirect_url приложения должен быть https://sp.fajox.one
🔑 Заполни client_id и client_secret в {}cfg Spotify4ik
💻 И снова напиши {}spauth""",
"no_auth_token": "❌ Авторизуйся в свой аккаунт через {}spauth",
"no_song_playing": "❌ Сейчас ничего не играет.",
"no_code": "❌ Должно быть {}spcode код_авторизации",
"code_installed": """🔑 Код авторизации установлен!
🎶 Наслаждайся музыкой!""",
"auth_error": "❌ Ошибка авторизации: {}",
"unexpected_error": "❌ Произошла ошибка: {}",
"track_pause": "⏸️ Трек поставлен на паузу.",
"track_play": "🎶 Играю...",
"track_loading": "💻 Загружаю трек...",
"music_bio_disabled": "🎵 Стрим музыки в био выключен",
"music_bio_enabled": "🎵 Стрим музыки в био включен",
"track_skipped": "🎵 Следующий трек...",
"track_repeat": "🔁 Трек будет повторяться.",
"track_norepeat": "🔁 Трек не будет повторяться.",
"track_liked": f"❤️ Трек добавлен в избранное!",
"channel_music_bio_disabled": "🎵 Стрим музыки в био канале выключен!",
"channel_music_bio_enabled": """🎵 Стрим музыки в био канале включен!
ℹ️ Инструкция:
1. Создай публичный канал (название любое)
2. Добавь канал в профиль
3. Добавь @username канала в config (.cfg Spotify4ik → channel)
4. Готово"""
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"client_id",
None,
lambda: "Айди приложения, Получить: https://developer.spotify.com/dashboard",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"client_secret",
None,
lambda: "Секретный ключ приложения, Получить: https://developer.spotify.com/dashboard",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"auth_token",
None,
lambda: "Токен для авторизации",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"refresh_token",
None,
lambda: "Токен для обновления",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"bio_text",
"🎵 {track_name} - {artist_name}",
lambda: "Текст био с текущим треком",
),
loader.ConfigValue(
"scopes",
(
"user-read-playback-state playlist-read-private playlist-read-collaborative"
" app-remote-control user-modify-playback-state user-library-modify"
" user-library-read"
),
lambda: "Список разрешений",
),
loader.ConfigValue(
"use_ytdl",
False,
lambda: "Для загрузки файла песни использовать yt-dl",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"channel",
None,
lambda: "Канал для показа текущей музыки в био"
),
loader.ConfigValue(
"stream_upload_track",
False,
lambda: "Загрузка трека в био канал для стриминга",
validator=loader.validators.Boolean(),
)
)
async def client_ready(self, client, db):
self.db = db
self._client = client
self.current_track = ""
self.musicdl = await self.import_lib(
"https://famods.fajox.one/assets/musicdl.py",
suspend_on_error=True,
)
async def _create_stream_messages(self, channel):
await self.client(
EditAdminRequest(
channel=channel,
user_id=self.inline.bot_username,
admin_rights=ChatAdminRights(
change_info=True,
post_messages=True,
edit_messages=True,
delete_messages=True
),
rank="spot"
)
)
audio_path = await self.musicdl.dl("The Lost Soul Down - NBSPLV", only_document=True)
if self.config['stream_upload_track']:
first_message = await self.client.send_file(
channel,
audio_path,
caption="first_message",
attributes=[
types.DocumentAttributeAudio(
duration=0,
title="famods",
performer=""
)
],
thumb="https://github.com/fajox1/famods/raw/main/assets/photo_2025-03-26_17-03-56.jpg"
)
else:
first_message = await self.client.send_message(
channel,
"first_message",
)
display_message = await self.inline.bot.send_message(int("-100"+str(channel.id)), "display_message")
self.db.set(self.name, 'stream_upload_message', self.config['stream_upload_track'])
me = await self.client.get_me()
me_mention = f"@{me.username}" if me.username else (f"@{me.usernames[0].username}" if me.usernames else me.first_name)
try:
await self.inline.bot.set_chat_description(
chat_id=int("-100"+str(channel.id)),
description=f"Current track playing for the {me_mention}"
)
except:
pass
self.db.set(self.name, 'stream_channel_data', {
"first_message": first_message.id,
"display_message": display_message.message_id,
"channel": self.config['channel']
})
@loader.command()
async def spauth(self, message):
"""Войти в свой аккаунт"""
if not self.config['client_id'] or not self.config['client_secret']:
return await utils.answer(message, self.strings['need_client_tokens'].format(self.get_prefix(), self.get_prefix()))
sp_oauth = spotipy.oauth2.SpotifyOAuth(
client_id=self.config['client_id'],
client_secret=self.config['client_secret'],
redirect_uri="https://sp.fajox.one",
scope=self.config['scopes']
)
auth_url = sp_oauth.get_authorize_url()
await utils.answer(message, self.strings['go_auth_link'].format(auth_url, self.get_prefix()))
@loader.command()
async def spcode(self, message):
"""Ввести код авторизации"""
if not self.config['client_id'] or not self.config['client_secret']:
return await utils.answer(message, self.strings['need_client_tokens'].format(self.get_prefix()))
code = utils.get_args_raw(message)
if not code:
return await utils.answer(message, self.strings['no_code'].format(self.get_prefix()))
sp_oauth = spotipy.oauth2.SpotifyOAuth(
client_id=self.config['client_id'],
client_secret=self.config['client_secret'],
redirect_uri="https://sp.fajox.one",
scope=self.config['scopes']
)
token_info = sp_oauth.get_access_token(code)
self.config['auth_token'] = token_info['access_token']
self.config['refresh_token'] = token_info['refresh_token']
try:
sp = spotipy.Spotify(auth=token_info['access_token'])
current_playback = sp.current_playback()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except Exception as e:
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
await utils.answer(message, self.strings['code_installed'])
@loader.command()
async def sppause(self, message):
"""Поставить на паузу текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
sp.pause_playback()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "Restriction violated" in str(e):
return await utils.answer(message, self.strings['track_pause'])
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
await utils.answer(message, self.strings['track_pause'])
@loader.command()
async def spplay(self, message):
"""Воспроизвести текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
sp.start_playback()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "Restriction violated" in str(e):
return await utils.answer(message, self.strings['track_play'])
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
await utils.answer(message, self.strings['track_play'])
@loader.command()
async def spbegin(self, message):
"""Включить текущий трек с начала"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return await utils.answer(message, self.strings['no_song_playing'])
track_uri = current_playback['item']['uri']
sp.start_playback(uris=[track_uri])
sp.seek_track(0)
await utils.answer(message, self.strings['track_play'])
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
@loader.command()
async def spback(self, message):
"""Включить предыдущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
sp.previous_track()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
await utils.answer(message, self.strings['track_play'])
@loader.command()
async def spnext(self, message):
"""Включить следующий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
sp.next_track()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "Restriction violated" in str(e):
return await utils.answer(message, self.strings['track_play'])
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
await utils.answer(message, self.strings['track_skipped'])
@loader.command()
async def spbio(self, message):
"""Включить/выключить стрим текущего трека в био"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if self.db.get(self.name, "bio_change", False):
self.db.set(self.name, 'bio_change', False)
return await utils.answer(message, self.strings['music_bio_disabled'])
self.db.set(self.name, 'bio_change', True)
await utils.answer(message, self.strings['music_bio_enabled'])
@loader.command()
async def spbiochannel(self, message):
"""Включить/выключить стрим текущего трека в канале в био"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if self.db.get(self.name, "channel_bio_change", False):
self.db.set(self.name, 'channel_bio_change', False)
return await utils.answer(message, self.strings['channel_music_bio_disabled'])
self.db.set(self.name, 'channel_bio_change', True)
await utils.answer(message, self.strings['channel_music_bio_enabled'])
@loader.command()
async def splike(self, message):
"""Лайкнуть текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return await utils.answer(message, self.strings['no_song_playing'])
track_id = current_playback['item']['id']
sp.current_user_saved_tracks_add([track_id])
await utils.answer(message, self.strings['track_liked'])
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
@loader.command()
async def sprepeat(self, message):
"""Повторить текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return await utils.answer(message, self.strings['no_song_playing'])
sp.repeat("track")
await utils.answer(message, self.strings['track_repeat'])
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
@loader.command()
async def spnorepeat(self, message):
"""Перестать повторять текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return await utils.answer(message, self.strings['no_song_playing'])
sp.repeat("no")
await utils.answer(message, self.strings['track_norepeat'])
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
@loader.command()
async def spnow(self, message):
"""Текущий трек"""
if not self.config['auth_token']:
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
try:
sp = spotipy.Spotify(auth=self.config['auth_token'])
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return await utils.answer(message, self.strings['no_song_playing'])
await utils.answer(message, self.strings['track_loading'])
track = current_playback['item']
track_name = track.get('name', 'Unknown Track')
artist_name = track['artists'][0].get('name', 'Unknown Artist')
album_name = track['album'].get('name', 'Unknown Album')
duration_ms = track.get('duration_ms', 0)
progress_ms = current_playback.get('progress_ms', 0)
is_playing = current_playback.get('is_playing', False)
duration_min, duration_sec = divmod(duration_ms // 1000, 60)
progress_min, progress_sec = divmod(progress_ms // 1000, 60)
playlist = current_playback.get('context', {}).get('uri', '').split(':')[-1] if current_playback.get('context') else None
device_name = current_playback.get('device', {}).get('name', 'Unknown Device')+" "+current_playback.get('device', {}).get('type', '')
device_type = current_playback.get('device', {}).get('type', 'unknown')
user_profile = sp.current_user()
user_name = user_profile['display_name']
user_id = user_profile['id']
track_url = track['external_urls']['spotify']
user_url = f"https://open.spotify.com/user/{user_id}"
playlist_url = f"https://open.spotify.com/playlist/{playlist}" if playlist else None
track_info = (
f"🎧 Now Playing\n\n"
f"🎶 {track_name} - {artist_name}\n"
f"💿 Album: {album_name}\n\n"
f"🎧 Device: {device_name}\n"
+ (("❤️ From favorite tracks\n" if "playlist/collection" in playlist_url else
f"📑 From Playlist: View\n") if playlist else "")
+ f"\n🔗 Track URL: Open in Spotify"
)
with tempfile.TemporaryDirectory() as temp_dir:
if self.config['use_ytdl']:
audio_path = os.path.join(temp_dir, f"{artist_name} - {track_name}.mp3")
ydl_opts = {
"format": "bestaudio/best[ext=mp3]",
"outtmpl": audio_path,
"noplaylist": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([f"ytsearch1:{track_name} - {artist_name}"])
else:
audio_path = await self.musicdl.dl(f"{artist_name} - {track_name}", only_document=True)
album_art_url = track['album']['images'][0]['url']
async with aiohttp.ClientSession() as session:
async with session.get(album_art_url) as response:
art_path = os.path.join(temp_dir, "cover.jpg")
with open(art_path, "wb") as f:
f.write(await response.read())
await self._client.send_file(
message.chat_id,
audio_path,
caption=track_info,
attributes=[
types.DocumentAttributeAudio(
duration=duration_ms//1000,
title=track_name,
performer=artist_name
)
],
thumb=art_path,
reply_to=message.reply_to_msg_id if message.is_reply else getattr(message, "top_id", None)
)
await message.delete()
except spotipy.oauth2.SpotifyOauthError as e:
return await utils.answer(message, self.strings['auth_error'].format(str(e)))
except spotipy.exceptions.SpotifyException as e:
if "The access token expired" in str(e):
return await utils.answer(message, self.strings['no_auth_token'].format(self.get_prefix()))
if "NO_ACTIVE_DEVICE" in str(e):
return await utils.answer(message, self.strings['no_song_playing'])
return await utils.answer(message, self.strings['unexpected_error'].format(str(e)))
@loader.loop(interval=60*40, autostart=True)
async def loop_token(self):
if not self.config['auth_token']:
return
try:
sp_oauth = spotipy.oauth2.SpotifyOAuth(
client_id=self.config['client_id'],
client_secret=self.config['client_secret'],
redirect_uri="https://sp.fajox.one",
scope=self.config['scopes']
)
token_info = sp_oauth.refresh_access_token(self.config['refresh_token'])
self.config['auth_token'] = token_info['access_token']
self.config['refresh_token'] = token_info['refresh_token']
except Exception as e:
pass
# logger.error(f"Failed to refresh Spotify token: {str(e)}", exc_info=True)
@loader.loop(interval=70, autostart=True)
async def loop(self):
if not self.config['auth_token']:
return
if not self.db.get(self.name, "channel_bio_change", False):
return
if not self.config['channel']:
return
sp = spotipy.Spotify(auth=self.config['auth_token'])
current_playback = sp.current_playback()
if not current_playback or not current_playback.get('item'):
return
track = current_playback['item']
track_name = track.get('name', 'Unknown Track')
artist_name = track['artists'][0].get('name', 'Unknown Artist')
current_track = f"{track_name} - {artist_name}"
last_track = self.db.get(self.name, "last_track", False)
if last_track == current_track:
return
self.db.set(self.name, 'last_track', f"{track_name} - {artist_name}")
channel = await self.client.get_entity(self.config['channel'])
stream_channel_data = self.db.get(self.name, "stream_channel_data", False)
stream_upload_message = self.db.get(self.name, "stream_upload_message", True)
if not stream_channel_data or self.config['channel'] != stream_channel_data['channel'] or self.config['stream_upload_track'] != stream_upload_message:
await self._create_stream_messages(channel)
await asyncio.sleep(3)
stream_channel_data = self.db.get(self.name, "stream_channel_data", False)
artist_link = track['artists'][0]['external_urls']['spotify']
album_name = track['album'].get('name', 'Unknown Album')
track_url = track['external_urls']['spotify']
duration_ms = track.get('duration_ms', 0)
playlist = current_playback.get('context', {}).get('uri', '').split(':')[-1] if current_playback.get('context') else None
device_name = current_playback.get('device', {}).get('name', 'Unknown Device')+" "+current_playback.get('device', {}).get('type', '')
playlist_url = f"https://open.spotify.com/playlist/{playlist}" if playlist else None
keyboard = InlineKeyboardMarkup()
keyboard.add(
InlineKeyboardButton(
text="Open in Spotify",
url=track_url
)
)
now_play_text = f"""
🎶 {track_name} - {artist_name}
💿 Album: {album_name}
🎧 Device: {device_name}
"""+ (("❤️ From favorite tracks\n" if "playlist/collection" in playlist_url else
f"📑 From Playlist: View\n") if playlist else "")
if self.config['stream_upload_track']:
with tempfile.TemporaryDirectory() as temp_dir:
if self.config['use_ytdl']:
audio_path = os.path.join(temp_dir, f"{artist_name} - {track_name}.mp3")
ydl_opts = {
"format": "bestaudio/best[ext=mp3]",
"outtmpl": audio_path,
"noplaylist": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([f"ytsearch1:{track_name} - {artist_name}"])
else:
audio_path = await self.musicdl.dl(f"{artist_name} - {track_name}", only_document=True)
album_art_url = track['album']['images'][0]['url']
async with aiohttp.ClientSession() as session:
async with session.get(album_art_url) as response:
art_path = os.path.join(temp_dir, "cover.jpg")
with open(art_path, "wb") as f:
f.write(await response.read())
await self.inline.bot.set_chat_photo(
chat_id=int("-100"+str(channel.id)),
photo=InputFile(art_path)
)
await self.client.edit_message(
entity=channel.username,
message=stream_channel_data['first_message'],
file=audio_path,
attributes=[
types.DocumentAttributeAudio(
duration=duration_ms//1000,
title=track_name,
performer=artist_name
)
],
thumb=art_path,
text=now_play_text
)
else:
with tempfile.TemporaryDirectory() as temp_dir:
album_art_url = track['album']['images'][0]['url']
async with aiohttp.ClientSession() as session:
async with session.get(album_art_url) as response:
art_path = os.path.join(temp_dir, "cover.jpg")
with open(art_path, "wb") as f:
f.write(await response.read())
await self.inline.bot.set_chat_photo(
chat_id=int("-100"+str(channel.id)),
photo=InputFile(art_path)
)
await self.client.edit_message(
entity=channel.username,
message=stream_channel_data['first_message'],
text="🎧 Now Playing\n"+now_play_text
)
try:
await self.inline.bot.edit_message_reply_markup(
chat_id=int("-100"+str(channel.id)),
message_id=stream_channel_data['first_message'],
reply_markup=keyboard
)
except:
await self._create_stream_messages(channel)
await asyncio.sleep(2.3342)
try:
await self.client.edit_message(
entity=channel,
message=stream_channel_data['display_message'],
text=f"📱 {artist_name}",
link_preview=False
)
except:
pass
await asyncio.sleep(1.3342)
try:
await self.inline.bot.set_chat_title(
chat_id=int("-100"+str(channel.id)),
title=f"🎧 {track_name}"
)
messages = await self.client.get_messages(
entity=channel,
limit=2
)
for message in messages:
await message.delete()
except:
return
@loader.loop(interval=90, autostart=True)
async def loop_bio(self):
if self.db.get(self.name, "bio_change", False):
return
sp = spotipy.Spotify(auth=self.config['auth_token'])
try:
current_playback = sp.current_playback()
if current_playback and current_playback.get('item'):
track = current_playback['item']
track_name = track.get('name', 'Unknown Track')
artist_name = track['artists'][0].get('name', 'Unknown Artist')
bio = self.config['bio_text'].format(track_name=track_name, artist_name=artist_name)
await self._client(UpdateProfileRequest(about=bio[:70]))
except Exception as e:
logger.error(f"Error updating bio: {e}")