#!/usr/bin/env python3 """ Author: Michal Szymanski v2.4.2 Tool implementing real-time tracking of Last.fm users music activity: https://github.com/misiektoja/lastfm_monitor/ Python pip3 requirements: pylast requests python-dateutil spotipy (optional, only for Spotify-related features) python-dotenv (optional) beautifulsoup4 (optional, only for followers/followings tracking) """ VERSION = "2.4.2" # --------------------------- # CONFIGURATION SECTION START # --------------------------- CONFIG_BLOCK = """ # Create your Last.fm API key and shared secret at: # https://www.last.fm/api/account/create # # Or retrieve an existing one from: # https://www.last.fm/api/accounts # # Provide the LASTFM_API_KEY and LASTFM_API_SECRET secrets using one of the following methods: # - Pass it at runtime with -r / --lastfm-api-key and -w / --lastfm-secret # - Set it as an environment variable (e.g. export LASTFM_API_KEY=...; export LASTFM_API_SECRET=...) # - Add it to ".env" file (LASTFM_API_KEY=... and LASTFM_API_SECRET=...) for persistent use # Fallback: # - Hard-code it in the code or config file LASTFM_API_KEY = "your_lastfm_api_key" LASTFM_API_SECRET = "your_lastfm_api_secret" # This Spotify Client Credentials OAuth Flow section is optional and only needed if you want to: # - Get track duration from Spotify (via USE_TRACK_DURATION_FROM_SPOTIFY / -r), which is more accurate than Last.fm # - Use automatic playback functionality (via TRACK_SONGS / -g), which requires Spotify track IDs # # To obtain the credentials: # - Log in to Spotify Developer dashboard: https://developer.spotify.com/dashboard # - Create a new app # - For 'Redirect URL', use: http://127.0.0.1:1234 # - Select 'Web API' as the intended API # - Copy the 'Client ID' and 'Client Secret' # # Provide the SP_CLIENT_ID and SP_CLIENT_SECRET secrets using one of the following methods: # - Pass it at runtime with -z / --spotify-creds (use SP_CLIENT_ID:SP_CLIENT_SECRET format - note the colon separator) # - Set it as an environment variable (e.g. export SP_CLIENT_ID=...; export SP_CLIENT_SECRET=...) # - Add it to ".env" file (SP_CLIENT_ID=... and SP_CLIENT_SECRET=...) for persistent use # Fallback: # - Hard-code it in the code or config file # # The tool automatically refreshes the access token, so it remains valid indefinitely SP_CLIENT_ID = "your_spotify_app_client_id" SP_CLIENT_SECRET = "your_spotify_app_client_secret" # Path to cache file used to store OAuth app access tokens across tool restarts # Set to empty to use in-memory cache only SP_TOKENS_FILE = ".lastfm-monitor-oauth-app.json" # SMTP settings for sending email notifications # If left as-is, no notifications will be sent # # Provide the SMTP_PASSWORD secret using one of the following methods: # - Set it as an environment variable (e.g. export SMTP_PASSWORD=...) # - Add it to ".env" file (SMTP_PASSWORD=...) for persistent use # Fallback: # - Hard-code it in the code or config file SMTP_HOST = "your_smtp_server_ssl" SMTP_PORT = 587 SMTP_USER = "your_smtp_user" SMTP_PASSWORD = "your_smtp_password" SMTP_SSL = True SENDER_EMAIL = "your_sender_email" RECEIVER_EMAIL = "your_receiver_email" # Whether to send an email when user becomes active # Can also be enabled via the -a flag ACTIVE_NOTIFICATION = False # Whether to send an email when user goes inactive # Can also be enabled via the -i flag INACTIVE_NOTIFICATION = False # Whether to send an email when a monitored track/album plays # Can also be enabled via the -t flag TRACK_NOTIFICATION = False # Whether to send an email on every song change # Can also be enabled via the -j flag SONG_NOTIFICATION = False # Whether to send an email when user plays a song on loop # Triggered if the same song is played more than SONG_ON_LOOP_VALUE times # Can also be enabled via the -x flag SONG_ON_LOOP_NOTIFICATION = False # Whether to send an email when new scrobbles arrive while user is offline # Can also be enabled via the -f flag OFFLINE_ENTRIES_NOTIFICATION = False # Whether to send an email on errors # Can also be disabled via the -e flag ERROR_NOTIFICATION = True # How often to check for user activity when the user is considered offline (not playing music); in seconds # Can also be set using the -c flag LASTFM_CHECK_INTERVAL = 10 # 10 seconds # How often to check for user activity when the user is online (currently playing); in seconds # Can also be set using the -k flag LASTFM_ACTIVE_CHECK_INTERVAL = 5 # 5 seconds # Time after which a user is considered inactive, based on the last activity; in seconds # Can also be set using the -o flag LASTFM_INACTIVITY_CHECK = 180 # 3 mins # Whether to auto-play each listened song in your Spotify client # Can also be set using the -g flag TRACK_SONGS = False # Whether to display a real-time progress indicator showing the exact minute and second of the track the user # is currently listening to # Can also be set using the -p flag PROGRESS_INDICATOR = False # Set to True to retrieve track duration from Spotify instead of Last.fm # Recommended, as Last.fm often lacks this info or reports inaccurate values # Only works if SP_CLIENT_ID and SP_CLIENT_SECRET are defined (or provided via -z) # Can also be set with the -r flag USE_TRACK_DURATION_FROM_SPOTIFY = False # Whether to hide if duration came from Last.fm or Spotify # Duration marks are not displayed if the functionality to retrieve track duration from Spotify is disabled # Can also be set using the -q flag DO_NOT_SHOW_DURATION_MARKS = False # Multiplier for detecting short breaks in playback # The pause is detected after: LASTFM_BREAK_CHECK_MULTIPLIER * LASTFM_ACTIVE_CHECK_INTERVAL seconds of inactivity # Can be disabled by setting it to 0 # Can also be set using the -m flag LASTFM_BREAK_CHECK_MULTIPLIER = 4 # How many recent tracks we fetch after start and every time user gets online RECENT_TRACKS_NUMBER = 10 # How many recently listened songs to display in the inactive notification email # Set to 0 to disable the recently listened songs list INACTIVE_EMAIL_RECENT_SONGS_COUNT = 5 # Method used to play the song listened by the tracked user in local Spotify client under macOS # (i.e. when TRACK_SONGS / -g functionality is enabled) # Methods: # "apple-script" (recommended) # "trigger-url" SPOTIFY_MACOS_PLAYING_METHOD = "apple-script" # Method used to play the song listened by the tracked user in local Spotify client under Linux OS # (i.e. when TRACK_SONGS / -g functionality is enabled) # Methods: # "dbus-send" (most common one) # "qdbus" # "trigger-url" SPOTIFY_LINUX_PLAYING_METHOD = "dbus-send" # Method used to play the song listened by the tracked user in local Spotify client under Windows OS # (if TRACK_SONGS / -g functionality is enabled) # Methods: # "start-uri" (recommended) # "spotify-cmd" # "trigger-url" SPOTIFY_WINDOWS_PLAYING_METHOD = "start-uri" # Number of consecutive plays of the same song considered to be on loop SONG_ON_LOOP_VALUE = 3 # Threshold for treating a song as skipped, when track duration is unknown (not available from Last.fm/Spotify); in seconds SKIPPED_SONG_THRESHOLD1 = 35 # considered skipped if played for <= 35 seconds # Threshold for treating a song as skipped, when track duration is known; fraction SKIPPED_SONG_THRESHOLD2 = 0.55 # considered skipped if played for <= 55% of its duration # Thresholds for treating a song as "played longer than track duration": # Either if played for >= 130% of duration (fraction) or 30+ seconds beyond expected length LONGER_SONG_THRESHOLD1 = 1.30 # 130% of track duration LONGER_SONG_THRESHOLD2 = 30 # 30 seconds beyond track duration # Spotify track ID to play when the user goes offline (used with track_songs feature) # Leave empty to simply pause # SP_USER_GOT_OFFLINE_TRACK_ID = "5wCjNjnugSUqGDBrmQhn0e" SP_USER_GOT_OFFLINE_TRACK_ID = "" # Delay before pausing the above track after the user goes offline; in seconds # Set to 0 to keep playing indefinitely until manually paused SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE = 5 # 5 seconds # Enable debug mode for full technical logging (can also be enabled via --debug flag) # Shows every API request and internal state changes DEBUG_MODE = False # How often to print a "liveness check" message to the output; in seconds # Set to 0 to disable LIVENESS_CHECK_INTERVAL = 43200 # 12 hours # URL used to verify internet connectivity at startup CHECK_INTERNET_URL = 'https://ws.audioscrobbler.com/' # Timeout used when checking initial internet connectivity; in seconds CHECK_INTERNET_TIMEOUT = 5 # Threshold for displaying Last.fm 50x errors - it is to suppress sporadic issues with Last.fm API endpoint # Adjust the values according to the LASTFM_CHECK_INTERVAL and LASTFM_ACTIVE_CHECK_INTERVAL timers # If more than 15 Last.fm API related errors in 2 minutes, show an alert ERROR_500_NUMBER_LIMIT = 15 ERROR_500_TIME_LIMIT = 120 # 2 min # Threshold for displaying network errors - it is to suppress sporadic issues with internet connectivity # Adjust the values according to the LASTFM_CHECK_INTERVAL and LASTFM_ACTIVE_CHECK_INTERVAL timers # If more than 15 network related errors in 2 minutes, show an alert ERROR_NETWORK_ISSUES_NUMBER_LIMIT = 15 ERROR_NETWORK_ISSUES_TIME_LIMIT = 120 # 2 min # CSV file to write every scrobble # Can also be set using the -b flag CSV_FILE = "" # Filename with Last.fm tracks/albums to alert on # Can also be set using the -s flag MONITOR_LIST_FILE = "" # Location of the optional dotenv file which can keep secrets # If not specified it will try to auto-search for .env files # To disable auto-search, set this to the literal string "none" # Can also be set using the --env-file flag DOTENV_FILE = "" # Base name for the log file. Output will be saved to lastfm_monitor_.log # Can include a directory path to specify the location, e.g. ~/some_dir/lastfm_monitor LF_LOGFILE = "lastfm_monitor" # Whether to disable logging to lastfm_monitor_.log # Can also be disabled via the -d flag DISABLE_LOGGING = False # Width of horizontal line HORIZONTAL_LINE = 113 # Whether to clear the terminal screen after starting the tool CLEAR_SCREEN = True # Value added/subtracted via signal handlers to adjust inactivity timeout (LASTFM_INACTIVITY_CHECK); in seconds LASTFM_INACTIVITY_CHECK_SIGNAL_VALUE = 30 # 30 seconds # Whether to show Spotify URL in console and emails ENABLE_SPOTIFY_URL = True # Whether to show Last.fm URL in console and emails ENABLE_LASTFM_URL = True # Whether to show Last.fm album URL in console and emails ENABLE_LASTFM_ALBUM_URL = True # Whether to show Apple Music URL in console and emails ENABLE_APPLE_MUSIC_URL = True # Whether to show YouTube Music URL in console and emails ENABLE_YOUTUBE_MUSIC_URL = True # Whether to show Amazon Music URL in console and emails ENABLE_AMAZON_MUSIC_URL = False # Whether to show Deezer URL in console and emails ENABLE_DEEZER_URL = False # Whether to show Tidal URL in console and emails # Note: Tidal requires users to be logged in to their account in the web browser to use the search functionality ENABLE_TIDAL_URL = False # Whether to show Genius lyrics URL in console and emails ENABLE_GENIUS_LYRICS_URL = True # Whether to show AZLyrics URL in console and emails ENABLE_AZLYRICS_URL = False # Whether to show Tekstowo.pl lyrics URL in console and emails ENABLE_TEKSTOWO_URL = False # Whether to show Musixmatch lyrics URL in console and emails # Note: Musixmatch requires users to be logged in to their account in the web browser to use the search functionality ENABLE_MUSIXMATCH_URL = False # Whether to show Lyrics.com lyrics URL in console and emails ENABLE_LYRICS_COM_URL = False # Whether to use Last.fm URL in "Last played:" field in HTML email notifications (default: True) # When True: "Last played:" uses Last.fm URL and the secondary URL field shows Spotify URL # When False: "Last played:" uses Spotify URL and the secondary URL field shows Last.fm URL (old behavior) USE_LASTFM_URL_IN_LAST_PLAYED = True # Whether to track user's followings (friends) changes # Can also be enabled via the --track-followings flag TRACK_FOLLOWINGS = False # Whether to track user's followers changes # Can also be enabled via the --track-followers flag TRACK_FOLLOWERS = False # How often to check for followers/followings changes; in seconds # Can also be set using the --friends-check-interval flag FRIENDS_CHECK_INTERVAL = 900 # 15 minutes # Whether to send an email when followers change # Can also be enabled via the --notify-followers flag FOLLOWERS_NOTIFICATION = False # Whether to send an email when followings change # Can also be enabled via the --notify-followings flag FOLLOWINGS_NOTIFICATION = False # Number of consecutive checks required to confirm a change in followers/followings # to avoid false notifications caused by transient API glitches # Also used as the threshold for suppressing repeated error messages # Can also be set using the --friends-change-counter flag FRIENDS_CHANGE_COUNTER = 3 # Timeout used when confirming transient changes; in seconds # If this is set higher than FRIENDS_CHECK_INTERVAL, it effectively throttles the checks # during the confirmation phase # Can also be set using the --friends-retry-interval flag FRIENDS_RETRY_INTERVAL = 90 """ # ------------------------- # CONFIGURATION SECTION END # ------------------------- # Default dummy values so linters shut up # Do not change values below - modify them in the configuration section or config file instead LASTFM_API_KEY = "" LASTFM_API_SECRET = "" SP_CLIENT_ID = "" SP_CLIENT_SECRET = "" SP_TOKENS_FILE = "" SMTP_HOST = "" SMTP_PORT = 0 SMTP_USER = "" SMTP_PASSWORD = "" SMTP_SSL = False SENDER_EMAIL = "" RECEIVER_EMAIL = "" ACTIVE_NOTIFICATION = False INACTIVE_NOTIFICATION = False TRACK_NOTIFICATION = False SONG_NOTIFICATION = False SONG_ON_LOOP_NOTIFICATION = False OFFLINE_ENTRIES_NOTIFICATION = False ERROR_NOTIFICATION = False LASTFM_CHECK_INTERVAL = 0 LASTFM_ACTIVE_CHECK_INTERVAL = 0 LASTFM_INACTIVITY_CHECK = 0 TRACK_SONGS = False PROGRESS_INDICATOR = False USE_TRACK_DURATION_FROM_SPOTIFY = False DO_NOT_SHOW_DURATION_MARKS = False LASTFM_BREAK_CHECK_MULTIPLIER = 0 RECENT_TRACKS_NUMBER = 0 INACTIVE_EMAIL_RECENT_SONGS_COUNT = 0 SPOTIFY_MACOS_PLAYING_METHOD = "" SPOTIFY_LINUX_PLAYING_METHOD = "" SPOTIFY_WINDOWS_PLAYING_METHOD = "" SONG_ON_LOOP_VALUE = 0 SKIPPED_SONG_THRESHOLD1 = 0 SKIPPED_SONG_THRESHOLD2 = 0 LONGER_SONG_THRESHOLD1 = 0 LONGER_SONG_THRESHOLD2 = 0 SP_USER_GOT_OFFLINE_TRACK_ID = "" SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE = 0 LIVENESS_CHECK_INTERVAL = 0 CHECK_INTERNET_URL = "" CHECK_INTERNET_TIMEOUT = 0 ERROR_500_NUMBER_LIMIT = 0 ERROR_500_TIME_LIMIT = 0 ERROR_NETWORK_ISSUES_NUMBER_LIMIT = 0 ERROR_NETWORK_ISSUES_TIME_LIMIT = 0 CSV_FILE = "" MONITOR_LIST_FILE = "" DOTENV_FILE = "" LF_LOGFILE = "" DISABLE_LOGGING = False HORIZONTAL_LINE = 0 CLEAR_SCREEN = False LASTFM_INACTIVITY_CHECK_SIGNAL_VALUE = 0 ENABLE_GENIUS_LYRICS_URL = False ENABLE_AZLYRICS_URL = False ENABLE_TEKSTOWO_URL = False ENABLE_MUSIXMATCH_URL = False ENABLE_LYRICS_COM_URL = False USE_LASTFM_URL_IN_LAST_PLAYED = False ENABLE_SPOTIFY_URL = False ENABLE_LASTFM_URL = False ENABLE_LASTFM_ALBUM_URL = False ENABLE_APPLE_MUSIC_URL = False ENABLE_YOUTUBE_MUSIC_URL = False ENABLE_AMAZON_MUSIC_URL = False ENABLE_DEEZER_URL = False ENABLE_TIDAL_URL = False TRACK_FOLLOWINGS = False TRACK_FOLLOWERS = False FRIENDS_CHECK_INTERVAL = 0 FOLLOWERS_NOTIFICATION = False FOLLOWINGS_NOTIFICATION = False FRIENDS_CHANGE_COUNTER = 0 FRIENDS_RETRY_INTERVAL = 0 DEBUG_MODE = False LASTFM_USERNAME_GLOBAL = "" exec(CONFIG_BLOCK, globals()) # Default name for the optional config file DEFAULT_CONFIG_FILENAME = "lastfm_monitor.conf" # List of secret keys to load from env/config SECRET_KEYS = ("LASTFM_API_KEY", "LASTFM_API_SECRET", "SP_CLIENT_ID", "SP_CLIENT_SECRET", "SMTP_PASSWORD") # Strings removed from track names for generating proper Genius search URLs re_search_str = r'remaster|extended|original mix|remix|rework|vocal mix|original soundtrack|radio( |-)edit|\(feat\.|( \(.*version\))|( - .*version)' re_replace_str = r'( - (\d*)( )*remaster$)|( - (\d*)( )*remastered( version)*( \d*)*.*$)|( \((\d*)( )*remaster\)$)|( - (\d+) - remaster$)|( - extended$)|( - extended mix$)|( - (.*); extended mix$)|( - extended version$)|( - (.*) remix$)|( - remix$)|( - remixed by .*$)|( - (.*) rework$)|( - rework$)|( - vocal mix$)|( - original mix$)|( - .*original soundtrack$)|( - .*radio( |-)edit$)|( \(feat\. .*\)$)|( \(\d+.*Remaster.*\)$)|( \(.*Version\))|( - .*version)' # Default value for Spotify network-related timeouts in functions; in seconds FUNCTION_TIMEOUT = 5 # 5 seconds # Variables for caching functionality of the Spotify access token to avoid unnecessary refreshing SP_CACHED_ACCESS_TOKEN = None LIVENESS_CHECK_COUNTER = LIVENESS_CHECK_INTERVAL / LASTFM_CHECK_INTERVAL stdout_bck = None csvfieldnames = ['Date', 'Artist', 'Track', 'Album'] CLI_CONFIG_PATH = None # to solve the issue: 'SyntaxError: f-string expression part cannot include a backslash' nl_ch = "\n" import sys if sys.version_info < (3, 9): print("* Error: Python version 3.9 or higher required !") sys.exit(1) import time import string import json import os from datetime import datetime from dateutil import relativedelta import calendar import requests as req import signal import smtplib import ssl from email.header import Header from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import argparse import csv try: import pylast except ModuleNotFoundError: raise SystemExit("Error: Couldn't find the pyLast library !\n\nTo install it, run:\n pip install pylast\n\nOnce installed, re-run this tool. For more help, visit:\nhttps://github.com/pylast/pylast") from urllib.parse import quote_plus, quote import subprocess import platform import re import ipaddress from itertools import tee, islice, chain from html import escape import shutil from pathlib import Path from typing import Tuple import base64 import hashlib import hmac # Logger class to output messages to stdout and log file class Logger(object): def __init__(self, filename): self.terminal = sys.stdout self.logfile = open(filename, "a", buffering=1, encoding="utf-8") def write(self, message): self.terminal.write(message) self.logfile.write(message) self.terminal.flush() self.logfile.flush() def flush(self): pass # Signal handler when user presses Ctrl+C def signal_handler(sig, frame): sys.stdout = stdout_bck print('\n* You pressed Ctrl+C, tool is terminated.') sys.exit(0) # Checks internet connectivity def check_internet(url=CHECK_INTERNET_URL, timeout=CHECK_INTERNET_TIMEOUT): try: pylast_version = getattr(pylast, '__version__', 'unknown') headers = {'User-Agent': f'pylast/{pylast_version}'} _ = req.get(url, timeout=timeout, headers=headers) return True except req.RequestException as e: print(f"* No connectivity, please check your network:\n\n{e}") return False # Clears the terminal screen def clear_screen(enabled=True): if not enabled: return try: if platform.system() == 'Windows': os.system('cls') else: os.system('clear') except Exception: print("* Cannot clear the screen contents") # Converts absolute value of seconds to human readable format def display_time(seconds, granularity=2): intervals = ( ('years', 31556952), # approximation ('months', 2629746), # approximation ('weeks', 604800), # 60 * 60 * 24 * 7 ('days', 86400), # 60 * 60 * 24 ('hours', 3600), # 60 * 60 ('minutes', 60), ('seconds', 1), ) result = [] if seconds > 0: for name, count in intervals: value = seconds // count if value: seconds -= value * count if value == 1: name = name.rstrip('s') result.append(f"{value} {name}") return ', '.join(result[:granularity]) else: return '0 seconds' # Calculates time span between two timestamps, accepts timestamp integers, floats and datetime objects def calculate_timespan(timestamp1, timestamp2, show_weeks=True, show_hours=True, show_minutes=True, show_seconds=True, granularity=3): result = [] intervals = ['years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds'] ts1 = timestamp1 ts2 = timestamp2 if type(timestamp1) is int: dt1 = datetime.fromtimestamp(int(ts1)) elif type(timestamp1) is float: ts1 = int(round(ts1)) dt1 = datetime.fromtimestamp(ts1) elif type(timestamp1) is datetime: dt1 = timestamp1 ts1 = int(round(dt1.timestamp())) else: return "" if type(timestamp2) is int: dt2 = datetime.fromtimestamp(int(ts2)) elif type(timestamp2) is float: ts2 = int(round(ts2)) dt2 = datetime.fromtimestamp(ts2) elif type(timestamp2) is datetime: dt2 = timestamp2 ts2 = int(round(dt2.timestamp())) else: return "" if ts1 >= ts2: ts_diff = ts1 - ts2 else: ts_diff = ts2 - ts1 dt1, dt2 = dt2, dt1 if ts_diff > 0: date_diff = relativedelta.relativedelta(dt1, dt2) years = date_diff.years months = date_diff.months weeks = date_diff.weeks if not show_weeks: weeks = 0 days = date_diff.days if weeks > 0: days = days - (weeks * 7) hours = date_diff.hours if (not show_hours and ts_diff > 86400): hours = 0 minutes = date_diff.minutes if (not show_minutes and ts_diff > 3600): minutes = 0 seconds = date_diff.seconds if (not show_seconds and ts_diff > 60): seconds = 0 date_list = [years, months, weeks, days, hours, minutes, seconds] for index, interval in enumerate(date_list): if interval > 0: name = intervals[index] if interval == 1: name = name.rstrip('s') result.append(f"{interval} {name}") return ', '.join(result[:granularity]) else: return '0 seconds' # Sends email notification # Sends an email notification def send_email(subject, body, body_html, use_ssl, smtp_timeout=15): debug_print(f"Attempting to send email: {subject}") fqdn_re = re.compile(r'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(? 0: LASTFM_INACTIVITY_CHECK = LASTFM_INACTIVITY_CHECK - LASTFM_INACTIVITY_CHECK_SIGNAL_VALUE sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Last.fm timers: [inactivity: {display_time(LASTFM_INACTIVITY_CHECK)}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGHUP allowing to reload secrets from .env def reload_secrets_signal_handler(sig, frame): sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") # disable autoscan if DOTENV_FILE set to none if DOTENV_FILE and DOTENV_FILE.lower() == 'none': env_path = None else: # reload .env if python-dotenv is installed try: from dotenv import load_dotenv, find_dotenv if DOTENV_FILE: env_path = DOTENV_FILE else: env_path = find_dotenv() if env_path: load_dotenv(env_path, override=True) else: print("* No .env file found, skipping env-var reload") except ImportError: env_path = None print("* python-dotenv not installed, skipping env-var reload") if env_path: for secret in SECRET_KEYS: old_val = globals().get(secret) val = os.getenv(secret) if val is not None and val != old_val: globals()[secret] = val print(f"* Reloaded {secret} from {env_path}") print_cur_ts("Timestamp:\t\t\t") # Accesses the previous and next elements of the list def previous_and_next(some_iterable): prevs, items, nexts = tee(some_iterable, 3) prevs = chain([None], prevs) nexts = chain(islice(nexts, 1, None), [None]) return zip(prevs, items, nexts) # Prepares Spotify, Apple & lyrics search URLs for specified track and Last.fm URLs for track and album def get_spotify_apple_genius_search_urls(artist, track, album=None, network=None, track_obj=None): spotify_search_string = quote_plus(f"{artist} {track}") # Clean search string for lyrics services (remove remaster, extended, etc.) lyrics_search_string = f"{artist} {track}" if re.search(re_search_str, lyrics_search_string, re.IGNORECASE): lyrics_search_string = re.sub(re_replace_str, '', lyrics_search_string, flags=re.IGNORECASE) apple_search_string = quote(f"{artist} {track}") spotify_search_url = f"https://open.spotify.com/search/{spotify_search_string}?si=1" apple_search_url = f"https://music.apple.com/pl/search?term={apple_search_string}" genius_search_url = f"https://genius.com/search?q={quote_plus(lyrics_search_string)}" azlyrics_search_url = f"https://www.azlyrics.com/search/?q={quote_plus(lyrics_search_string)}" tekstowo_search_url = f"https://www.tekstowo.pl/szukaj,{quote_plus(lyrics_search_string)}.html" musixmatch_search_url = f"https://www.musixmatch.com/search?query={quote_plus(lyrics_search_string)}" lyrics_com_search_url = f"https://www.lyrics.com/serp.php?st={quote_plus(lyrics_search_string)}&qtype=1" youtube_music_search_url = f"https://music.youtube.com/search?q={spotify_search_string}" amazon_music_search_url = f"https://music.amazon.com/search/{spotify_search_string}" deezer_search_url = f"https://www.deezer.com/search/{spotify_search_string}" tidal_search_url = f"https://tidal.com/search?q={spotify_search_string}" # Get Last.fm URL - use track object if available, otherwise construct manually lastfm_url = "" if track_obj and hasattr(track_obj, 'get_url'): try: lastfm_url = track_obj.get_url() except Exception: # Fallback to manual construction if get_url() fails artist_encoded = quote_plus(str(artist)) track_encoded = quote_plus(str(track)) lastfm_url = f"https://www.last.fm/music/{artist_encoded}/_/{track_encoded}" elif network: # If we have network but no track object, try creating one try: track_obj_temp = pylast.Track(artist, track, network) lastfm_url = track_obj_temp.get_url() except Exception: # Fallback to manual construction artist_encoded = quote_plus(str(artist)) track_encoded = quote_plus(str(track)) lastfm_url = f"https://www.last.fm/music/{artist_encoded}/_/{track_encoded}" else: # No track object or network, construct manually artist_encoded = quote_plus(str(artist)) track_encoded = quote_plus(str(track)) lastfm_url = f"https://www.last.fm/music/{artist_encoded}/_/{track_encoded}" # Get Last.fm album URL if album provided lastfm_album_url = "" if album: if network: try: lastfm_album_url = pylast.Album(artist, album, network).get_url() except Exception: try: # Fallback to manual construction artist_encoded = quote_plus(str(artist)) album_encoded = quote_plus(str(album)) lastfm_album_url = f"https://www.last.fm/music/{artist_encoded}/{album_encoded}" except Exception: lastfm_album_url = "" else: try: artist_encoded = quote_plus(str(artist)) album_encoded = quote_plus(str(album)) lastfm_album_url = f"https://www.last.fm/music/{artist_encoded}/{album_encoded}" except Exception: lastfm_album_url = "" return spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url # Formats lyrics URLs for console output based on configuration def format_lyrics_urls_console(genius_url, azlyrics_url, tekstowo_url, musixmatch_url, lyrics_com_url): lines = [] if ENABLE_GENIUS_LYRICS_URL: lines.append(f"Genius lyrics URL:\t\t{genius_url}") if ENABLE_AZLYRICS_URL: lines.append(f"AZLyrics URL:\t\t\t{azlyrics_url}") if ENABLE_TEKSTOWO_URL: lines.append(f"Tekstowo.pl URL:\t\t{tekstowo_url}") if ENABLE_MUSIXMATCH_URL: lines.append(f"Musixmatch URL:\t\t\t{musixmatch_url}") if ENABLE_LYRICS_COM_URL: lines.append(f"Lyrics.com URL:\t\t\t{lyrics_com_url}") return "\n".join(lines) if lines else "" # Formats lyrics URLs for plain text email body based on configuration def format_lyrics_urls_email_text(genius_url, azlyrics_url, tekstowo_url, musixmatch_url, lyrics_com_url): lines = [] if ENABLE_GENIUS_LYRICS_URL: lines.append(f"Genius lyrics URL: {genius_url}") if ENABLE_AZLYRICS_URL: lines.append(f"AZLyrics URL: {azlyrics_url}") if ENABLE_TEKSTOWO_URL: lines.append(f"Tekstowo.pl URL: {tekstowo_url}") if ENABLE_MUSIXMATCH_URL: lines.append(f"Musixmatch URL: {musixmatch_url}") if ENABLE_LYRICS_COM_URL: lines.append(f"Lyrics.com URL: {lyrics_com_url}") return "\n".join(lines) if lines else "" # Formats lyrics URLs for HTML email body based on configuration def format_lyrics_urls_email_html(genius_url, azlyrics_url, tekstowo_url, musixmatch_url, lyrics_com_url, artist, track): lines = [] escaped_artist = escape(artist) escaped_track = escape(track) if ENABLE_GENIUS_LYRICS_URL: lines.append(f'Genius lyrics URL: {escaped_artist} - {escaped_track}') if ENABLE_AZLYRICS_URL: lines.append(f'AZLyrics URL: {escaped_artist} - {escaped_track}') if ENABLE_TEKSTOWO_URL: lines.append(f'Tekstowo.pl URL: {escaped_artist} - {escaped_track}') if ENABLE_MUSIXMATCH_URL: lines.append(f'Musixmatch URL: {escaped_artist} - {escaped_track}') if ENABLE_LYRICS_COM_URL: lines.append(f'Lyrics.com URL: {escaped_artist} - {escaped_track}') return "
".join(lines) if lines else "" # Formats music service URLs for console output based on configuration # Note: This excludes the primary "Track:" URL which is controlled by USE_LASTFM_URL_IN_LAST_PLAYED def format_music_urls_console(spotify_url, lastfm_url, lastfm_album_url, apple_music_url, youtube_music_url, amazon_music_url, deezer_url, tidal_url): lines = [] if ENABLE_SPOTIFY_URL: lines.append(f"Spotify URL:\t\t\t{spotify_url}") if ENABLE_LASTFM_URL: lines.append(f"Last.fm URL:\t\t\t{lastfm_url}") if ENABLE_LASTFM_ALBUM_URL and lastfm_album_url: lines.append(f"Last.fm album URL:\t\t{lastfm_album_url}") if ENABLE_APPLE_MUSIC_URL: lines.append(f"Apple Music URL:\t\t{apple_music_url}") if ENABLE_YOUTUBE_MUSIC_URL: lines.append(f"YouTube Music URL:\t\t{youtube_music_url}") if ENABLE_AMAZON_MUSIC_URL: lines.append(f"Amazon Music URL:\t\t{amazon_music_url}") if ENABLE_DEEZER_URL: lines.append(f"Deezer URL:\t\t\t{deezer_url}") if ENABLE_TIDAL_URL: lines.append(f"Tidal URL:\t\t\t{tidal_url}") return "\n".join(lines) if lines else "" # Formats music service URLs for plain text email body based on configuration # Note: This excludes the primary "Track:" / "Last played:" URL which is controlled by USE_LASTFM_URL_IN_LAST_PLAYED def format_music_urls_email_text(spotify_url, lastfm_url, lastfm_album_url, apple_music_url, youtube_music_url, amazon_music_url, deezer_url, tidal_url): lines = [] if ENABLE_SPOTIFY_URL: lines.append(f"Spotify URL: {spotify_url}") if ENABLE_LASTFM_URL: lines.append(f"Last.fm URL: {lastfm_url}") if ENABLE_LASTFM_ALBUM_URL and lastfm_album_url: lines.append(f"Last.fm album URL: {lastfm_album_url}") if ENABLE_APPLE_MUSIC_URL: lines.append(f"Apple Music URL: {apple_music_url}") if ENABLE_YOUTUBE_MUSIC_URL: lines.append(f"YouTube Music URL: {youtube_music_url}") if ENABLE_AMAZON_MUSIC_URL: lines.append(f"Amazon Music URL: {amazon_music_url}") if ENABLE_DEEZER_URL: lines.append(f"Deezer URL: {deezer_url}") if ENABLE_TIDAL_URL: lines.append(f"Tidal URL: {tidal_url}") return "\n".join(lines) if lines else "" # Formats music service URLs for HTML email body based on configuration # Note: This excludes the primary "Track:" / "Last played:" URL which is controlled by USE_LASTFM_URL_IN_LAST_PLAYED # Note: Last.fm album URL is not included here as it's part of the Album line in HTML emails # secondary_url and secondary_url_label are the URL and label for the secondary URL field (Spotify or Last.fm) def format_music_urls_email_html(spotify_url, lastfm_url, lastfm_album_url, apple_music_url, youtube_music_url, amazon_music_url, deezer_url, tidal_url, artist, track, secondary_url, secondary_url_label): lines = [] escaped_artist = escape(artist) escaped_track = escape(track) # Secondary URL (Spotify or Last.fm) - only show if enabled if secondary_url_label == "Spotify URL" and ENABLE_SPOTIFY_URL: lines.append(f'{secondary_url_label}: {escaped_artist} - {escaped_track}') elif secondary_url_label == "Last.fm URL" and ENABLE_LASTFM_URL: lines.append(f'{secondary_url_label}: {escaped_artist} - {escaped_track}') if ENABLE_APPLE_MUSIC_URL: lines.append(f'Apple Music URL: {escaped_artist} - {escaped_track}') if ENABLE_YOUTUBE_MUSIC_URL: lines.append(f'YouTube Music URL: {escaped_artist} - {escaped_track}') if ENABLE_AMAZON_MUSIC_URL: lines.append(f'Amazon Music URL: {escaped_artist} - {escaped_track}') if ENABLE_DEEZER_URL: lines.append(f'Deezer URL: {escaped_artist} - {escaped_track}') if ENABLE_TIDAL_URL: lines.append(f'Tidal URL: {escaped_artist} - {escaped_track}') return "
".join(lines) if lines else "" # Returns the list of recently played Last.fm tracks def lastfm_get_recent_tracks(username, network, number): try: recent_tracks = network.get_user(username).get_recent_tracks(limit=number) return recent_tracks except Exception: raise # Returns a set of usernames that the user is following (friends) - scraped from web def lastfm_get_friends(username): from bs4 import BeautifulSoup # type: ignore url = f"https://www.last.fm/user/{quote_plus(username)}/following" pylast_version = getattr(pylast, '__version__', 'unknown') headers = {'User-Agent': f'pylast/{pylast_version} lastfm_monitor'} try: response = req.get(url, headers=headers, timeout=FUNCTION_TIMEOUT * 2) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Check if user has no followings page_text = soup.get_text() if "doesn't follow anyone" in page_text.lower() or "not following anyone" in page_text.lower() or "no followings" in page_text.lower(): return set() followings = set() # Navigation links to exclude (these are page navigation, not actual followings) excluded_paths = { 'overview', 'reports', 'library', 'playlists', 'following', 'followers', 'loved', 'obsessions', 'events', 'neighbours', 'tags', 'shouts', 'charts', 'music', 'events', 'search', 'upgrade', 'pro', 'log', 'sign' } # Look for the main content area that contains followings list followings_container = None container_selectors = [ 'main', '.content', '#content', '.user-list', '.friends-list', '[class*="following"]', '[id*="following"]' ] for container_selector in container_selectors: container = soup.select_one(container_selector) if container: # Check if this container has following-related content container_text = container.get_text().lower() if 'following' in container_text or 'follows' in container_text: followings_container = container break # If we found a container, search within it; otherwise search the whole page search_area = followings_container if followings_container else soup # Look for user profile links - these should be in the format /user/username (not /user/username/something) for link in search_area.find_all('a', href=True): href = link.get('href', '') # Only process links that match /user/username pattern (exactly 2 path segments) if isinstance(href, str) and href.startswith('/user/'): parts = href.split('/') # href should be like /user/username or /user/username?something if len(parts) >= 3 and parts[1] == 'user': user_from_href = parts[2].split('?')[0].split('#')[0] # Exclude navigation items and the user themselves if (user_from_href and user_from_href.lower() not in excluded_paths and user_from_href != username and user_from_href.lower() != username.lower()): # Additional validation: check if the link text looks like a username link_text = link.get_text(strip=True) if link_text and link_text.lower() not in excluded_paths: # Make sure it's not a navigation link by checking parent classes parent_classes = [] parent = link.parent for _ in range(3): # Check up to 3 levels up if parent and hasattr(parent, 'get'): classes = parent.get('class') if classes and isinstance(classes, list): parent_classes.extend([str(c).lower() for c in classes]) parent = parent.parent if hasattr(parent, 'parent') else None # Exclude if it's in navigation-related containers nav_indicators = ['nav', 'menu', 'sidebar', 'header', 'footer', 'tab'] if not any(indicator in ' '.join(parent_classes) for indicator in nav_indicators): followings.add(user_from_href) if not followings: # If we found 0 followings, check if the "no followings" text was present # If not, it might be a scraping error/API glitch page_lower = page_text.lower() empty_indicators = [ "doesn't follow anyone", "not following anyone", "no followings", "anyone yet", "is not following", "isn't following" ] if not any(indicator in page_lower for indicator in empty_indicators): raise RuntimeError("No followings found and no 'empty followings' indicator detected (possible scraping error or transient API issue)") return followings except req.RequestException as e: raise RuntimeError(f"Failed to scrape followings from Last.fm: {e}") except Exception as e: raise RuntimeError(f"Failed to parse followings page: {e}") # Returns a set of usernames that are following the user (scraped from web) def lastfm_get_followers(username): from bs4 import BeautifulSoup # type: ignore url = f"https://www.last.fm/user/{quote_plus(username)}/followers" pylast_version = getattr(pylast, '__version__', 'unknown') headers = {'User-Agent': f'pylast/{pylast_version} lastfm_monitor'} try: response = req.get(url, headers=headers, timeout=FUNCTION_TIMEOUT * 2) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Check if user has no followers page_text = soup.get_text() if "doesn't have any followers" in page_text.lower() or "no followers" in page_text.lower(): return set() followers = set() # Navigation links to exclude (these are page navigation, not actual followers) excluded_paths = { 'overview', 'reports', 'library', 'playlists', 'following', 'followers', 'loved', 'obsessions', 'events', 'neighbours', 'tags', 'shouts', 'charts', 'music', 'events', 'search', 'upgrade', 'pro', 'log', 'sign' } # Look for the main content area that contains followers list # Try to find the followers container first followers_container = None container_selectors = [ 'main', '.content', '#content', '.user-list', '.friends-list', '[class*="followers"]', '[id*="followers"]' ] for container_selector in container_selectors: container = soup.select_one(container_selector) if container: # Check if this container has follower-related content container_text = container.get_text().lower() if 'followers' in container_text or 'following' in container_text: followers_container = container break # If we found a container, search within it; otherwise search the whole page search_area = followers_container if followers_container else soup # Look for user profile links - these should be in the format /user/username (not /user/username/something) for link in search_area.find_all('a', href=True): href = link.get('href', '') # Only process links that match /user/username pattern (exactly 2 path segments) if isinstance(href, str) and href.startswith('/user/'): parts = href.split('/') # href should be like /user/username or /user/username?something if len(parts) >= 3 and parts[1] == 'user': user_from_href = parts[2].split('?')[0].split('#')[0] # Exclude navigation items and the user themselves if (user_from_href and user_from_href.lower() not in excluded_paths and user_from_href != username and user_from_href.lower() != username.lower()): # Additional validation: check if the link text looks like a username # (not like "Overview", "Following", etc. which are navigation) link_text = link.get_text(strip=True) if link_text and link_text.lower() not in excluded_paths: # Make sure it's not a navigation link by checking parent classes parent_classes = [] parent = link.parent for _ in range(3): # Check up to 3 levels up if parent and hasattr(parent, 'get'): classes = parent.get('class') if classes and isinstance(classes, list): parent_classes.extend([str(c).lower() for c in classes]) parent = parent.parent if hasattr(parent, 'parent') else None # Exclude if it's in navigation-related containers nav_indicators = ['nav', 'menu', 'sidebar', 'header', 'footer', 'tab'] if not any(indicator in ' '.join(parent_classes) for indicator in nav_indicators): followers.add(user_from_href) if not followers: # If we found 0 followers, check if the "no followers" text was present # If not, it might be a scraping error/API glitch page_lower = page_text.lower() empty_indicators = [ "doesn't have any followers", "no followers", "any followers yet", "no one is following", "no followers found" ] if not any(indicator in page_lower for indicator in empty_indicators): raise RuntimeError("No followers found and no 'empty followers' indicator detected (possible scraping error or transient API issue)") return followers except req.RequestException as e: raise RuntimeError(f"Failed to scrape followers from Last.fm: {e}") except Exception as e: raise RuntimeError(f"Failed to parse followers page: {e}") # Loads previous friends/followers state from JSON file def load_friends_state(username, friends_type): filename = f"lastfm_{username}_{friends_type}.json" if os.path.isfile(filename): try: with open(filename, 'r', encoding="utf-8") as f: data = json.load(f) if isinstance(data, list): return set(data) elif isinstance(data, dict) and 'users' in data: return set(data['users']) else: return set() except Exception as e: print(f"* Warning: Cannot load {friends_type} state from '{filename}': {e}") return set() return set() # Saves current friends/followers state to JSON file def save_friends_state(username, friends_type, users_set): filename = f"lastfm_{username}_{friends_type}.json" try: data = { 'users': sorted(list(users_set)), 'count': len(users_set), 'last_updated': int(time.time()) } with open(filename, 'w', encoding="utf-8") as f: json.dump(data, f, indent=2) except Exception as e: print(f"* Warning: Cannot save {friends_type} state to '{filename}': {e}") # Checks for changes in friends/followers and returns change information def check_friends_changes(username, track_followings, track_followers, save_state=True, raise_on_error=False): changes = {} if track_followings: try: previous_friends = load_friends_state(username, 'followings') current_friends = lastfm_get_friends(username) added_friends = current_friends - previous_friends removed_friends = previous_friends - current_friends if added_friends or removed_friends: changes['followings'] = { 'added': sorted(list(added_friends)), 'removed': sorted(list(removed_friends)), 'current_count': len(current_friends), 'previous_count': len(previous_friends) } if save_state: save_friends_state(username, 'followings', current_friends) else: # Still save to update timestamp even if no changes if save_state: save_friends_state(username, 'followings', current_friends) except Exception as e: if raise_on_error: raise e if track_followers: try: previous_followers = load_friends_state(username, 'followers') current_followers = lastfm_get_followers(username) added_followers = current_followers - previous_followers removed_followers = previous_followers - current_followers if added_followers or removed_followers: changes['followers'] = { 'added': sorted(list(added_followers)), 'removed': sorted(list(removed_followers)), 'current_count': len(current_followers), 'previous_count': len(previous_followers) } if save_state: save_friends_state(username, 'followers', current_followers) else: # Still save to update timestamp even if no changes if save_state: save_friends_state(username, 'followers', current_followers) except Exception as e: if raise_on_error: raise e return changes # Sends notification about friends/followers changes def notify_friends_changes(username, changes, skip_initial_line=False): if not changes: return current_time = int(time.time()) check_interval_str = display_time(FRIENDS_CHECK_INTERVAL) if FRIENDS_CHECK_INTERVAL > 0 else "N/A" check_range = get_range_of_dates_from_tss(current_time - FRIENDS_CHECK_INTERVAL, current_time, short=True) if FRIENDS_CHECK_INTERVAL > 0 else "" # Handle followings changes if 'followings' in changes: f_changes = changes['followings'] added = f_changes['added'] removed = f_changes['removed'] current_count = f_changes['current_count'] previous_count = f_changes['previous_count'] change_count = len(added) - len(removed) if not skip_initial_line: print("─" * HORIZONTAL_LINE) change_str = f"{change_count:+d}" if change_count != 0 else "0" print(f"* Followings number changed by user {username} from {previous_count} to {current_count} ({change_str})") if added: print(f"\nAdded followings:") print() for user in added: user_url = f"https://www.last.fm/user/{quote_plus(user)}" print(f"- {user} [ {user_url} ]") if removed: if not added: print() elif added: print() print(f"Removed followings:") print() for user in removed: user_url = f"https://www.last.fm/user/{quote_plus(user)}" print(f"- {user} [ {user_url} ]") if FOLLOWINGS_NOTIFICATION: change_str = f"{change_count:+d}" if change_count != 0 else "0" subject = f"Last.fm user {username} followings number has changed! ({change_str}, {previous_count} -> {current_count})" body_parts = [] body_parts.append(f"Followings number changed by user {username} from {previous_count} to {current_count} ({change_str})") body_parts.append("") if added: body_parts.append("Added followings:") body_parts.append("") for user in added: body_parts.append(f"- {user}") if removed: if added: body_parts.append("") body_parts.append("Removed followings:") body_parts.append("") for user in removed: body_parts.append(f"- {user}") body_parts.append("") if check_range: body_parts.append(f"Check interval: {check_interval_str} ({check_range})") body_parts.append(f"Timestamp: {get_cur_ts('')}") html_parts = [] html_parts.append(f"Followings number changed by user {escape(username)} from {previous_count} to {current_count} ({change_str})") html_parts.append("

") if added: html_parts.append("Added followings:") html_parts.append("

") for user in added: user_url = f"https://www.last.fm/user/{quote_plus(user)}" html_parts.append(f'- {escape(user)}') html_parts.append("
") if removed: if added: html_parts.append("
") html_parts.append("Removed followings:") html_parts.append("

") for user in removed: user_url = f"https://www.last.fm/user/{quote_plus(user)}" html_parts.append(f'- {escape(user)}') html_parts.append("
") html_parts.append("
") if check_range: html_parts.append(f"Check interval: {check_interval_str} ({check_range})") html_parts.append(f"
Timestamp: {get_cur_ts('')}") body = "\n".join(body_parts) body_html = f"{''.join(html_parts)}" print(f"\nSending email notification to {RECEIVER_EMAIL}") send_email(subject, body, body_html, SMTP_SSL) if check_range: print(f"\nCheck interval:\t\t\t{check_interval_str} ({check_range})") print_cur_ts("Timestamp:\t\t\t") # Handle followers changes if 'followers' in changes: f_changes = changes['followers'] added = f_changes['added'] removed = f_changes['removed'] current_count = f_changes['current_count'] previous_count = f_changes['previous_count'] change_count = len(added) - len(removed) if not skip_initial_line: print("─" * HORIZONTAL_LINE) change_str = f"{change_count:+d}" if change_count != 0 else "0" print(f"* Followers number changed for user {username} from {previous_count} to {current_count} ({change_str})") if added: print(f"\nAdded followers:") print() for user in added: user_url = f"https://www.last.fm/user/{quote_plus(user)}" print(f"- {user} [ {user_url} ]") if removed: if not added: print() elif added: print() print(f"Removed followers:") print() for user in removed: user_url = f"https://www.last.fm/user/{quote_plus(user)}" print(f"- {user} [ {user_url} ]") if FOLLOWERS_NOTIFICATION: change_str = f"{change_count:+d}" if change_count != 0 else "0" subject = f"Last.fm user {username} followers number has changed! ({change_str}, {previous_count} -> {current_count})" body_parts = [] body_parts.append(f"Followers number changed for user {username} from {previous_count} to {current_count} ({change_str})") body_parts.append("") if added: body_parts.append("Added followers:") body_parts.append("") for user in added: body_parts.append(f"- {user}") if removed: if added: body_parts.append("") body_parts.append("Removed followers:") body_parts.append("") for user in removed: body_parts.append(f"- {user}") body_parts.append("") if check_range: body_parts.append(f"Check interval: {check_interval_str} ({check_range})") body_parts.append(f"Timestamp: {get_cur_ts('')}") html_parts = [] html_parts.append(f"Followers number changed for user {escape(username)} from {previous_count} to {current_count} ({change_str})") html_parts.append("

") if added: html_parts.append("Added followers:") html_parts.append("

") for user in added: user_url = f"https://www.last.fm/user/{quote_plus(user)}" html_parts.append(f'- {escape(user)}') html_parts.append("
") if removed: if added: html_parts.append("
") html_parts.append("Removed followers:") html_parts.append("

") for user in removed: user_url = f"https://www.last.fm/user/{quote_plus(user)}" html_parts.append(f'- {escape(user)}') html_parts.append("
") html_parts.append("
") if check_range: html_parts.append(f"Check interval: {check_interval_str} ({check_range})") html_parts.append(f"
Timestamp: {get_cur_ts('')}") body = "\n".join(body_parts) body_html = f"{''.join(html_parts)}" print(f"\nSending email notification to {RECEIVER_EMAIL}") send_email(subject, body, body_html, SMTP_SSL) if check_range: print(f"\nCheck interval:\t\t\t{check_interval_str} ({check_range})") print_cur_ts("Timestamp:\t\t\t") # Displays the list of recently played Last.fm tracks def lastfm_list_tracks(username, user, network, number, csv_file_name): list_operation = "* Listing & saving" if csv_file_name else "* Listing" print(f"{list_operation} {number} tracks recently listened by {username} ...\n") try: new_track = user.get_now_playing() recent_tracks = lastfm_get_recent_tracks(username, network, number) except Exception as e: print(f"* Error: Cannot display recent tracks for the user: {e}") sys.exit(1) try: if csv_file_name: init_csv_file(csv_file_name) except Exception as e: print(f"* Error: {e}") # Helper function to shorten strings in the middle def _shorten_middle(s, max_len, ellipsis="..."): if s is None: return "" s = str(s) if len(s) <= max_len: return s keep = max_len - len(ellipsis) if keep <= 0: return ellipsis[:max_len] left = keep // 2 right = keep - left return f"{s[:left]}{ellipsis}{s[-right:]}" # Collect track data and identify duplicates track_entries = [] last_played = 0 p = 0 duplicate_entries = False for previous, t, nxt in previous_and_next(reversed(recent_tracks)): i = len(track_entries) + 1 if i == len(recent_tracks): last_played = int(t.timestamp) artist = str(t.track.artist) if t.track.artist else "" title = str(t.track.title) if t.track.title else "" album = str(t.album) if t.album else "" timestamp = int(t.timestamp) date_str = datetime.fromtimestamp(timestamp).strftime("%d %b %Y, %H:%M:%S") day_str = calendar.day_abbr[datetime.fromtimestamp(timestamp).weekday()] is_duplicate = False if previous and previous.timestamp == t.timestamp: p += 1 duplicate_entries = True is_duplicate = True track_entries.append({ 'num': i, 'artist': artist, 'title': title, 'album': album, 'date': date_str, 'day': day_str, 'is_duplicate': is_duplicate }) try: if csv_file_name: write_csv_entry(csv_file_name, datetime.fromtimestamp(timestamp), artist, title, album) except Exception as e: print(f"* Error: {e}") # Calculate column widths based on terminal size try: term_width = shutil.get_terminal_size(fallback=(100, 24)).columns except Exception: term_width = 100 w_num = 4 w_day = 4 w_date = 24 # Find the maximum lengths needed for artist, title, and album max_artist_len = 0 max_title_len = 0 max_album_len = 0 for entry in track_entries: artist_len = len(str(entry['artist'])) if entry['artist'] else 0 title_len = len(str(entry['title'])) if entry['title'] else 0 album_len = len(str(entry['album'])) if entry['album'] else 0 if artist_len > max_artist_len: max_artist_len = artist_len if title_len > max_title_len: max_title_len = title_len if album_len > max_album_len: max_album_len = album_len # Calculate spacing and fixed widths for table width calculation # Format: "# Day Date/Time Artist Title Album" # Total spacing: 2 + 2 + 2 + 2 + 2 = 10 spaces spacing_between_cols = 2 total_spacing = 5 * spacing_between_cols # 5 gaps between 6 columns fixed_cols_width = w_num + w_day + w_date # Calculate available width for variable columns (artist, title, album) available_width = term_width - fixed_cols_width - total_spacing # Allocate space: prioritize artist and title, but always ensure album is visible # Ensure minimum widths w_artist_min = 15 w_title_min = 15 w_album_min = 20 # Album must always be visible with at least this width # Calculate ideal widths (what we'd like if we had unlimited space) ideal_artist = max(w_artist_min, max_artist_len) ideal_title = max(w_title_min, max_title_len) ideal_album = max(w_album_min, max_album_len) # Strategy: Always reserve minimum for album, then prioritize artist and title # First, ensure we have enough space for minimums min_total_needed = w_artist_min + w_title_min + w_album_min if available_width < min_total_needed: # Very narrow terminal: scale everything proportionally but keep minimums scale = available_width / min_total_needed w_artist = max(10, int(w_artist_min * scale)) # Absolute minimum 10 w_title = max(10, int(w_title_min * scale)) w_album = max(10, int(w_album_min * scale)) # Distribute any remainder remaining = available_width - (w_artist + w_title + w_album) if remaining > 0: # Give remainder to title (highest priority after artist) w_title += remaining else: # We have at least minimum space - allocate intelligently # Always reserve minimum for album first space_for_artist_title = available_width - w_album_min # Calculate ideal needs for artist and title if max_artist_len < w_artist_min: # Artist is shorter than minimum - use actual length, give extra to title ideal_artist_actual = max_artist_len extra_for_title = w_artist_min - max_artist_len ideal_title_actual = max(w_title_min, max_title_len + extra_for_title) else: ideal_artist_actual = ideal_artist ideal_title_actual = ideal_title ideal_artist_title_needed = ideal_artist_actual + ideal_title_actual if space_for_artist_title >= ideal_artist_title_needed: # Plenty of space: give artist and title their ideal lengths w_artist = ideal_artist_actual w_title = ideal_title_actual # Album gets what's left (at least minimum, up to ideal) remaining_for_album = available_width - (w_artist + w_title) w_album = min(ideal_album, remaining_for_album) elif space_for_artist_title >= w_artist_min + w_title_min: # Can fit minimums, but need to scale artist/title proportionally if max_artist_len + max_title_len > 0: # Scale proportionally based on their ideal lengths total_ideal = ideal_artist_actual + ideal_title_actual w_artist = int(ideal_artist_actual * space_for_artist_title / total_ideal) # Ensure minimums if max_artist_len < w_artist_min: w_artist = min(w_artist, max_artist_len) else: w_artist = max(w_artist, w_artist_min) w_title = max(w_title_min, space_for_artist_title - w_artist) # Use all available space if w_artist + w_title < space_for_artist_title: w_title = space_for_artist_title - w_artist else: w_artist = w_artist_min if max_artist_len >= w_artist_min else max_artist_len w_title = w_title_min w_album = w_album_min else: # Very constrained: use minimums for all w_artist = w_artist_min if max_artist_len >= w_artist_min else max_artist_len w_title = w_title_min w_album = w_album_min # Final verification: ensure total width doesn't exceed terminal width total_row_width = w_num + w_day + w_date + w_artist + w_title + w_album + total_spacing if total_row_width > term_width: # We need to reduce, but album must stay at minimum excess = total_row_width - term_width # Try to reduce album first, but not below minimum if w_album > w_album_min: reduction = min(excess, w_album - w_album_min) w_album -= reduction excess -= reduction # If still too wide, reduce artist and title proportionally if excess > 0: total_row_width = w_num + w_day + w_date + w_artist + w_title + w_album + total_spacing if total_row_width > term_width: excess = total_row_width - term_width current_artist_title = w_artist + w_title if current_artist_title > excess: # Scale proportionally, but ensure album stays at minimum scale = (current_artist_title - excess) / current_artist_title w_artist = max(10, int(w_artist * scale)) # Absolute minimum 10 w_title = max(10, int(w_title * scale)) # Ensure album is at minimum w_album = w_album_min # Print table header if track_entries: print() hdr = ( f"{'#'.ljust(w_num)} " f"{'Day'.ljust(w_day)} " f"{'Date/Time'.ljust(w_date)} " f"{'Artist'.ljust(w_artist)} " f"{'Title'.ljust(w_title)} " f"{'Album'.ljust(w_album)}" ) sep = ( f"{'-' * w_num} " f"{'-' * w_day} " f"{'-' * w_date} " f"{'-' * w_artist} " f"{'-' * w_title} " f"{'-' * w_album}" ) print(hdr) print(sep) # Print table rows for entry in track_entries: # For duplicates, reserve space for [DUP] prefix dup_prefix_len = 6 # "[DUP] " if entry['is_duplicate']: artist_fmt = _shorten_middle(entry['artist'], w_artist - dup_prefix_len) title_fmt = _shorten_middle(entry['title'], w_title - dup_prefix_len) artist_fmt = f"[DUP] {artist_fmt}" title_fmt = f"[DUP] {title_fmt}" else: artist_fmt = _shorten_middle(entry['artist'], w_artist) title_fmt = _shorten_middle(entry['title'], w_title) album_fmt = _shorten_middle(entry['album'], w_album) row = ( f"{str(entry['num']).ljust(w_num)} " f"{entry['day'].ljust(w_day)} " f"{entry['date'].ljust(w_date)} " f"{artist_fmt.ljust(w_artist)} " f"{title_fmt.ljust(w_title)} " f"{album_fmt.ljust(w_album)}" ) print(row) # Use the calculated table width for the horizontal line print("─" * total_row_width) if last_played > 0 and not new_track: print(f"*** User played last time {calculate_timespan(int(time.time()), last_played, show_seconds=True)} ago! ({get_date_from_ts(last_played)})") if duplicate_entries: print(f"*** Duplicate entries ({p}) found, possible PRIVATE MODE") if new_track: artist = str(new_track.artist) track = str(new_track.title) album = str(new_track.info.get('album', '')) if new_track.info.get('album') else "" print("*** User is currently ACTIVE !") print(f"\nTrack:\t\t{artist} - {track}") print(f"Album:\t\t{album}") # Sends a lightweight request to check token validity since Spotipy deprecates as_dict=True and there is no # get_cached_token() method implemented yet for Client Credentials OAuth Flow def check_token_validity(token): url = "https://api.spotify.com/v1/browse/categories?limit=1&fields=categories.items(id)" pylast_version = getattr(pylast, '__version__', 'unknown') headers = {"Authorization": f"Bearer {token}", "User-Agent": f"pylast/{pylast_version}"} try: return req.get(url, headers=headers, timeout=FUNCTION_TIMEOUT).status_code == 200 except Exception: return False # Gets Spotify access token based on provided sp_client_id & sp_client_secret values (Client Credentials OAuth Flow) def spotify_get_access_token(sp_client_id, sp_client_secret): global SP_CACHED_ACCESS_TOKEN try: from spotipy.oauth2 import SpotifyClientCredentials from spotipy.cache_handler import CacheFileHandler, MemoryCacheHandler except ImportError: print("* Warning: the 'spotipy' package is required for Spotify-related features, install it with `pip install spotipy`") return None if SP_CACHED_ACCESS_TOKEN and check_token_validity(SP_CACHED_ACCESS_TOKEN): debug_print("Using cached Spotify access token") return SP_CACHED_ACCESS_TOKEN if SP_TOKENS_FILE: cache_handler = CacheFileHandler(cache_path=SP_TOKENS_FILE) else: cache_handler = MemoryCacheHandler() auth_manager = SpotifyClientCredentials(client_id=sp_client_id, client_secret=sp_client_secret, cache_handler=cache_handler) SP_CACHED_ACCESS_TOKEN = auth_manager.get_access_token(as_dict=False) debug_print("Successfully obtained new Spotify access token") return SP_CACHED_ACCESS_TOKEN # Converts Spotify URI (e.g. spotify:user:username) to URL (e.g. https://open.spotify.com/user/username) def spotify_convert_uri_to_url(uri): # add si parameter so link opens in native Spotify app after clicking si = "?si=1" # si="" url = "" if "spotify:user:" in uri: s_id = uri.split(':', 2)[2] url = f"https://open.spotify.com/user/{s_id}{si}" elif "spotify:artist:" in uri: s_id = uri.split(':', 2)[2] url = f"https://open.spotify.com/artist/{s_id}{si}" elif "spotify:track:" in uri: s_id = uri.split(':', 2)[2] url = f"https://open.spotify.com/track/{s_id}{si}" elif "spotify:album:" in uri: s_id = uri.split(':', 2)[2] url = f"https://open.spotify.com/album/{s_id}{si}" elif "spotify:playlist:" in uri: s_id = uri.split(':', 2)[2] url = f"https://open.spotify.com/playlist/{s_id}{si}" return url # Processes track items returned by Spotify search Web API def spotify_search_process_track_items(track_items, original_artist, original_track, cleaned_track=None, original_album=None): sp_track_uri_id = None sp_track_duration = 0 best_item = None best_score = -1 for item in track_items: item_name = str(item.get("name")) item_artists_list = [a.get("name") for a in item.get("artists", [])] item_artists_str = ", ".join(item_artists_list) item_album_name = item.get("album", {}).get("name", "") item_duration = int(item.get("duration_ms", 0) / 1000) debug_print(f" Found item: {item_artists_str} - {item_name} [{item_album_name}] ({item_duration}s)") # Artist match check artist_match = any(original_artist.lower() in a.lower() for a in item_artists_list) if not artist_match: debug_print(" Skipping item (artist mismatch)") continue score = 0 if item_name.lower() == original_track.lower(): score = 100 # Perfect match with original name elif cleaned_track and item_name.lower() == cleaned_track.lower(): score = 80 # Match with cleaned name elif original_track.lower() in item_name.lower() or item_name.lower() in original_track.lower(): score = 50 # Partial match # Album match bonus (+20 points) if original_album and item_album_name and item_album_name.lower() == original_album.lower(): score += 20 debug_print(f" Album match! (+20 bonus)") if score > best_score: best_score = score best_item = item debug_print(f" => New best match! (score={score})") else: debug_print(f" => Match not better than current best (score={score})") if best_item and best_score > 0: sp_track_uri_id = best_item.get("id") sp_track_duration = int(best_item.get("duration_ms") / 1000) return sp_track_uri_id, sp_track_duration # Returns Spotify track ID & duration for specific artist, track and optionally album def spotify_search_song_trackid_duration(access_token, artist, track, album=""): artist, track = map(str, (artist, track)) album = str(album) if album else "" re_chars_to_remove = r'([\"\'])' artist_sanitized = re.sub(re_chars_to_remove, '', artist, flags=re.IGNORECASE) track_sanitized = re.sub(re_chars_to_remove, '', track, flags=re.IGNORECASE) album_sanitized = re.sub(re_chars_to_remove, '', album, flags=re.IGNORECASE) debug_print(f"Checking Spotify for track duration. Strategy: URL_SPECIFIC_FULL -> URL_SPECIFIC_FIELD -> URL_SPECIFIC_PHRASE -> URL_CLEANED_FIELD -> URL_BROAD") url_specific_full = f'https://api.spotify.com/v1/search?q={quote(f"artist:\"{artist_sanitized}\" track:\"{track_sanitized}\" album:\"{album_sanitized}\"")}&type=track&limit=5' url_specific_field = f'https://api.spotify.com/v1/search?q={quote(f"artist:\"{artist_sanitized}\" track:\"{track_sanitized}\"")}&type=track&limit=5' url_specific_phrase = f'https://api.spotify.com/v1/search?q={quote(f"\"{artist_sanitized}\" \"{track_sanitized}\"")}&type=track&limit=5' debug_print(f"Spotify search URL_SPECIFIC_FULL: {url_specific_full}") debug_print(f"Spotify search URL_SPECIFIC_FIELD: {url_specific_field}") debug_print(f"Spotify search URL_SPECIFIC_PHRASE: {url_specific_phrase}") pylast_version = getattr(pylast, '__version__', 'unknown') # Using a browser-like User-Agent to avoid potential API filtering/limitations user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36" headers = {"Authorization": "Bearer " + access_token, "User-Agent": user_agent} sp_track_uri_id = None sp_track_duration = 0 if album: try: response = req.get(url_specific_full, headers=headers, timeout=FUNCTION_TIMEOUT) response.raise_for_status() json_response = response.json() if json_response.get("tracks"): total = json_response["tracks"].get("total", 0) debug_print(f"URL_SPECIFIC_FULL found {total} tracks") if total > 0: sp_track_uri_id, sp_track_duration = spotify_search_process_track_items(json_response["tracks"]["items"], artist, track, original_album=album) if sp_track_uri_id: debug_print(f"Match found via URL_SPECIFIC_FULL") except Exception: pass if not sp_track_uri_id: try: response = req.get(url_specific_field, headers=headers, timeout=FUNCTION_TIMEOUT) response.raise_for_status() json_response = response.json() if json_response.get("tracks"): total = json_response["tracks"].get("total", 0) debug_print(f"URL_SPECIFIC_FIELD found {total} tracks") if total > 0: sp_track_uri_id, sp_track_duration = spotify_search_process_track_items(json_response["tracks"]["items"], artist, track, original_album=album) if sp_track_uri_id: debug_print(f"Match found via URL_SPECIFIC_FIELD") except Exception: pass if not sp_track_uri_id: try: response = req.get(url_specific_phrase, headers=headers, timeout=FUNCTION_TIMEOUT) response.raise_for_status() json_response = response.json() if json_response.get("tracks"): total = json_response["tracks"].get("total", 0) debug_print(f"URL_SPECIFIC_PHRASE found {total} tracks") if total > 0: sp_track_uri_id, sp_track_duration = spotify_search_process_track_items(json_response["tracks"]["items"], artist, track, original_album=album) if sp_track_uri_id: debug_print(f"Match found via URL_SPECIFIC_PHRASE") except Exception: pass # If still not found, try a broader search by cleaning the track name track_cleaned = "" if not sp_track_uri_id and re.search(re_search_str, track, re.IGNORECASE): track_cleaned = re.sub(re_replace_str, '', track, flags=re.IGNORECASE).strip() # Sanitize track_cleaned to remove quotes that might break the search query track_cleaned = re.sub(re_chars_to_remove, '', track_cleaned, flags=re.IGNORECASE) if track_cleaned and track_cleaned.lower() != track.lower(): url_cleaned_field = f'https://api.spotify.com/v1/search?q={quote(f"artist:\"{artist_sanitized}\" track:\"{track_cleaned}\"")}&type=track&limit=5' debug_print(f"Spotify search URL_CLEANED_FIELD (fallback): {url_cleaned_field}") try: response = req.get(url_cleaned_field, headers=headers, timeout=FUNCTION_TIMEOUT) response.raise_for_status() json_response = response.json() if json_response.get("tracks"): total = json_response["tracks"].get("total", 0) debug_print(f"URL_CLEANED_FIELD found {total} tracks") if total > 0: sp_track_uri_id, sp_track_duration = spotify_search_process_track_items(json_response["tracks"]["items"], artist, track, cleaned_track=track_cleaned, original_album=album) if sp_track_uri_id: debug_print(f"Match found via URL_CLEANED_FIELD") except Exception: pass # Final fallback: broad search without field qualifiers if not sp_track_uri_id: search_query = f"\"{artist_sanitized}\" \"{track_cleaned if track_cleaned else track_sanitized}\"" url_broad = f'https://api.spotify.com/v1/search?q={quote(search_query)}&type=track&limit=5' debug_print(f"Spotify search URL_BROAD (fallback): {url_broad}") try: response = req.get(url_broad, headers=headers, timeout=FUNCTION_TIMEOUT) response.raise_for_status() json_response = response.json() if json_response.get("tracks"): total = json_response["tracks"].get("total", 0) debug_print(f"URL_BROAD found {total} tracks") if total > 0: sp_track_uri_id, sp_track_duration = spotify_search_process_track_items(json_response["tracks"]["items"], artist, track, cleaned_track=track_cleaned if track_cleaned else None, original_album=album) if sp_track_uri_id: debug_print(f"Match found via URL_BROAD") except Exception: pass return sp_track_uri_id, sp_track_duration def spotify_macos_play_song(sp_track_uri_id, method=SPOTIFY_MACOS_PLAYING_METHOD): if method == "apple-script": # apple-script script = f'tell app "Spotify" to play track "spotify:track:{sp_track_uri_id}"' proc = subprocess.Popen(['osascript', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) stdout, stderr = proc.communicate(script) else: # trigger-url - just trigger track URL in the client subprocess.call(('open', spotify_convert_uri_to_url(f"spotify:track:{sp_track_uri_id}"))) def spotify_macos_play_pause(action, method=SPOTIFY_MACOS_PLAYING_METHOD): if method == "apple-script": # apple-script if str(action).lower() == "pause": script = 'tell app "Spotify" to pause' proc = subprocess.Popen(['osascript', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) stdout, stderr = proc.communicate(script) elif str(action).lower() == "play": script = 'tell app "Spotify" to play' proc = subprocess.Popen(['osascript', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) stdout, stderr = proc.communicate(script) def spotify_linux_play_song(sp_track_uri_id, method=SPOTIFY_LINUX_PLAYING_METHOD): if method == "dbus-send": # dbus-send subprocess.call((f"dbus-send --type=method_call --dest=org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.OpenUri string:'spotify:track:{sp_track_uri_id}'"), shell=True) elif method == "qdbus": # qdbus subprocess.call((f"qdbus org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.OpenUri spotify:track:{sp_track_uri_id}"), shell=True) else: # trigger-url - just trigger track URL in the client subprocess.call(('xdg-open', spotify_convert_uri_to_url(f"spotify:track:{sp_track_uri_id}")), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) def spotify_linux_play_pause(action, method=SPOTIFY_LINUX_PLAYING_METHOD): if method == "dbus-send": # dbus-send if str(action).lower() == "pause": subprocess.call((f"dbus-send --type=method_call --dest=org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.Pause"), shell=True) elif str(action).lower() == "play": subprocess.call((f"dbus-send --type=method_call --dest=org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.Play"), shell=True) elif method == "qdbus": # qdbus if str(action).lower() == "pause": subprocess.call((f"qdbus org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.Pause"), shell=True) elif str(action).lower() == "play": subprocess.call((f"qdbus org.mpris.MediaPlayer2.spotify /org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.Play"), shell=True) def spotify_win_play_song(sp_track_uri_id, method=SPOTIFY_WINDOWS_PLAYING_METHOD): WIN_SPOTIFY_APP_PATH = r'%APPDATA%\Spotify\Spotify.exe' if method == "start-uri": # start-uri subprocess.call((f"start spotify:track:{sp_track_uri_id}"), shell=True) elif method == "spotify-cmd": # spotify-cmd subprocess.call((f"{WIN_SPOTIFY_APP_PATH} --uri=spotify:track:{sp_track_uri_id}"), shell=True) else: # trigger-url - just trigger track URL in the client os.startfile(spotify_convert_uri_to_url(f"spotify:track:{sp_track_uri_id}")) # Finds an optional config file def find_config_file(cli_path=None): """ Search for an optional config file in: 1) CLI-provided path (must exist if given) 2) ./{DEFAULT_CONFIG_FILENAME} 3) ~/.{DEFAULT_CONFIG_FILENAME} 4) script-directory/{DEFAULT_CONFIG_FILENAME} """ if cli_path: p = Path(os.path.expanduser(cli_path)) return str(p) if p.is_file() else None candidates = [ Path.cwd() / DEFAULT_CONFIG_FILENAME, Path.home() / f".{DEFAULT_CONFIG_FILENAME}", Path(__file__).parent / DEFAULT_CONFIG_FILENAME, ] for p in candidates: if p.is_file(): return str(p) return None # Resolves an executable path by checking if it's a valid file or searching in $PATH def resolve_executable(path): if os.path.isfile(path) and os.access(path, os.X_OK): return path found = shutil.which(path) if found: return found raise FileNotFoundError(f"Could not find executable '{path}'") def get_track_info(artist, track, album, network): sp_track_uri_id = None sp_track_duration = 0 track_duration = 0 duration_mark = "" debug_print(f"get_track_info(artist='{artist}', track='{track}', album='{album}')") if (USE_TRACK_DURATION_FROM_SPOTIFY or TRACK_SONGS) and SP_CLIENT_ID and SP_CLIENT_SECRET and SP_CLIENT_ID != "your_spotify_app_client_id" and SP_CLIENT_SECRET != "your_spotify_app_client_secret": try: accessToken = spotify_get_access_token(SP_CLIENT_ID, SP_CLIENT_SECRET) except Exception as e: debug_print(f"* spotify_get_access_token(): {e}") accessToken = None if accessToken: sp_track_uri_id, sp_track_duration = spotify_search_song_trackid_duration(accessToken, artist, track, album) debug_print(f"Spotify search result: id='{sp_track_uri_id}', duration={sp_track_duration}s") if not USE_TRACK_DURATION_FROM_SPOTIFY: sp_track_duration = 0 if sp_track_duration > 0: track_duration = sp_track_duration if not DO_NOT_SHOW_DURATION_MARKS: duration_mark = " S*" else: try: lf_track = pylast.Track(artist, track, network) lf_duration = lf_track.get_duration() debug_print(f"Last.fm fallback: raw duration={lf_duration}ms") if lf_duration and lf_duration > 0: if USE_TRACK_DURATION_FROM_SPOTIFY and not DO_NOT_SHOW_DURATION_MARKS: duration_mark = " L*" # Last.fm returns duration in milliseconds track_duration = int(lf_duration / 1000) except Exception as e: debug_print(f"Last.fm fallback error: {e}") track_duration = 0 debug_print(f"Final track_duration={track_duration}s, duration_mark='{duration_mark}'") return track_duration, sp_track_uri_id, duration_mark # Returns the hex salt and PBKDF2 iteration count used for key derivation def _get_kdf_params() -> Tuple[str, int]: salt_hex = "10368003bf43b4c3230602b970a37e95" iterations = 200000 return salt_hex, iterations # Derive a 32-byte key from the provided password using PBKDF2-HMAC-SHA256. def _derive_key(password: str, salt_hex: str, iterations: int) -> bytes: salt = bytes.fromhex(salt_hex) return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations, dklen=32) # Returns the base64 payload containing HMAC plus ciphertext def get_payload() -> str: return "JMFkEQDf9n4Cl+c7w4thPigWUj7lTsclulxPBsQznBo8h268HncHU3qwLg==" # Derive the key from password, verify HMAC, XOR-decrypt and return plaintext # The password must be a date in YYYYMMDD format def decode(password: str) -> str: payload_b64 = get_payload() try: raw = base64.b64decode(payload_b64) except Exception as exc: raise ValueError("payload is not valid base64") from exc if len(raw) <= 32: raise ValueError("payload too short") mac_received = raw[:32] cipher = raw[32:] salt_hex, iterations = _get_kdf_params() key = _derive_key(password, salt_hex, iterations) mac_calc = hmac.new(key, cipher, hashlib.sha256).digest() if not hmac.compare_digest(mac_calc, mac_received): raise ValueError("HMAC verification failed") plaintext_bytes = bytes(c ^ key[i % len(key)] for i, c in enumerate(cipher)) try: return plaintext_bytes.decode("utf-8") except Exception as exc: raise ValueError("decrypted bytes are not valid UTF-8") from exc # Main function that monitors activity of the specified Last.fm user def lastfm_monitor_user(user, network, username, tracks, csv_file_name): lf_active_ts_start = 0 lf_active_ts_last = 0 lf_track_ts_start = 0 lf_track_ts_start_old = 0 lf_track_ts_start_after_resume = 0 lf_user_online = False alive_counter = 0 track_duration = 0 playing_paused = False playing_paused_ts = 0 playing_resumed_ts = 0 paused_counter = 0 playing_track = None new_track = None listened_songs = 0 looped_songs = 0 skipped_songs = 0 signal_previous_the_same = False artist = "" track = "" artist_old = "" track_old = "" song_on_loop = 0 recent_songs_session = [] sp_track_uri_id = None sp_track_duration = 0 duration_mark = "" pauses_number = 0 error_500_counter = 0 error_500_start_ts = 0 error_network_issue_counter = 0 error_network_issue_start_ts = 0 friends_check_last_ts = 0 debug_print(f"Starting monitor loop for user: {username}") try: if csv_file_name: init_csv_file(csv_file_name) except Exception as e: print(f"* Error: {e}") lastfm_last_activity_file = f"lastfm_{username}_last_activity.json" last_activity_read = [] last_activity_ts = 0 last_activity_artist = "" last_activity_track = "" last_activity_album = "" if os.path.isfile(lastfm_last_activity_file): try: with open(lastfm_last_activity_file, 'r', encoding="utf-8") as f: last_activity_read = json.load(f) except Exception as e: print(f"* Cannot load last status from '{lastfm_last_activity_file}' file: {e}") if last_activity_read: last_activity_ts = last_activity_read[0] last_activity_artist = last_activity_read[1] last_activity_track = last_activity_read[2] # Album is stored at index 3 if available if len(last_activity_read) > 3: last_activity_album = last_activity_read[3] lastfm_last_activity_file_mdate_dt = datetime.fromtimestamp(int(os.path.getmtime(lastfm_last_activity_file))) lastfm_last_activity_file_mdate = lastfm_last_activity_file_mdate_dt.strftime("%d %b %Y, %H:%M:%S") lastfm_last_activity_file_mdate_weekday = str(calendar.day_abbr[(lastfm_last_activity_file_mdate_dt).weekday()]) print(f"* Last activity loaded from file '{lastfm_last_activity_file}' ({lastfm_last_activity_file_mdate_weekday} {lastfm_last_activity_file_mdate})") try: new_track = user.get_now_playing() recent_tracks = lastfm_get_recent_tracks(username, network, RECENT_TRACKS_NUMBER) except Exception as e: print(f"* Error: {e}") sys.exit(1) # Handle case where user has no tracks yet (fresh account) if not recent_tracks or len(recent_tracks) == 0: print("\n*** User has no tracks yet (fresh account). Waiting for first track to appear...\n") last_track_start_ts_old2 = 0 lf_track_ts_start_old = 0 last_track_start_ts_old = 0 # If user is currently playing music but has no history yet, handle it if new_track is not None: app_started_and_user_offline = False lf_active_ts_start = int(time.time()) lf_active_ts_last = lf_active_ts_start lf_track_ts_start = lf_active_ts_start lf_track_ts_start_after_resume = lf_active_ts_start playing_resumed_ts = lf_active_ts_start song_on_loop = 1 artist = str(new_track.artist) track = str(new_track.title) album = str(new_track.info.get('album', '')) if new_track.info.get('album') else "" artist_old = artist track_old = track last_activity_artist = artist last_activity_track = track playing_track = new_track lf_user_online = True debug_print(f"{username} is now ONLINE (initial track)") print(f"\nTrack:\t\t\t\t{artist} - {track}") print(f"Album:\t\t\t\t{album}") track_duration, sp_track_uri_id, duration_mark = get_track_info(artist, track, album, network) if track_duration > 0: print(f"Duration:\t\t\t{display_time(track_duration)}{duration_mark}") spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url = get_spotify_apple_genius_search_urls(str(artist), str(track), album, network, playing_track) music_urls_output = format_music_urls_console(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) lyrics_output = format_lyrics_urls_console(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) if music_urls_output or lyrics_output: print() # Always add newline before first section (music URLs or lyrics) if music_urls_output: print(music_urls_output) if lyrics_output: print(lyrics_output) print("\n*** User is currently ACTIVE (first track) !") listened_songs = 1 recent_songs_session = [{'artist': artist, 'track': track, 'timestamp': lf_track_ts_start, 'skipped': False, 'cont': False}] last_activity_to_save = [] last_activity_to_save.append(lf_track_ts_start) last_activity_to_save.append(artist) last_activity_to_save.append(track) last_activity_to_save.append(album) try: with open(lastfm_last_activity_file, 'w', encoding="utf-8") as f: json.dump(last_activity_to_save, f, indent=2) except Exception as e: print(f"* Cannot save last status to '{lastfm_last_activity_file}' file: {e}") try: if csv_file_name: write_csv_entry(csv_file_name, datetime.fromtimestamp(int(lf_track_ts_start)), artist, track, album) except Exception as e: print(f"* Error: {e}") duration_m_body = "" duration_m_body_html = "" if track_duration > 0: duration_m_body = f"\nDuration: {display_time(track_duration)}{duration_mark}" duration_m_body_html = f"
Duration: {display_time(track_duration)}{duration_mark}" m_subject = f"Last.fm user {username} is active: '{artist} - {track}'" lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) lyrics_section_text = f"\n{lyrics_urls_text}\n\n" if lyrics_urls_text else "\n\n" lyrics_section_html = f"
{lyrics_urls_html}

" if lyrics_urls_html else "

" # Determine URLs for "Track:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: track_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: track_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" # When both music and lyrics are empty, use single

instead of
+

if not music_urls_html and not lyrics_urls_html: music_section_html = "

" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}Last activity: {get_date_from_ts(lf_active_ts_last)}{get_cur_ts(nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}Last activity: {get_date_from_ts(lf_active_ts_last)}{get_cur_ts('
Timestamp: ')}" if ACTIVE_NOTIFICATION: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) # If tracking functionality is enabled then play the current song via Spotify client if TRACK_SONGS and sp_track_uri_id: if platform.system() == 'Darwin': # macOS spotify_macos_play_song(sp_track_uri_id) elif platform.system() == 'Windows': # Windows spotify_win_play_song(sp_track_uri_id) else: # Linux variants spotify_linux_play_song(sp_track_uri_id) else: app_started_and_user_offline = True playing_track = None lf_user_online = False lf_active_ts_last = 0 last_activity_artist = "" last_activity_track = "" artist_old = "" track_old = "" print(f"* Last activity:\t\tNo tracks yet") print(f"* Last track:\t\t\tNo tracks yet") print(f"\n*** User is OFFLINE (no tracks yet) !") else: last_track_start_ts_old2 = int(recent_tracks[0].timestamp) lf_track_ts_start_old = last_track_start_ts_old2 # User is offline (does not play music at the moment) if new_track is None: app_started_and_user_offline = True playing_track = None last_track_start_ts_old = 0 lf_user_online = False lf_active_ts_last = int(recent_tracks[0].timestamp) if lf_active_ts_last >= last_activity_ts: last_activity_artist = recent_tracks[0].track.artist last_activity_track = recent_tracks[0].track.title if recent_tracks[0].album: last_activity_album = str(recent_tracks[0].album) elif lf_active_ts_last < last_activity_ts and last_activity_ts > 0: lf_active_ts_last = last_activity_ts last_activity_dt = datetime.fromtimestamp(lf_active_ts_last).strftime("%d %b %Y, %H:%M:%S") last_activity_ts_weekday = str(calendar.day_abbr[(datetime.fromtimestamp(lf_active_ts_last)).weekday()]) artist_old = str(last_activity_artist) track_old = str(last_activity_track) print(f"* Last activity:\t\t{last_activity_ts_weekday} {last_activity_dt}") print(f"* Last track:\t\t\t{last_activity_artist} - {last_activity_track}") if last_activity_album: print(f"* Last album:\t\t\t{last_activity_album}") track_duration, sp_track_uri_id, duration_mark = get_track_info(last_activity_artist, last_activity_track, last_activity_album, network) if track_duration > 0: print(f"* Last track duration:\t\t{display_time(track_duration)}{duration_mark}") spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url = get_spotify_apple_genius_search_urls(str(last_activity_artist), str(last_activity_track), last_activity_album, network) music_urls_output = format_music_urls_console(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) lyrics_output = format_lyrics_urls_console(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) if music_urls_output or lyrics_output: print() # Always add newline before first section (music URLs or lyrics) if music_urls_output: print(music_urls_output) if lyrics_output: print(f"{lyrics_output}\n") elif not music_urls_output: print() # Add newline before "User is OFFLINE" when both music and lyrics are disabled elif music_urls_output: print() # Add newline after music URLs when lyrics are disabled print(f"*** User is OFFLINE for {calculate_timespan(int(time.time()), lf_active_ts_last, show_seconds=False)} !") # User is online (plays music at the moment) else: app_started_and_user_offline = False lf_active_ts_start = int(time.time()) lf_active_ts_last = lf_active_ts_start lf_track_ts_start = lf_active_ts_start lf_track_ts_start_after_resume = lf_active_ts_start playing_resumed_ts = lf_active_ts_start song_on_loop = 1 artist = str(new_track.artist) track = str(new_track.title) album = str(new_track.info.get('album', '')) if new_track.info.get('album') else "" artist_old = artist track_old = track print(f"\nTrack:\t\t\t\t{artist} - {track}") print(f"Album:\t\t\t\t{album}") track_duration, sp_track_uri_id, duration_mark = get_track_info(artist, track, album, network) if track_duration > 0: print(f"Duration:\t\t\t{display_time(track_duration)}{duration_mark}") spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url = get_spotify_apple_genius_search_urls(str(artist), str(track), album, network, new_track) music_urls_output = format_music_urls_console(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) lyrics_output = format_lyrics_urls_console(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) if music_urls_output or lyrics_output: print() # Always add newline before first section (music URLs or lyrics) if music_urls_output: print(music_urls_output) if lyrics_output: print(lyrics_output) print("\n*** User is currently ACTIVE !") listened_songs = 1 recent_songs_session = [{'artist': artist, 'track': track, 'timestamp': lf_track_ts_start, 'skipped': False, 'cont': False}] last_activity_to_save = [] last_activity_to_save.append(lf_track_ts_start) last_activity_to_save.append(artist) last_activity_to_save.append(track) last_activity_to_save.append(album) try: with open(lastfm_last_activity_file, 'w', encoding="utf-8") as f: json.dump(last_activity_to_save, f, indent=2) except Exception as e: print(f"* Cannot save last status to '{lastfm_last_activity_file}' file: {e}") try: if csv_file_name: write_csv_entry(csv_file_name, datetime.fromtimestamp(int(lf_track_ts_start)), artist, track, album) except Exception as e: print(f"* Error: {e}") duration_m_body = "" duration_m_body_html = "" if track_duration > 0: duration_m_body = f"\nDuration: {display_time(track_duration)}{duration_mark}" duration_m_body_html = f"
Duration: {display_time(track_duration)}{duration_mark}" m_subject = f"Last.fm user {username} is active: '{artist} - {track}'" lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) lyrics_section_text = f"\n{lyrics_urls_text}\n\n" if lyrics_urls_text else "\n\n" lyrics_section_html = f"
{lyrics_urls_html}

" if lyrics_urls_html else "

" # Determine URLs for "Track:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: track_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: track_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" # When both music and lyrics are empty, use single

instead of
+

if not music_urls_html and not lyrics_urls_html: music_section_html = "

" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}Last activity: {get_date_from_ts(lf_active_ts_last)}{get_cur_ts(nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}Last activity: {get_date_from_ts(lf_active_ts_last)}{get_cur_ts('
Timestamp: ')}" if ACTIVE_NOTIFICATION: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) playing_track = new_track # If user has tracks, use the first one's timestamp, otherwise use current time if recent_tracks and len(recent_tracks) > 0: last_track_start_ts_old = int(recent_tracks[0].timestamp) else: last_track_start_ts_old = lf_active_ts_start lf_user_online = True # If tracking functionality is enabled then play the current song via Spotify client # Only play when user is online and actively playing if TRACK_SONGS and sp_track_uri_id: if platform.system() == 'Darwin': # macOS spotify_macos_play_song(sp_track_uri_id) elif platform.system() == 'Windows': # Windows spotify_win_play_song(sp_track_uri_id) else: # Linux variants spotify_linux_play_song(sp_track_uri_id) i = 0 p = 0 duplicate_entries = False print("\nList of recently listened tracks:\n") if not recent_tracks or len(recent_tracks) == 0: print("(No tracks yet)") else: for previous, t, nxt in previous_and_next(reversed(recent_tracks)): i += 1 print(f'{i}\t{datetime.fromtimestamp(int(t.timestamp)).strftime("%d %b %Y, %H:%M:%S")}\t{calendar.day_abbr[(datetime.fromtimestamp(int(t.timestamp))).weekday()]}\t{t.track}') if previous: if previous.timestamp == t.timestamp: p += 1 duplicate_entries = True print("DUPLICATE ENTRY") if duplicate_entries: print(f"*** Duplicate entries ({p}) found, possible PRIVATE MODE") print(f"\nTracks/albums to monitor: {tracks}") print_cur_ts("\nTimestamp:\t\t\t") email_sent = False tracks_upper = {t.upper() for t in tracks} # Initialize friends/followers tracking if enabled if TRACK_FOLLOWINGS or TRACK_FOLLOWERS: print(f"* Friends/followers tracking enabled") # Do initial check try: followings_file_exists = os.path.isfile(f"lastfm_{username}_followings.json") followers_file_exists = os.path.isfile(f"lastfm_{username}_followers.json") # Load existing state if available if TRACK_FOLLOWINGS and followings_file_exists: followings_loaded = load_friends_state(username, 'followings') followings_count = len(followings_loaded) print(f"* Loading followings for user {username} from file lastfm_{username}_followings.json ({followings_count})") if TRACK_FOLLOWERS and followers_file_exists: followers_loaded = load_friends_state(username, 'followers') followers_count = len(followers_loaded) print(f"* Loading followers for user {username} from file lastfm_{username}_followers.json ({followers_count})") # Perform initial check to build baseline # We use raise_on_error=True so initialization failures (e.g. scraping issues) are visible initial_changes = check_friends_changes(username, TRACK_FOLLOWINGS, TRACK_FOLLOWERS, save_state=True, raise_on_error=True) # Announce baseline creation for missing files if TRACK_FOLLOWINGS and not followings_file_exists: if os.path.isfile(f"lastfm_{username}_followings.json"): followings_count = len(load_friends_state(username, 'followings')) print(f"* Saving followings for user {username} to file lastfm_{username}_followings.json ({followings_count})") if TRACK_FOLLOWERS and not followers_file_exists: if os.path.isfile(f"lastfm_{username}_followers.json"): followers_count = len(load_friends_state(username, 'followers')) print(f"* Saving followers for user {username} to file lastfm_{username}_followers.json ({followers_count})") # Only notify if there are real changes (not initial fetch/baseline build) if initial_changes: # Filter out initial additions (baseline) from notification to_notify = {} if 'followings' in initial_changes and not followings_file_exists: pass # Handled by "Saving baseline" above elif 'followings' in initial_changes: to_notify['followings'] = initial_changes['followings'] if 'followers' in initial_changes and not followers_file_exists: pass # Handled by "Saving baseline" above elif 'followers' in initial_changes: to_notify['followers'] = initial_changes['followers'] if to_notify: notify_friends_changes(username, to_notify, skip_initial_line=True) else: # Baseline was built but no "real" changes to report print_cur_ts("\nTimestamp:\t\t\t") else: # No changes detected during baseline build print_cur_ts("\nTimestamp:\t\t\t") except Exception as e: print(f"* Warning: Initial friends check failed: {e}") print_cur_ts("\nTimestamp:\t\t\t") friends_check_last_ts = int(time.time()) # Main loop friends_pending_changes = None friends_streak = 0 friends_next_check_ts = 0 while True: try: # Check for friends/followers changes if enabled and interval has passed if (TRACK_FOLLOWINGS or TRACK_FOLLOWERS) and FRIENDS_CHECK_INTERVAL > 0: current_ts = int(time.time()) # Determine if it's time for a regular check or a retry check do_check = False is_retry = False if friends_streak != 0: # We are in a confirmation/retry streak (change or error) if current_ts >= friends_next_check_ts: do_check = True is_retry = True elif (current_ts - friends_check_last_ts) >= FRIENDS_CHECK_INTERVAL: # Regular check interval reached do_check = True if do_check: try: # Use save_state=False by default to avoid saving to file during suspected transient changes # Use raise_on_error=True to detect check failures and avoid resetting streak changes = check_friends_changes(username, TRACK_FOLLOWINGS, TRACK_FOLLOWERS, save_state=False, raise_on_error=True) # Reset error streak on any successful check if friends_streak < 0: friends_streak = 0 if changes: if changes == friends_pending_changes: friends_streak += 1 else: friends_pending_changes = changes friends_streak = 1 if friends_streak >= FRIENDS_CHANGE_COUNTER: # Final confirmation after enough checks # Now call it again with save_state=True to persist changes check_friends_changes(username, TRACK_FOLLOWINGS, TRACK_FOLLOWERS, save_state=True, raise_on_error=True) notify_friends_changes(username, changes, skip_initial_line=not PROGRESS_INDICATOR) friends_streak = 0 friends_pending_changes = None friends_check_last_ts = current_ts else: # Suspected transient change, schedule retry retry_interval = FRIENDS_RETRY_INTERVAL friends_next_check_ts = current_ts + retry_interval # Show streak info with details change_details = [] for key in ['followings', 'followers']: if key in changes: c = changes[key] diff = c['current_count'] - c['previous_count'] diff_str = f"{diff:+d}" if diff != 0 else "0" change_details.append(f"{key}: {c['previous_count']} -> {c['current_count']} ({diff_str})") detail_str = "; ".join(change_details) print(f"* Suspected transient change ({detail_str}) (streak {friends_streak}/{FRIENDS_CHANGE_COUNTER}); will confirm in {display_time(retry_interval)}") print_cur_ts("Timestamp:\t\t\t") else: # No changes or back to baseline if friends_streak > 0: # Recovered from a suspected change print(f"* Friend/follower count recovered back to normal baseline after {friends_streak} suspected transient checks") print_cur_ts("Timestamp:\t\t\t") friends_streak = 0 friends_pending_changes = None if not is_retry: friends_check_last_ts = current_ts # If it wasn't a retry, we update the timestamp to update lastfm baseline file check_friends_changes(username, TRACK_FOLLOWINGS, TRACK_FOLLOWERS, save_state=True, raise_on_error=True) except Exception as e: if friends_streak == 0: # Start measuring error streak (negative values) friends_streak = -1 elif friends_streak < 0: # Continue error streak friends_streak -= 1 if friends_streak > 0: # We were tracking a change but hit an error retry_interval = FRIENDS_RETRY_INTERVAL friends_next_check_ts = current_ts + retry_interval print(f"* Error during friends check: {e}") print(f"* Preserving confirmation streak ({friends_streak}/{FRIENDS_CHANGE_COUNTER}) despite error; will retry in {display_time(retry_interval)}") print_cur_ts("Timestamp:\t\t\t") else: # Error streak logic (negative streak) current_error_streak = abs(friends_streak) # Throttling: Alert on threshold, then every 10 attempts if current_error_streak == FRIENDS_CHANGE_COUNTER or (current_error_streak > FRIENDS_CHANGE_COUNTER and (current_error_streak - FRIENDS_CHANGE_COUNTER) % 10 == 0): print(f"* Error confirming friends (attempt {current_error_streak}): {e}") print_cur_ts("Timestamp:\t\t\t") retry_interval = FRIENDS_RETRY_INTERVAL friends_next_check_ts = current_ts + retry_interval debug_print(f"Fetching now playing / recent tracks...") recent_tracks = lastfm_get_recent_tracks(username, network, 1) # Handle case where user still has no tracks if not recent_tracks or len(recent_tracks) == 0: # Wait for first track to appear time.sleep(LASTFM_ACTIVE_CHECK_INTERVAL) continue last_track_start_ts = int(recent_tracks[0].timestamp) new_track = user.get_now_playing() email_sent = False lf_current_ts = int(time.time()) - LASTFM_ACTIVE_CHECK_INTERVAL # Detecting new Last.fm entries when user is offline if not lf_user_online: # If this is the first track appearing (user had no tracks before) if last_track_start_ts_old2 == 0: debug_print("First track appeared!") print("\n*** First track appeared! Starting monitoring...\n") last_track_start_ts_old2 = last_track_start_ts lf_track_ts_start_old = last_track_start_ts if last_track_start_ts > last_track_start_ts_old2: debug_print(f"Detected new entries while offline ({last_track_start_ts} > {last_track_start_ts_old2})") print("\n*** New last.fm entries showed up while user was offline!\n") lf_track_ts_start_old = last_track_start_ts duplicate_entries = False i = 0 added_entries_list = "" try: recent_tracks_while_offline = lastfm_get_recent_tracks(username, network, 100) for previous, t, nxt in previous_and_next(reversed(recent_tracks_while_offline)): if int(t.timestamp) > int(last_track_start_ts_old2): if 0 <= (lf_track_ts_start + LASTFM_ACTIVE_CHECK_INTERVAL - int(t.timestamp)) <= 60: continue print(f'{datetime.fromtimestamp(int(t.timestamp)).strftime("%d %b %Y, %H:%M:%S")}\t{calendar.day_abbr[(datetime.fromtimestamp(int(t.timestamp))).weekday()]}\t{t.track}') added_entries_list += f'{datetime.fromtimestamp(int(t.timestamp)).strftime("%d %b %Y, %H:%M:%S")}, {calendar.day_abbr[(datetime.fromtimestamp(int(t.timestamp))).weekday()]}: {t.track}\n' i += 1 if previous: if previous.timestamp == t.timestamp: duplicate_entries = True print("DUPLICATE ENTRY") if csv_file_name: write_csv_entry(csv_file_name, datetime.fromtimestamp(int(t.timestamp)), str(t.track.artist), str(t.track.title), str(t.album)) except Exception as e: print(f"* Error: {e}") if i > 0 and OFFLINE_ENTRIES_NOTIFICATION: if added_entries_list: added_entries_list_mbody = f"\n\n{added_entries_list}" m_subject = f"Last.fm user {username}: new entries showed up while user was offline" m_body = f"New last.fm entries showed up while user was offline!{added_entries_list_mbody}{get_cur_ts(nl_ch + 'Timestamp: ')}" print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, "", SMTP_SSL) print_cur_ts("\nTimestamp:\t\t\t") # User is online (plays music at the moment) if new_track is not None: # User paused music earlier if playing_paused is True and lf_user_online: playing_resumed_ts = lf_current_ts lf_track_ts_start_after_resume += (playing_resumed_ts - playing_paused_ts) paused_counter += (int(playing_resumed_ts) - int(playing_paused_ts)) print(f"User RESUMED playing after {calculate_timespan(int(playing_resumed_ts), int(playing_paused_ts))}") print_cur_ts("\nTimestamp:\t\t\t") # If tracking functionality is enabled then RESUME the current song via Spotify client if TRACK_SONGS: if platform.system() == 'Darwin': # macOS spotify_macos_play_pause("play") elif platform.system() == 'Windows': # Windows pass else: # Linux variants spotify_linux_play_pause("play") playing_paused = False # Trying to overcome the issue with Last.fm API reporting newly played song (but still continues the same) if (lf_current_ts <= (lf_track_ts_start + 20)) and (last_track_start_ts > last_track_start_ts_old) and (new_track == playing_track): last_track_start_ts_old = last_track_start_ts # Track has changed if (new_track != playing_track or (last_track_start_ts > last_track_start_ts_old and last_track_start_ts > lf_track_ts_start_old - 20)): alive_counter = 0 if new_track == playing_track: song_on_loop += 1 if song_on_loop == SONG_ON_LOOP_VALUE: looped_songs += 1 else: song_on_loop = 1 playing_track = new_track artist = str(playing_track.artist) track = str(playing_track.title) album = str(playing_track.info.get('album', '')) if playing_track.info.get('album') else "" info = playing_track.info played_for_m_body = "" played_for_m_body_html = "" # Handling how long user played the previous track, if skipped it etc. - in case track duration is available if track_duration > 0 and lf_track_ts_start_after_resume > 0 and lf_user_online: played_for_display = False played_for_time = lf_current_ts - lf_track_ts_start_after_resume listened_percentage = (played_for_time) / (track_duration - 1) if (played_for_time) < (track_duration - LASTFM_ACTIVE_CHECK_INTERVAL - 1): played_for = f"{display_time(played_for_time)} (out of {display_time(track_duration)})" played_for_html = f"{display_time(played_for_time)} (out of {display_time(track_duration)})" if listened_percentage <= SKIPPED_SONG_THRESHOLD2: if signal_previous_the_same: played_for += f" - CONT ({int(listened_percentage * 100)}%)" played_for_html += f" - CONT ({int(listened_percentage * 100)}%)" signal_previous_the_same = False # Mark previous track as CONT in recent_songs_session if len(recent_songs_session) > 0 and recent_songs_session[-1]['artist'] == artist_old and recent_songs_session[-1]['track'] == track_old: recent_songs_session[-1]['cont'] = True else: played_for += f" - SKIPPED ({int(listened_percentage * 100)}%)" played_for_html += f" - SKIPPED ({int(listened_percentage * 100)}%)" skipped_songs += 1 # Mark previous track as skipped in recent_songs_session if len(recent_songs_session) > 0 and recent_songs_session[-1]['artist'] == artist_old and recent_songs_session[-1]['track'] == track_old: recent_songs_session[-1]['skipped'] = True else: played_for += f" ({int(listened_percentage * 100)}%)" played_for_html += f" ({int(listened_percentage * 100)}%)" played_for_display = True else: played_for = display_time(played_for_time) played_for_html = played_for if listened_percentage >= LONGER_SONG_THRESHOLD1 or (played_for_time - track_duration >= LONGER_SONG_THRESHOLD2): played_for += f" - LONGER than track duration (+ {display_time(played_for_time - track_duration)}, {int(listened_percentage * 100)}%)" played_for_html += f" - LONGER than track duration (+ {display_time(played_for_time - track_duration)}, {int(listened_percentage * 100)}%)" played_for_display = True if played_for_display: played_for_m_body = f"\n\nUser played the previous track ({artist_old} - {track_old}) for: {played_for}" played_for_m_body_html = f"

User played the previous track ({escape(artist_old)} - {escape(track_old)}) for: {played_for_html}" if PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) print(f"User played the previous track for: {played_for}") if not PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) # Handling how long user played the previous track, if skipped it etc. - in case track duration is NOT available elif track_duration <= 0 and lf_track_ts_start_after_resume > 0 and lf_user_online: played_for = display_time(lf_current_ts - lf_track_ts_start_after_resume) played_for_html = f"{display_time(lf_current_ts - lf_track_ts_start_after_resume)}" if ((lf_current_ts - lf_track_ts_start_after_resume) <= SKIPPED_SONG_THRESHOLD1): if signal_previous_the_same: played_for_m_body = f"\n\nUser CONT the previous track ({artist_old} - {track_old}) for: {played_for}" played_for_m_body_html = f"

User CONT the previous track ({escape(artist_old)} - {escape(track_old)}) for: {played_for_html}" played_for_str = f"User CONT the previous track for {played_for}" signal_previous_the_same = False # Mark previous track as CONT in recent_songs_session if len(recent_songs_session) > 0 and recent_songs_session[-1]['artist'] == artist_old and recent_songs_session[-1]['track'] == track_old: recent_songs_session[-1]['cont'] = True else: skipped_songs += 1 played_for_m_body = f"\n\nUser SKIPPED the previous track ({artist_old} - {track_old}) after: {played_for}" played_for_m_body_html = f"

User SKIPPED the previous track ({escape(artist_old)} - {escape(track_old)}) after: {played_for_html}" played_for_str = f"User SKIPPED the previous track after {played_for}" # Mark previous track as skipped in recent_songs_session if len(recent_songs_session) > 0 and recent_songs_session[-1]['artist'] == artist_old and recent_songs_session[-1]['track'] == track_old: recent_songs_session[-1]['skipped'] = True else: played_for_m_body = f"\n\nUser played the previous track ({artist_old} - {track_old}) for: {played_for}" played_for_m_body_html = f"

User played the previous track ({escape(artist_old)} - {escape(track_old)}) for: {played_for_html}" played_for_str = f"User played the previous track for: {played_for}" if PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) print(played_for_str) if not PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) if PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) print(f"Last.fm user:\t\t\t{username}\n") listened_songs += 1 # Clearing the flag used to indicate CONT songs (continued from previous playing session) if listened_songs == 2: signal_previous_the_same = False if lf_track_ts_start > 0: lf_track_ts_start_old = lf_track_ts_start lf_track_ts_start = lf_current_ts lf_track_ts_start_after_resume = lf_track_ts_start last_track_start_ts_old = last_track_start_ts # Add current song to recent songs session list recent_songs_session.append({ 'artist': artist, 'track': track, 'timestamp': lf_track_ts_start, 'skipped': False, 'cont': False }) # Keep only last INACTIVE_EMAIL_RECENT_SONGS_COUNT songs (or 5 if not set) max_songs = INACTIVE_EMAIL_RECENT_SONGS_COUNT if INACTIVE_EMAIL_RECENT_SONGS_COUNT > 0 else 5 if len(recent_songs_session) > max_songs: recent_songs_session.pop(0) print(f"Track:\t\t\t\t{artist} - {track}") print(f"Album:\t\t\t\t{album}") track_duration, sp_track_uri_id, duration_mark = get_track_info(artist, track, album, network) if track_duration > 0: print(f"Duration:\t\t\t{display_time(track_duration)}{duration_mark}") spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url = get_spotify_apple_genius_search_urls(str(artist), str(track), album, network, playing_track) music_urls_output = format_music_urls_console(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) if music_urls_output: print(f"\n{music_urls_output}") lyrics_output = format_lyrics_urls_console(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) if lyrics_output: if not music_urls_output: print() # Add newline before lyrics when music URLs are disabled print(lyrics_output) last_activity_to_save = [] last_activity_to_save.append(lf_track_ts_start) last_activity_to_save.append(artist) last_activity_to_save.append(track) last_activity_to_save.append(album) try: with open(lastfm_last_activity_file, 'w', encoding="utf-8") as f: json.dump(last_activity_to_save, f, indent=2) except Exception as e: print(f"* Cannot save last status to '{lastfm_last_activity_file}' file: {e}") duration_m_body = "" duration_m_body_html = "" if track_duration > 0: duration_m_body = f"\nDuration: {display_time(track_duration)}{duration_mark}" duration_m_body_html = f"
Duration: {display_time(track_duration)}{duration_mark}" # If tracking functionality is enabled then play the current song via Spotify client if TRACK_SONGS and sp_track_uri_id: if platform.system() == 'Darwin': # macOS spotify_macos_play_song(sp_track_uri_id) elif platform.system() == 'Windows': # Windows spotify_win_play_song(sp_track_uri_id) else: # Linux variants spotify_linux_play_song(sp_track_uri_id) # User was offline and got active # Handle case where user had no tracks initially and is becoming active for the first time if (not lf_user_online and lf_active_ts_start == 0) or (not lf_user_online and (lf_track_ts_start - lf_active_ts_last) > LASTFM_INACTIVITY_CHECK and lf_active_ts_last > 0) or (not lf_user_online and lf_active_ts_last > 0 and app_started_and_user_offline): app_started_and_user_offline = False last_track_start_changed = "" last_track_start_changed_html = "" lf_active_ts_last_old = lf_active_ts_last # If user had no tracks initially, lf_active_ts_last will be 0, so skip the check if lf_active_ts_last > 0 and last_track_start_ts > (lf_active_ts_last + 60) and (int(time.time()) - last_track_start_ts > 240): last_track_start_changed = f"\n(last track start changed from {get_short_date_from_ts(lf_active_ts_last)} to {get_short_date_from_ts(last_track_start_ts)} - offline mode ?)" last_track_start_changed_html = f"
(last track start changed from {get_short_date_from_ts(lf_active_ts_last)} to {get_short_date_from_ts(last_track_start_ts)} - offline mode ?)" lf_active_ts_last = last_track_start_ts duplicate_entries = False private_mode = "" private_mode_html = "" try: p = 0 recent_tracks_while_offline = lastfm_get_recent_tracks(username, network, RECENT_TRACKS_NUMBER) for previous, t, nxt in previous_and_next(reversed(recent_tracks_while_offline)): if previous: if previous.timestamp == t.timestamp: p += 1 duplicate_entries = True except Exception as e: print(f"* Error: {e}") if duplicate_entries: private_mode = f"\n\nDuplicate entries ({p}) found, possible private mode ({get_range_of_dates_from_tss(lf_active_ts_last_old, lf_track_ts_start, short=True)})" private_mode_html = f"

Duplicate entries ({p}) found, possible private mode ({get_range_of_dates_from_tss(lf_active_ts_last_old, lf_track_ts_start, short=True)})" print(f"\n*** Duplicate entries ({p}) found, possible PRIVATE MODE ({get_range_of_dates_from_tss(lf_active_ts_last_old, lf_track_ts_start, short=True)})") # Only show timespan if user had previous activity if lf_active_ts_last > 0: print(f"\n*** User got ACTIVE after being offline for {calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_last))}{last_track_start_changed}") print(f"*** Last activity:\t\t{get_date_from_ts(lf_active_ts_last)}") else: print(f"\n*** User got ACTIVE (first track)") # We signal that the currently played song is the same as previous one before user got inactive, so might be continuation of previous track if artist_old == artist and track_old == track: signal_previous_the_same = True else: signal_previous_the_same = False paused_counter = 0 listened_songs = 1 skipped_songs = 0 looped_songs = 0 pauses_number = 0 lf_active_ts_start = lf_track_ts_start playing_resumed_ts = lf_track_ts_start recent_songs_session = [{'artist': artist, 'track': track, 'timestamp': lf_track_ts_start, 'skipped': False, 'cont': False}] # Handle email subject and body - only include timespan if user had previous activity if lf_active_ts_last > 0: m_subject = f"Last.fm user {username} is active: '{artist} - {track}' (after {calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_last), show_seconds=False)} - {get_short_date_from_ts(lf_active_ts_last)})" offline_timespan = calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_last)) last_activity_text = f"\n\nLast activity: {get_date_from_ts(lf_active_ts_last)}" last_activity_html = f"

Last activity: {get_date_from_ts(lf_active_ts_last)}" else: m_subject = f"Last.fm user {username} is active: '{artist} - {track}'" offline_timespan = "" last_activity_text = "" last_activity_html = "" lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) lyrics_section_text = f"\n{lyrics_urls_text}" if lyrics_urls_text else "" lyrics_section_html = f"
{lyrics_urls_html}" if lyrics_urls_html else "" # Determine URLs for "Track:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: track_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: track_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" # When both music and lyrics are empty, don't add

here because there's a hardcoded

after played_for_m_body_html if not music_urls_html and not lyrics_urls_html: music_section_html = "" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" if lf_active_ts_last > 0: m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}{played_for_m_body}\n\nFriend got active after being offline for {offline_timespan}{last_track_start_changed}{private_mode}{last_activity_text}{get_cur_ts(nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}{played_for_m_body_html}

Friend got active after being offline for {offline_timespan}{last_track_start_changed_html}{private_mode_html}{last_activity_html}{get_cur_ts('
Timestamp: ')}" else: lyrics_section_text_fresh = f"\n{lyrics_urls_text}\n" if lyrics_urls_text else "\n" lyrics_section_html_fresh = f"
{lyrics_urls_html}
" if lyrics_urls_html else "
" # When both music and lyrics are empty, check if played_for_m_body_html is empty # If it's empty, we need

before timestamp; if not, it already starts with

if not music_urls_html and not lyrics_urls_html: if not played_for_m_body_html: music_section_html = "

" else: music_section_html = "" lyrics_section_html_fresh = "" elif not music_urls_html: music_section_html = "
" m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text_fresh}{played_for_m_body}{get_cur_ts(nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html_fresh}{played_for_m_body_html}{get_cur_ts('
Timestamp: ')}" if ACTIVE_NOTIFICATION: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True if (TRACK_NOTIFICATION or SONG_NOTIFICATION) and not email_sent: timespan_str = f"\n\nSongs Played: {listened_songs}" timespan_str_html = f"

Songs Played: {listened_songs}" # Only show timespan if lf_active_ts_start is properly set (not 0) and different from current track start if lf_active_ts_start > 0 and lf_track_ts_start != lf_active_ts_start: timespan = calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_start)) timespan_str += f" ({timespan})" timespan_str_html += f" ({timespan})" m_subject = f"Last.fm user {username}: '{artist} - {track}'" lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) # Determine URLs for "Track:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: track_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: track_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" lyrics_section_text = f"\n{lyrics_urls_text}" if lyrics_urls_text else "" lyrics_section_html = f"
{lyrics_urls_html}" if lyrics_urls_html else "" # When both music and lyrics are empty, don't add

here because there's a hardcoded

in get_cur_ts if not music_urls_html and not lyrics_urls_html: music_section_html = "" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}{played_for_m_body}{timespan_str}{get_cur_ts(nl_ch + nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}{played_for_m_body_html}{timespan_str_html}{get_cur_ts('

Timestamp: ')}" # Check for loop first, before track/song notifications if song_on_loop == SONG_ON_LOOP_VALUE: print("─" * HORIZONTAL_LINE) print(f"User plays song on LOOP ({song_on_loop} times)") print("─" * HORIZONTAL_LINE) if song_on_loop == SONG_ON_LOOP_VALUE and SONG_ON_LOOP_NOTIFICATION: timespan_str = f"\n\nSongs Played: {listened_songs}" timespan_str_html = f"

Songs Played: {listened_songs}" # Only show timespan if lf_active_ts_start is properly set (not 0) and different from current track start if lf_active_ts_start > 0 and lf_track_ts_start != lf_active_ts_start: timespan = calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_start)) timespan_str += f" ({timespan})" timespan_str_html += f" ({timespan})" m_subject = f"Last.fm user {username} plays song on loop: '{artist} - {track}'" lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) # Determine URLs for "Track:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: track_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: track_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" lyrics_section_text = f"\n{lyrics_urls_text}" if lyrics_urls_text else "" lyrics_section_html = f"
{lyrics_urls_html}" if lyrics_urls_html else "" # When both music and lyrics are empty, don't add

here because there's a hardcoded

before "User plays song on LOOP" if not music_urls_html and not lyrics_urls_html: music_section_html = "" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" m_body = f"Track: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}{played_for_m_body}\n\nUser plays song on LOOP ({song_on_loop} times){timespan_str}{get_cur_ts(nl_ch + nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Track: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}{played_for_m_body_html}

User plays song on LOOP ({song_on_loop} times){timespan_str_html}{get_cur_ts('

Timestamp: ')}" print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True # Send track/song notifications only if loop notification was not sent if track.upper() in tracks_upper or album.upper() in tracks_upper: print("\n*** Track/album matched with the list!") if TRACK_NOTIFICATION and not email_sent: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True if SONG_NOTIFICATION and not email_sent: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True lf_user_online = True lf_active_ts_last = int(time.time()) artist_old = artist track_old = track try: if csv_file_name: write_csv_entry(csv_file_name, datetime.fromtimestamp(int(lf_track_ts_start)), artist, track, album) except Exception as e: print(f"* Error: {e}") if listened_songs: if lf_track_ts_start == lf_active_ts_start: print(f"\nSongs Played:\t\t\t{listened_songs}") else: # Only show timespan if lf_active_ts_start is properly set (not 0) and different from current track start if lf_active_ts_start > 0 and lf_track_ts_start != lf_active_ts_start: print(f"\nSongs Played:\t\t\t{listened_songs} ({calculate_timespan(int(lf_track_ts_start), int(lf_active_ts_start))})") else: print(f"\nSongs Played:\t\t\t{listened_songs}") print_cur_ts("\nTimestamp:\t\t\t") # Track has not changed, user is online and continues playing else: lf_active_ts_last = int(time.time()) # We display progress indicator if flag is enabled if lf_user_online and PROGRESS_INDICATOR: ts = datetime.fromtimestamp(lf_active_ts_last).strftime('%H:%M:%S') delta_ts = lf_active_ts_last - lf_track_ts_start_after_resume if delta_ts > 0: delta_diff_str = "%02d:%02d:%02d" % (delta_ts // 3600, delta_ts // 60 % 60, delta_ts % 60) else: delta_diff_str = "00:00:00" print(f"# {ts} +{delta_diff_str}") # User is offline (does not play music at the moment) else: alive_counter += 1 # User paused playing the music if ((int(time.time()) - lf_active_ts_last) > (LASTFM_ACTIVE_CHECK_INTERVAL * LASTFM_BREAK_CHECK_MULTIPLIER)) and lf_user_online and lf_active_ts_last > 0 and lf_active_ts_start > 0 and (LASTFM_ACTIVE_CHECK_INTERVAL * LASTFM_BREAK_CHECK_MULTIPLIER) < LASTFM_INACTIVITY_CHECK and LASTFM_BREAK_CHECK_MULTIPLIER > 0 and playing_paused is False: playing_paused = True playing_paused_ts = lf_active_ts_last pauses_number += 1 if PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) print(f"User PAUSED playing after {calculate_timespan(int(playing_resumed_ts), int(playing_paused_ts))} (inactivity timer: {display_time(LASTFM_BREAK_CHECK_MULTIPLIER * LASTFM_ACTIVE_CHECK_INTERVAL)})") print(f"Last activity:\t\t\t{get_date_from_ts(lf_active_ts_last)}") print_cur_ts("\nTimestamp:\t\t\t") # If tracking functionality is enabled then PAUSE the current song via Spotify client if TRACK_SONGS: if platform.system() == 'Darwin': # macOS spotify_macos_play_pause("pause") elif platform.system() == 'Windows': # Windows pass else: # Linux variants spotify_linux_play_pause("pause") # User got inactive if ((int(time.time()) - lf_active_ts_last) > LASTFM_INACTIVITY_CHECK) and lf_user_online and lf_active_ts_last > 0 and lf_active_ts_start > 0: lf_user_online = False played_for_m_body = "" played_for_m_body_html = "" # Handling how long user played the last track - in case track duration is available if track_duration > 0 and lf_track_ts_start_after_resume > 0: played_for_time = lf_active_ts_last - lf_track_ts_start_after_resume listened_percentage = (played_for_time) / (track_duration - 1) if (played_for_time) < (track_duration - LASTFM_ACTIVE_CHECK_INTERVAL - 1): played_for = f"{display_time(played_for_time)} (out of {display_time(track_duration)})" played_for_html = f"{display_time(played_for_time)} (out of {display_time(track_duration)})" played_for += f" ({int(listened_percentage * 100)}%)" played_for_html += f" ({int(listened_percentage * 100)}%)" else: played_for = display_time(played_for_time) played_for_html = f"{display_time(played_for_time)}" played_for_m_body = f"\n\nUser played the last track for: {played_for}" played_for_m_body_html = f"

User played the last track for: {played_for_html}" print(f"User played the last track for: {played_for}") if not PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) # Handling how long user played the last track - in case track duration is NOT available elif track_duration <= 0 and lf_track_ts_start_after_resume > 0: played_for = display_time((lf_active_ts_last) - lf_track_ts_start_after_resume) played_for_m_body = f"\n\nUser played the last track for: {played_for}" played_for_m_body_html = f"

User played the last track for: {played_for}" played_for_str = f"User played the last track for: {played_for}" print(played_for_str) if not PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) if PROGRESS_INDICATOR: print("─" * HORIZONTAL_LINE) print(f"*** User got INACTIVE after listening to music for {calculate_timespan(int(lf_active_ts_last), int(lf_active_ts_start))}") print(f"*** User played music from {get_range_of_dates_from_tss(lf_active_ts_start, lf_active_ts_last, short=True, between_sep=' to ')}") playing_resumed_ts = int(time.time()) paused_mbody = "" paused_mbody_html = "" pauses_number -= 1 if paused_counter > 0: paused_percentage = int((paused_counter / (int(lf_active_ts_last) - int(lf_active_ts_start))) * 100) print(f"*** User paused music {pauses_number} times for {display_time(paused_counter)} ({paused_percentage}%)") paused_mbody = f"\nUser paused music {pauses_number} times for {display_time(paused_counter)} ({paused_percentage}%)" paused_mbody_html = f"
User paused music {pauses_number} times for {display_time(paused_counter)} ({paused_percentage}%)" paused_counter = 0 listened_songs_text = f"*** User played {listened_songs} songs" listened_songs_mbody = f"\n\nUser played {listened_songs} songs" listened_songs_mbody_html = f"

User played {listened_songs} songs" if skipped_songs > 0: skipped_songs_text = f", skipped {skipped_songs} songs ({int((skipped_songs / listened_songs) * 100)}%)" listened_songs_text += skipped_songs_text listened_songs_mbody += skipped_songs_text listened_songs_mbody_html += f", skipped {skipped_songs} songs ({int((skipped_songs / listened_songs) * 100)}%)" if looped_songs > 0: looped_songs_text = f"\n*** User played {looped_songs} songs on loop" looped_songs_mbody = f"\nUser played {looped_songs} songs on loop" looped_songs_mbody_html = f"
User played {looped_songs} songs on loop" listened_songs_text += looped_songs_text listened_songs_mbody += looped_songs_mbody listened_songs_mbody_html += looped_songs_mbody_html print(f"{listened_songs_text}\n") print(f"*** Last activity:\t\t{get_date_from_ts(lf_active_ts_last)} (inactive timer: {display_time(LASTFM_INACTIVITY_CHECK)})") # If tracking functionality is enabled then either pause the current song via Spotify client or play the indicated SP_USER_GOT_OFFLINE_TRACK_ID "finishing" song if TRACK_SONGS: if SP_USER_GOT_OFFLINE_TRACK_ID: if platform.system() == 'Darwin': # macOS spotify_macos_play_song(SP_USER_GOT_OFFLINE_TRACK_ID) if SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE > 0: time.sleep(SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE) spotify_macos_play_pause("pause") elif platform.system() == 'Windows': # Windows pass else: # Linux variants spotify_linux_play_song(SP_USER_GOT_OFFLINE_TRACK_ID) if SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE > 0: time.sleep(SP_USER_GOT_OFFLINE_DELAY_BEFORE_PAUSE) spotify_linux_play_pause("pause") else: if platform.system() == 'Darwin': # macOS spotify_macos_play_pause("pause") elif platform.system() == 'Windows': # Windows pass else: # Linux variants spotify_linux_play_pause("pause") last_activity_to_save = [] last_activity_to_save.append(lf_active_ts_last) last_activity_to_save.append(artist) last_activity_to_save.append(track) last_activity_to_save.append(album) try: with open(lastfm_last_activity_file, 'w', encoding="utf-8") as f: json.dump(last_activity_to_save, f, indent=2) except Exception as e: print(f"* Cannot save last status to '{lastfm_last_activity_file}' file: {e}") if INACTIVE_NOTIFICATION: # Format recently listened songs list for email (skip if only 1 song) recent_songs_mbody = "" recent_songs_mbody_html = "" if listened_songs > 1 and len(recent_songs_session) > 0 and INACTIVE_EMAIL_RECENT_SONGS_COUNT > 0: # Get last up to INACTIVE_EMAIL_RECENT_SONGS_COUNT songs songs_to_show = recent_songs_session[-min(INACTIVE_EMAIL_RECENT_SONGS_COUNT, len(recent_songs_session)):] recent_songs_list = [] recent_songs_list_html = [] for song in songs_to_show: song_date = get_date_from_ts(song['timestamp']) marker = "" marker_html = "" if song.get('cont', False): marker = ", CONT" marker_html = ", CONT" elif song.get('skipped', False): marker = ", SKIPPED" marker_html = ", SKIPPED" recent_songs_list.append(f"{song['artist']} - {song['track']} ({song_date}{marker})") recent_songs_list_html.append(f"{escape(song['artist'])} - {escape(song['track'])} ({song_date}{marker_html})") if recent_songs_list: recent_songs_mbody = f"\n\nRecently listened songs in this session:\n" + "\n".join(recent_songs_list) recent_songs_mbody_html = f"

Recently listened songs in this session:
" + "
".join(recent_songs_list_html) m_subject = f"Last.fm user {username} is inactive: '{artist} - {track}' (after {calculate_timespan(int(lf_active_ts_last), int(lf_active_ts_start), show_seconds=False)}: {get_range_of_dates_from_tss(lf_active_ts_start, lf_active_ts_last, short=True)})" # Get URLs for the last played track spotify_search_url, apple_search_url, genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, lastfm_url, lastfm_album_url = get_spotify_apple_genius_search_urls(str(artist), str(track), album, network) lyrics_urls_text = format_lyrics_urls_email_text(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url) lyrics_urls_html = format_lyrics_urls_email_html(genius_search_url, azlyrics_search_url, tekstowo_search_url, musixmatch_search_url, lyrics_com_search_url, artist, track) lyrics_section_text = f"\n{lyrics_urls_text}\n\n" if lyrics_urls_text else "\n\n" lyrics_section_html = f"
{lyrics_urls_html}

" if lyrics_urls_html else "

" # Determine URLs for "Last played:" and secondary URL field based on configuration if USE_LASTFM_URL_IN_LAST_PLAYED: last_played_url = lastfm_url secondary_url = spotify_search_url secondary_url_label = "Spotify URL" else: last_played_url = spotify_search_url secondary_url = lastfm_url secondary_url_label = "Last.fm URL" music_urls_text = format_music_urls_email_text(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url) music_urls_html = format_music_urls_email_html(spotify_search_url, lastfm_url, lastfm_album_url, apple_search_url, youtube_music_search_url, amazon_music_search_url, deezer_search_url, tidal_search_url, artist, track, secondary_url, secondary_url_label) music_section_text = f"\n\n{music_urls_text}\n" if music_urls_text else "\n" music_section_html = f"

{music_urls_html}" if music_urls_html else "" # When both music and lyrics are empty, use single

instead of
+

if not music_urls_html and not lyrics_urls_html: music_section_html = "

" lyrics_section_html = "" elif not music_urls_html: music_section_html = "
" m_body = f"Last played: {artist} - {track}{duration_m_body}\nAlbum: {album}{music_section_text}{lyrics_section_text}User got inactive after listening to music for {calculate_timespan(int(lf_active_ts_last), int(lf_active_ts_start))}\nUser played music from {get_range_of_dates_from_tss(lf_active_ts_start, lf_active_ts_last, short=True, between_sep=' to ')}{paused_mbody}{listened_songs_mbody}{played_for_m_body}{recent_songs_mbody}\n\nLast activity: {get_date_from_ts(lf_active_ts_last)}\nInactivity timer: {display_time(LASTFM_INACTIVITY_CHECK)}{get_cur_ts(nl_ch + 'Timestamp: ')}" album_html = f'{escape(album)}' if (ENABLE_LASTFM_ALBUM_URL and lastfm_album_url) else escape(album) m_body_html = f"Last played: {escape(artist)} - {escape(track)}{duration_m_body_html}
Album: {album_html}{music_section_html}{lyrics_section_html}User got inactive after listening to music for {calculate_timespan(int(lf_active_ts_last), int(lf_active_ts_start))}
User played music from {get_range_of_dates_from_tss(lf_active_ts_start, lf_active_ts_last, short=True, between_sep=' to ')}{paused_mbody_html}{listened_songs_mbody_html}{played_for_m_body_html}{recent_songs_mbody_html}

Last activity: {get_date_from_ts(lf_active_ts_last)}
Inactivity timer: {display_time(LASTFM_INACTIVITY_CHECK)}{get_cur_ts('
Timestamp: ')}" print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True lf_active_ts_start = 0 playing_track = None last_track_start_ts = 0 listened_songs = 0 looped_songs = 0 skipped_songs = 0 pauses_number = 0 recent_songs_session = [] print_cur_ts("\nTimestamp:\t\t\t") if LIVENESS_CHECK_COUNTER and alive_counter >= LIVENESS_CHECK_COUNTER: print_cur_ts("Liveness check, timestamp:\t") alive_counter = 0 # Stuff to do regardless if the user is online or offline if last_track_start_ts > 0: last_track_start_ts_old2 = last_track_start_ts ERROR_500_ZERO_TIME_LIMIT = ERROR_500_TIME_LIMIT + LASTFM_CHECK_INTERVAL if LASTFM_CHECK_INTERVAL * ERROR_500_NUMBER_LIMIT > ERROR_500_ZERO_TIME_LIMIT: ERROR_500_ZERO_TIME_LIMIT = LASTFM_CHECK_INTERVAL * (ERROR_500_NUMBER_LIMIT + 1) if error_500_start_ts and ((int(time.time()) - error_500_start_ts) >= ERROR_500_ZERO_TIME_LIMIT): error_500_start_ts = 0 error_500_counter = 0 ERROR_NETWORK_ZERO_TIME_LIMIT = ERROR_NETWORK_ISSUES_TIME_LIMIT + LASTFM_CHECK_INTERVAL if LASTFM_CHECK_INTERVAL * ERROR_NETWORK_ISSUES_NUMBER_LIMIT > ERROR_NETWORK_ZERO_TIME_LIMIT: ERROR_NETWORK_ZERO_TIME_LIMIT = LASTFM_CHECK_INTERVAL * (ERROR_NETWORK_ISSUES_NUMBER_LIMIT + 1) if error_network_issue_start_ts and ((int(time.time()) - error_network_issue_start_ts) >= ERROR_NETWORK_ZERO_TIME_LIMIT): error_network_issue_start_ts = 0 error_network_issue_counter = 0 except Exception as e: str_matches = ["http code 500", "http code 504", "http code 503", "http code 502"] if any(x in str(e).lower() for x in str_matches): if not error_500_start_ts: error_500_start_ts = int(time.time()) error_500_counter = 1 else: error_500_counter += 1 str_matches = ["timed out", "timeout", "name resolution", "failed to resolve", "family not supported", "429 client", "aborted"] if any(x in str(e).lower() for x in str_matches) or str(e) == '': if not error_network_issue_start_ts: error_network_issue_start_ts = int(time.time()) error_network_issue_counter = 1 else: error_network_issue_counter += 1 if error_500_start_ts and (error_500_counter >= ERROR_500_NUMBER_LIMIT and (int(time.time()) - error_500_start_ts) >= ERROR_500_TIME_LIMIT): print(f"* Error 50x ({error_500_counter}x times in the last {display_time((int(time.time()) - error_500_start_ts))}): '{e}'") print_cur_ts("Timestamp:\t\t\t") error_500_start_ts = 0 error_500_counter = 0 elif error_network_issue_start_ts and (error_network_issue_counter >= ERROR_NETWORK_ISSUES_NUMBER_LIMIT and (int(time.time()) - error_network_issue_start_ts) >= ERROR_NETWORK_ISSUES_TIME_LIMIT): print(f"* Error with network ({error_network_issue_counter}x times in the last {display_time((int(time.time()) - error_network_issue_start_ts))}): '{e}'") print_cur_ts("Timestamp:\t\t\t") error_network_issue_start_ts = 0 error_network_issue_counter = 0 elif not error_500_start_ts and not error_network_issue_start_ts: print(f"* Error: '{e}'") if 'Invalid API key' in str(e) or 'API Key Suspended' in str(e): print("* API key might not be valid anymore!") if ERROR_NOTIFICATION and not email_sent: m_subject = f"lastfm_monitor: API key error! (user: {username})" m_body = f"API key might not be valid anymore: {e}{get_cur_ts(nl_ch + nl_ch + 'Timestamp: ')}" m_body_html = f"API key might not be valid anymore: {escape(str(e))}{get_cur_ts('

Timestamp: ')}" print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) email_sent = True print_cur_ts("Timestamp:\t\t\t") if lf_user_online: check_interval = LASTFM_ACTIVE_CHECK_INTERVAL else: check_interval = LASTFM_CHECK_INTERVAL debug_print(f"Sleeping for {check_interval}s before next check") time.sleep(check_interval) new_track = None def main(): global CLI_CONFIG_PATH, DOTENV_FILE, LIVENESS_CHECK_COUNTER, LASTFM_API_KEY, LASTFM_API_SECRET, SP_CLIENT_ID, SP_CLIENT_SECRET, CSV_FILE, MONITOR_LIST_FILE, FILE_SUFFIX, DISABLE_LOGGING, LF_LOGFILE, ACTIVE_NOTIFICATION, INACTIVE_NOTIFICATION, TRACK_NOTIFICATION, SONG_NOTIFICATION, SONG_ON_LOOP_NOTIFICATION, OFFLINE_ENTRIES_NOTIFICATION, ERROR_NOTIFICATION, LASTFM_CHECK_INTERVAL, LASTFM_ACTIVE_CHECK_INTERVAL, LASTFM_INACTIVITY_CHECK, TRACK_SONGS, PROGRESS_INDICATOR, USE_TRACK_DURATION_FROM_SPOTIFY, DO_NOT_SHOW_DURATION_MARKS, LASTFM_BREAK_CHECK_MULTIPLIER, SMTP_PASSWORD, stdout_bck, SP_TOKENS_FILE, TRACK_FOLLOWINGS, TRACK_FOLLOWERS, FRIENDS_CHECK_INTERVAL, FOLLOWERS_NOTIFICATION, FOLLOWINGS_NOTIFICATION, FRIENDS_CHANGE_COUNTER, FRIENDS_RETRY_INTERVAL, DEBUG_MODE, LASTFM_USERNAME_GLOBAL if "--generate-config" in sys.argv: print(CONFIG_BLOCK.strip("\n")) sys.exit(0) if "--version" in sys.argv: print(f"{os.path.basename(sys.argv[0])} v{VERSION}") sys.exit(0) stdout_bck = sys.stdout signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) clear_screen(CLEAR_SCREEN) print(f"Last.fm Monitoring Tool v{VERSION}\n") parser = argparse.ArgumentParser( prog="lastfm_monitor", description=("Monitor a Last.fm user's scrobbles and send customizable email alerts [ https://github.com/misiektoja/lastfm_monitor/ ]"), formatter_class=argparse.RawTextHelpFormatter ) # Positional parser.add_argument( "username", nargs="?", metavar="LASTFM_USERNAME", help="Last.fm username to monitor" ) # Version, just to list in help, it is handled earlier parser.add_argument( "--version", action="version", version=f"%(prog)s v{VERSION}" ) # Configuration & dotenv files conf = parser.add_argument_group("Configuration & dotenv files") conf.add_argument( "--config-file", dest="config_file", metavar="PATH", help="Location of the optional config file", ) conf.add_argument( "--generate-config", action="store_true", help="Print default config template and exit", ) conf.add_argument( "--env-file", dest="env_file", metavar="PATH", help="Path to optional dotenv file (auto-search if not set, disable with 'none')", ) # API credentials creds = parser.add_argument_group("API credentials") creds.add_argument( "-u", "--lastfm-api-key", dest="lastfm_api_key", metavar="LASTFM_API_KEY", help="Last.fm API key" ) creds.add_argument( "-w", "--lastfm-secret", dest="lastfm_secret", metavar="LASTFM_API_SECRET", help="Last.fm API secret" ) creds.add_argument( "-z", "--spotify-creds", dest="spotify_creds", metavar='SPOTIFY_CLIENT_ID:SPOTIFY_CLIENT_SECRET', help="Spotify OAuth app client credentials - specify both values as SPOTIFY_CLIENT_ID:SPOTIFY_CLIENT_SECRET" ) # Notifications notify = parser.add_argument_group("Notifications") notify.add_argument( "-a", "--notify-active", dest="notify_active", action="store_true", default=None, help="Email when user becomes active" ) notify.add_argument( "-i", "--notify-inactive", dest="notify_inactive", action="store_true", default=None, help="Email when user goes inactive" ) notify.add_argument( "-t", "--notify-track", dest="notify_track", action="store_true", default=None, help="Email when a monitored track/album plays" ) notify.add_argument( "-j", "--notify-song-changes", dest="notify_song_changes", action="store_true", default=None, help="Email on every song change" ) notify.add_argument( "-f", "--notify-offline-entries", dest="notify_offline_entries", action="store_true", default=None, help="Email when new scrobbles arrive while user is offline" ) notify.add_argument( "-x", "--notify-loop", dest="notify_loop", action="store_true", default=None, help="Email when user plays a song on loop" ) notify.add_argument( "--notify-followers", dest="notify_followers", action="store_true", default=None, help="Email when followers change" ) notify.add_argument( "--notify-followings", dest="notify_followings", action="store_true", default=None, help="Email when followings (friends) change" ) notify.add_argument( "-e", "--no-error-notify", action="store_false", dest="notify_errors", default=None, help="Disable email on errors (e.g. invalid API key)" ) notify.add_argument( "--send-test-email", dest="send_test_email", action="store_true", help="Send a test email to verify SMTP settings" ) # Intervals & Timers times = parser.add_argument_group("Intervals & timers") times.add_argument( "-c", "--check-interval", dest="check_interval", metavar="SECONDS", type=int, help="Polling interval when user is offline" ) times.add_argument( "-k", "--active-interval", dest="active_interval", metavar="SECONDS", type=int, help="Polling interval when user is active" ) times.add_argument( "-o", "--offline-timer", dest="offline_timer", metavar="SECONDS", type=int, help="Time to mark inactive user as offline" ) times.add_argument( "-m", "--break-multiplier", dest="break_multiplier", metavar="N", type=int, help="Detect play breaks as N×active-interval" ) times.add_argument( "--friends-check-interval", dest="friends_check_interval", metavar="SECONDS", type=int, help="How often to check for followers/followings changes" ) times.add_argument( "--friends-change-counter", dest="friends_change_counter", metavar="N", type=int, help="Number of consecutive checks to confirm friend changes" ) times.add_argument( "--friends-retry-interval", dest="friends_retry_interval", metavar="SECONDS", type=int, help="Retry timeout for friend change confirmation" ) # Listing mode listing = parser.add_argument_group("Listing") listing.add_argument( "-l", "--list-recent", dest="list_recent", action="store_true", help="Print the user's most recent tracks" ) listing.add_argument( "-n", "--recent-count", dest="recent_count", metavar="N", type=int, help="Number of recent tracks to list (use with -l)" ) # Features & Output opts = parser.add_argument_group("Features & output") opts.add_argument( "-p", "--progress", dest="progress", action="store_true", default=None, help="Show a progress indicator while user is listening" ) opts.add_argument( "-g", "--track-in-spotify", dest="track_in_spotify", action="store_true", default=None, help="Auto-play each scrobble in your Spotify client" ) opts.add_argument( "-r", "--fetch-duration", dest="fetch_duration", action="store_true", default=None, help="Fetch track duration from Spotify when credentials are set" ) opts.add_argument( "-q", "--hide-duration-source", dest="hide_duration_source", action="store_true", default=None, help="Do not show whether duration came from Last.fm or Spotify" ) opts.add_argument( "-b", "--csv-file", dest="csv_file", metavar="CSV_FILE", type=str, help="Write every scrobble to a CSV file" ) opts.add_argument( "-s", "--monitor-list", dest="monitor_list", metavar="TRACKS_FILE", type=str, help="Filename with tracks/albums to alert on" ) opts.add_argument( "--track-followings", dest="track_followings", action="store_true", default=None, help="Track changes in user's followings (friends)" ) opts.add_argument( "--track-followers", dest="track_followers", action="store_true", default=None, help="Track changes in user's followers" ) opts.add_argument( "-d", "--disable-logging", dest="disable_logging", action="store_true", default=None, help="Disable logging to lastfm_monitor_.log" ) opts.add_argument( "--debug", dest="debug_mode", action="store_true", default=None, help="Enable debug mode (full API traces, internal logic logs)" ) args = parser.parse_args() if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) if args.config_file: CLI_CONFIG_PATH = os.path.expanduser(args.config_file) cfg_path = find_config_file(CLI_CONFIG_PATH) if not cfg_path and CLI_CONFIG_PATH: print(f"* Error: Config file '{CLI_CONFIG_PATH}' does not exist") sys.exit(1) if cfg_path: try: with open(cfg_path, "r") as cf: exec(cf.read(), globals()) except Exception as e: print(f"* Error loading config file '{cfg_path}': {e}") sys.exit(1) if args.env_file: DOTENV_FILE = os.path.expanduser(args.env_file) else: if DOTENV_FILE: DOTENV_FILE = os.path.expanduser(DOTENV_FILE) if DOTENV_FILE and DOTENV_FILE.lower() == 'none': env_path = None else: try: from dotenv import load_dotenv, find_dotenv if DOTENV_FILE: env_path = DOTENV_FILE if not os.path.isfile(env_path): print(f"* Warning: dotenv file '{env_path}' does not exist\n") else: load_dotenv(env_path, override=True) else: env_path = find_dotenv() or None if env_path: load_dotenv(env_path, override=True) except ImportError: env_path = DOTENV_FILE if DOTENV_FILE else None if env_path: print(f"* Warning: Cannot load dotenv file '{env_path}' because 'python-dotenv' is not installed\n\nTo install it, run:\n pip install python-dotenv\n\nOnce installed, re-run this tool\n") if env_path: for secret in SECRET_KEYS: val = os.getenv(secret) if val is not None: globals()[secret] = val if not check_internet(): sys.exit(1) if args.send_test_email: print("* Sending test email notification ...\n") if send_email("lastfm_monitor: test email", "This is test email - your SMTP settings seems to be correct !", "", SMTP_SSL, smtp_timeout=5) == 0: print("* Email sent successfully !") else: sys.exit(1) sys.exit(0) if not args.username: print("* Error: LASTFM_USERNAME argument is required !") sys.exit(1) if args.lastfm_api_key: LASTFM_API_KEY = args.lastfm_api_key if args.lastfm_secret: LASTFM_API_SECRET = args.lastfm_secret if not LASTFM_API_KEY or LASTFM_API_KEY == "your_lastfm_api_key": print("* Error: LASTFM_API_KEY (-u / --lastfm_api_key) value is empty or incorrect") sys.exit(1) if not LASTFM_API_SECRET or LASTFM_API_SECRET == "your_lastfm_api_secret": print("* Error: LASTFM_API_SECRET (-w / --lastfm-secret) value is empty or incorrect") sys.exit(1) if args.debug_mode is True: DEBUG_MODE = True LASTFM_USERNAME_GLOBAL = args.username if args.spotify_creds: try: SP_CLIENT_ID, SP_CLIENT_SECRET = args.spotify_creds.split(":") except ValueError: print("* Error: -z / --spotify-creds has invalid format - use SP_CLIENT_ID:SP_CLIENT_SECRET") sys.exit(1) if SP_TOKENS_FILE: SP_TOKENS_FILE = os.path.expanduser(SP_TOKENS_FILE) if args.fetch_duration: USE_TRACK_DURATION_FROM_SPOTIFY = args.fetch_duration if args.check_interval: LASTFM_CHECK_INTERVAL = args.check_interval LIVENESS_CHECK_COUNTER = LIVENESS_CHECK_INTERVAL / LASTFM_CHECK_INTERVAL if args.active_interval: LASTFM_ACTIVE_CHECK_INTERVAL = args.active_interval if args.offline_timer: LASTFM_INACTIVITY_CHECK = args.offline_timer if args.break_multiplier: LASTFM_BREAK_CHECK_MULTIPLIER = args.break_multiplier network = pylast.LastFMNetwork(LASTFM_API_KEY, LASTFM_API_SECRET) user = network.get_user(args.username) if args.csv_file: CSV_FILE = os.path.expanduser(args.csv_file) else: if CSV_FILE: CSV_FILE = os.path.expanduser(CSV_FILE) if CSV_FILE: try: with open(CSV_FILE, 'a', newline='', buffering=1, encoding="utf-8") as _: pass except Exception as e: print(f"* Error: CSV file cannot be opened for writing: {e}") sys.exit(1) if args.list_recent: if args.recent_count and args.recent_count > 0: tracks_n = args.recent_count else: tracks_n = 30 try: lastfm_list_tracks(args.username, user, network, tracks_n, CSV_FILE) except Exception as e: print(f"* Error: {e}") sys.exit(1) sys.exit(0) if args.monitor_list: MONITOR_LIST_FILE = os.path.expanduser(args.monitor_list) else: if MONITOR_LIST_FILE: MONITOR_LIST_FILE = os.path.expanduser(MONITOR_LIST_FILE) if MONITOR_LIST_FILE: try: try: with open(MONITOR_LIST_FILE, encoding="utf-8") as file: lines = file.read().splitlines() except UnicodeDecodeError: with open(MONITOR_LIST_FILE, encoding="cp1252") as file: lines = file.read().splitlines() lf_tracks = [ line.strip() for line in lines if line.strip() and not line.strip().startswith("#") ] except Exception as e: print(f"* Error: File with Last.fm tracks cannot be opened: {e}") sys.exit(1) else: lf_tracks = [] if args.disable_logging is True: DISABLE_LOGGING = True if not DISABLE_LOGGING: log_path = Path(os.path.expanduser(LF_LOGFILE)) if log_path.parent != Path('.'): if log_path.suffix == "": log_path = log_path.parent / f"{log_path.name}_{args.username}.log" else: if log_path.suffix == "": log_path = Path(f"{log_path.name}_{args.username}.log") log_path.parent.mkdir(parents=True, exist_ok=True) FINAL_LOG_PATH = str(log_path) sys.stdout = Logger(FINAL_LOG_PATH) else: FINAL_LOG_PATH = None if args.notify_active is True: ACTIVE_NOTIFICATION = True if args.notify_inactive is True: INACTIVE_NOTIFICATION = True if args.notify_track is True: TRACK_NOTIFICATION = True if args.notify_song_changes is True: SONG_NOTIFICATION = True if args.notify_offline_entries is True: OFFLINE_ENTRIES_NOTIFICATION = True if args.notify_loop is True: SONG_ON_LOOP_NOTIFICATION = True if args.notify_errors is False: ERROR_NOTIFICATION = False if args.notify_followers is True: FOLLOWERS_NOTIFICATION = True if args.notify_followings is True: FOLLOWINGS_NOTIFICATION = True if args.track_followings is True: TRACK_FOLLOWINGS = True if args.track_followers is True: TRACK_FOLLOWERS = True # Check for beautifulsoup4 if followers/followings tracking is enabled if TRACK_FOLLOWINGS or TRACK_FOLLOWERS: try: import bs4 # type: ignore except ImportError: print("* Error: beautifulsoup4 is required for followers/followings tracking") print("* Install it with: pip install beautifulsoup4") sys.exit(1) if args.friends_check_interval: FRIENDS_CHECK_INTERVAL = args.friends_check_interval if args.friends_change_counter: FRIENDS_CHANGE_COUNTER = args.friends_change_counter if args.friends_retry_interval: FRIENDS_RETRY_INTERVAL = args.friends_retry_interval if args.track_in_spotify is True: TRACK_SONGS = True if args.progress is True: PROGRESS_INDICATOR = True if args.hide_duration_source is True: DO_NOT_SHOW_DURATION_MARKS = True if args.fetch_duration is True: USE_TRACK_DURATION_FROM_SPOTIFY = True if not USE_TRACK_DURATION_FROM_SPOTIFY: DO_NOT_SHOW_DURATION_MARKS = True if SMTP_HOST.startswith("your_smtp_server_"): ACTIVE_NOTIFICATION = False INACTIVE_NOTIFICATION = False SONG_NOTIFICATION = False TRACK_NOTIFICATION = False OFFLINE_ENTRIES_NOTIFICATION = False SONG_ON_LOOP_NOTIFICATION = False ERROR_NOTIFICATION = False print(f"* Last.fm polling intervals:\t[offline check: {display_time(LASTFM_CHECK_INTERVAL)}] [active check: {display_time(LASTFM_ACTIVE_CHECK_INTERVAL)}]\n*\t\t\t\t[inactivity: {display_time(LASTFM_INACTIVITY_CHECK)}]") if TRACK_FOLLOWINGS or TRACK_FOLLOWERS: print(f"* Friends/followers tracking:\t[followings = {TRACK_FOLLOWINGS}] [followers = {TRACK_FOLLOWERS}]" + (f" [interval: {display_time(FRIENDS_CHECK_INTERVAL)}]" if FRIENDS_CHECK_INTERVAL > 0 else "")) print(f"* Email notifications:\t\t[active = {ACTIVE_NOTIFICATION}] [inactive = {INACTIVE_NOTIFICATION}] [tracked = {TRACK_NOTIFICATION}] [every song = {SONG_NOTIFICATION}]\n*\t\t\t\t[songs on loop = {SONG_ON_LOOP_NOTIFICATION}] [offline entries = {OFFLINE_ENTRIES_NOTIFICATION}] [errors = {ERROR_NOTIFICATION}]\n*\t\t\t\t[followers = {FOLLOWERS_NOTIFICATION}] [followings = {FOLLOWINGS_NOTIFICATION}]") print(f"* Progress indicator:\t\t{PROGRESS_INDICATOR}") print(f"* Track listened songs:\t\t{TRACK_SONGS}") print(f"* Track duration (Spotify):\t{USE_TRACK_DURATION_FROM_SPOTIFY}") print(f"* Show duration marks:\t\t{not DO_NOT_SHOW_DURATION_MARKS}") print(f"* Play break multiplier:\t{LASTFM_BREAK_CHECK_MULTIPLIER} ({display_time(LASTFM_BREAK_CHECK_MULTIPLIER * LASTFM_ACTIVE_CHECK_INTERVAL)})") print(f"* Liveness check:\t\t{bool(LIVENESS_CHECK_INTERVAL)}" + (f" ({display_time(LIVENESS_CHECK_INTERVAL)})" if LIVENESS_CHECK_INTERVAL else "")) print(f"* CSV logging enabled:\t\t{bool(CSV_FILE)}" + (f" ({CSV_FILE})" if CSV_FILE else "")) print(f"* Alert on monitored tracks:\t{bool(MONITOR_LIST_FILE)}" + (f" ({MONITOR_LIST_FILE})" if MONITOR_LIST_FILE else "")) print(f"* Output logging enabled:\t{not DISABLE_LOGGING}" + (f" ({FINAL_LOG_PATH})" if not DISABLE_LOGGING else "")) if TRACK_SONGS or USE_TRACK_DURATION_FROM_SPOTIFY: print(f"* Spotify token cache file:\t{SP_TOKENS_FILE or 'None (memory only)'}") print(f"* Configuration file:\t\t{cfg_path}") print(f"* Dotenv file:\t\t\t{env_path or 'None'}") print(f"* Debug mode:\t\t\t{DEBUG_MODE}\n") # We define signal handlers only for Linux, Unix & MacOS since Windows has limited number of signals supported if platform.system() != 'Windows': signal.signal(signal.SIGUSR1, toggle_active_inactive_notifications_signal_handler) signal.signal(signal.SIGUSR2, toggle_song_notifications_signal_handler) signal.signal(signal.SIGURG, toggle_progress_indicator_signal_handler) signal.signal(signal.SIGCONT, toggle_track_notifications_signal_handler) signal.signal(signal.SIGPIPE, toggle_songs_on_loop_notifications_signal_handler) signal.signal(signal.SIGTRAP, increase_inactivity_check_signal_handler) signal.signal(signal.SIGABRT, decrease_inactivity_check_signal_handler) signal.signal(signal.SIGHUP, reload_secrets_signal_handler) out = f"Monitoring user {args.username}" print(out) # print("-" * len(out)) print("─" * HORIZONTAL_LINE) lastfm_monitor_user(user, network, args.username, lf_tracks, CSV_FILE) sys.stdout = stdout_bck sys.exit(0) if __name__ == "__main__": main()