# Copyright 2019-2023 Peppy Player peppy.player@gmail.com # # This file is part of Peppy Player. # # Peppy Player is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Peppy Player is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Peppy Player. If not, see . import os import sys import logging import sqlite3 from timeit import default_timer as timer from datetime import timedelta from mutagen import File from mutagen.mp4 import MP4 # DB Util constants DEFAULT_TABLE_NAME = "metadata" DEFAULT_SUMMARY_TABLE_NAME = "summary" FOLDER = "folder" FILENAME = "filename" TYPE = "type" GENRE = "genre" ALBUM = "album" COMPOSER = "composer" ARTIST = "artist" PERFORMER = "performer" TITLE = "title" DATE = "date" BASEFOLDER = "basefolder" ORIGINOS = "originos" EXTENSIONS = (".aac", ".ac3", ".aiff", ".ape", ".flac", ".m4a", ".mp3", ".ogg", ".opus", ".wav", ".wma", ".wv") METADATA = [GENRE, ALBUM, COMPOSER, ARTIST, PERFORMER, TITLE, DATE] MP4_METADATA = ["\xa9gen", "\xa9alb", "\xa9wrt", "\xa9ART", "aART", "\xa9nam", "\xa9day"] INFO = ["sample_rate", "channels", "bits_per_sample", "length", "bitrate"] ALL_METADATA = [FOLDER, FILENAME, TYPE] ALL_METADATA.extend(METADATA + INFO) SUMMARY = [BASEFOLDER, ORIGINOS, GENRE, ARTIST, COMPOSER, ALBUM, TITLE, DATE, TYPE, FOLDER, FILENAME] # Collector constants SCANNED_FOLDERS = "Scanned folders: " TIME_TO_PARSE = "time_to_parse" SCAN_TIME = "Scan time (h:mm:ss): " PARSING_TIME = "Parsing time (h:mm:ss): " ESTIMATED_TIME = "Estimated metadata parse time (h:mm:ss): " ESTIMATED_DISK_SPACE = "Estimated disk space 
# Collector constants (continuation of the module-level constant section)
FILES = "files"
TOTAL_TRACKS = "total tracks"
TOTAL_FILES = "Total audio files: "
FILES_STATISTICS = "Files Statistics"
METADATA_STATISTICS = "Metadata Statistics"
DATABASE_STATISTICS = "Database Statistics"
UPDATE_STATISTICS = "Update Statistics"
UP_TO_DATE = "The database file is up-to-date"
ERRORS = "Errors: "
AVERAGE_PARSE_TIME = 0.016           # seconds per file, used only for time estimates
AVERAGE_METADATA_SIZE = 278          # bytes per track, used only for disk space estimates
EMPTY_COLLECTION_SIZE = 8192         # size of an empty SQLite database file in bytes
STARS = 60                           # width of the statistics banner
PROGRESS_BAR_PREFIX = "Progress:"
PROGRESS_BAR_SUFFIX = "Complete"
PROGRESS_BAR_LENGTH = 50
BATCH_SIZE = 300                     # metadata rows inserted per transaction
FILE_BATCH_SIZE = 50                 # folders per progress tick while counting
FULL_BLOCK_CHARACTER = chr(9608)     # the '█' character used by the progress bar
ADDED_PREFIX = "Added:"
ADDED_SUFFIX = "file"
UPDATE_TIME = "Update time (h:mm:ss):"

class DbUtil(object):
    """ Database utility class.

    Keeps the connection to the collection database and provides
    utility SQL functions (table creation, inserts, queries).
    """

    def __init__(self, db_filename=None):
        """ Initializer.

        :param db_filename: folder and filename of the database file
        """
        self.db_path = db_filename
        self.table_name = DEFAULT_TABLE_NAME
        self.summary_table_name = DEFAULT_SUMMARY_TABLE_NAME
        self.metadata_keys = METADATA
        self.info_keys = INFO

        # Pre-build all SQL statements once. Table and column names come from
        # module constants, not user input, so f-string interpolation is safe here.
        columns = ",".join([m + " text" for m in ALL_METADATA])
        self.CREATE_METADATA_TABLE = f"""CREATE TABLE IF NOT EXISTS {self.table_name} (id integer PRIMARY KEY,{columns});"""

        columns = ",".join([m + " text" for m in SUMMARY])
        self.CREATE_SUMMARY_TABLE = f"""CREATE TABLE IF NOT EXISTS {self.summary_table_name} ({columns});"""

        columns = ",".join(ALL_METADATA)
        placeholders = ",".join(["?"] * len(ALL_METADATA))
        self.INSERT_DATA = f"""INSERT INTO {self.table_name}({columns}) VALUES({placeholders});"""

        columns = ",".join(SUMMARY)
        placeholders = ",".join(["?"] * len(SUMMARY))
        self.INSERT_SUMMARY_DATA = f"""INSERT INTO {self.summary_table_name}({columns}) VALUES({placeholders});"""

        self.conn = None

    def is_db_file_available(self):
        """ Check that the database file exists

        :return: True - exists, False - doesn't exist
        """
        if not self.db_path or not os.path.isfile(self.db_path):
            logging.debug(f"""Collection database {self.db_path} not found""")
            return False
        logging.debug(f"""Collection database {self.db_path} exists""")
        return True

    def is_metadata_available(self):
        """ Check if the metadata table exists

        :return: True - table exists, False - table doesn't exist
        """
        query = f"""SELECT name FROM sqlite_master WHERE type='table' AND name='{self.table_name}';"""
        return bool(self.run_query(query))

    def connect(self):
        """ Connect to the collection database and create the tables if missing """
        try:
            # check_same_thread=False: the connection is shared across threads
            # when the class is used inside the player
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            logging.debug(f"""Connected to the collection database {self.db_path}""")
            if not self.is_metadata_available():
                logging.debug("Collection tables don't exist")
                self.run_command(self.CREATE_METADATA_TABLE)
                self.run_command(self.CREATE_SUMMARY_TABLE)
                logging.debug("Created collection tables")
        except Exception as e:
            logging.debug(e)

    def disconnect(self):
        """ Disconnect from the collection database """
        if self.conn:
            self.conn.close()
            # drop the stale handle so later calls fail fast instead of
            # operating on a closed connection
            self.conn = None

    def _rollback(self):
        """ Roll back the current transaction.

        Never raises, so a failing rollback cannot mask the original error.
        """
        try:
            self.conn.execute("rollback")
        except Exception as e:
            logging.debug(e)

    def run_command(self, command, values=None):
        """ Run single SQL command in a transaction. Rollback if exception.

        :param command: SQL command
        :param values: optional input values for placeholders
        """
        if not self.conn:
            logging.debug(f"""No connection to the database: {self.db_path}""")
            return
        try:
            self.conn.execute("begin")
            if values:
                self.conn.execute(command, values)
            else:
                self.conn.execute(command)
            self.conn.commit()
        except Exception as e:
            self._rollback()
            logging.debug(e)

    def run_batch_insert(self, params):
        """ Run multiple INSERT commands in one transaction. Rollback if exception.

        :param params: list of value sequences, one per inserted row
        """
        if not self.conn:
            logging.debug(f"""No connection to the database: {self.db_path}""")
            return
        try:
            self.conn.execute("begin")
            self.conn.executemany(self.INSERT_DATA, params)
            self.conn.commit()
        except Exception as e:
            self._rollback()
            logging.debug(e)

    def run_query(self, query):
        """ Run SELECT query without parameters

        :param query: SQL query
        :return: result rows or None if there is no connection or the query failed
        """
        return self.run_parameterized_query(query, [])

    def run_parameterized_query(self, query, params):
        """ Run SELECT query with parameters

        :param query: SQL query
        :param params: query parameters
        :return: result rows or None if there is no connection or the query failed
        """
        if not self.conn:
            logging.debug(f"""No connection to the database: {self.db_path}""")
            return None
        try:
            cursor = self.conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchall()
        except Exception as e:
            logging.debug(e)
            logging.debug(query)
            return None

    def get_collection_summary(self):
        """ Get collection summary from the summary table

        :return: dictionary with summary values or None if unavailable
        """
        if not self.is_db_file_available():
            return None
        s = self.run_query(f""" SELECT * FROM {self.summary_table_name} """)
        if not s:
            logging.debug("Collection summary not found")
            return None
        # the summary table column order matches the SUMMARY constant by construction
        return dict(zip(SUMMARY, s[0]))

    def purge_collection_tables(self):
        """ Delete and re-create the collection tables """
        self.run_command(f"""DROP TABLE IF EXISTS {self.table_name}""")
        self.run_command(f"""DROP TABLE IF EXISTS {self.summary_table_name}""")
        self.run_command(self.CREATE_METADATA_TABLE)
        self.run_command(self.CREATE_SUMMARY_TABLE)
        logging.debug("Collection deleted")

    def delete_summary_data(self):
        """ Delete all rows from the summary table """
        self.run_command(f"""DELETE FROM {self.summary_table_name}""")
        logging.debug("Summary data deleted")
class Collector(object):
    """ Collects audio files metadata and inserts it into the database. Reports statistics. """

    def __init__(self, dbfile=None, dbutil=None):
        """ Initializer

        :param dbfile: database filename
        :param dbutil: database utility object. Provided when the class is used in the player.
        """
        self.dbutil = dbutil if dbutil else DbUtil(db_filename=dbfile)

    def _parse_file(self, current_folder, file, ext, errors):
        """ Parse a single audio file with mutagen.

        :param current_folder: folder containing the file
        :param file: file name
        :param ext: lower-case file extension without the dot
        :param errors: list collecting error messages (mutated in place)
        :return: mutagen metadata object or None if parsing failed
        """
        meta = None
        try:
            p = os.path.join(current_folder, file)
            if ext == "mp4" or ext == "m4a":
                meta = MP4(p)
            else:
                # easy=True maps format-specific tags to common names (genre, album, ...)
                meta = File(p, easy=True)
        except Exception as e:
            errors.append(f"""Metadata parsing error in file {file}: {e}""")
        return meta

    def _banner(self, header):
        """ Build the opening banner line for a statistics report.

        :param header: report header placed in the middle of the banner
        :return: banner string, STARS characters wide
        """
        n = int((STARS - 2 - len(header)) / 2)
        return "\n\n" + "*" * n + " " + header + " " + "*" * n

    def get_files_statistics(self, base_folder, total_folders, progress_callback=None):
        """ Collect recursively statistics about all audio files in the specified folder
        and its subfolders. This function doesn't parse files and doesn't create
        the collection database.

        :param base_folder: base folder
        :param total_folders: total number of all subfolders
        :param progress_callback: callback function to send progress to
        :return: dictionary with statistics
        """
        stats = {}
        found_audio_files = {}
        scanned_folders = 0
        total_files = 0
        logging.debug(f"""Collecting statistics for files in folder {base_folder}""")
        start = timer()

        for _, __, files in os.walk(base_folder, followlinks=True):
            scanned_folders += 1
            for file in files:
                if not file.lower().endswith(EXTENSIONS):
                    continue
                ext = file[file.rfind('.') + 1:].lower()
                total_files += 1
                found_audio_files[ext] = found_audio_files.get(ext, 0) + 1
            if progress_callback:
                progress_callback(scanned_folders, total_folders)

        end = timer()
        stats[SCAN_TIME] = timedelta(seconds=(end - start))
        stats[FILES] = found_audio_files
        stats[SCANNED_FOLDERS] = scanned_folders
        stats[TOTAL_FILES] = total_files
        stats[TIME_TO_PARSE] = AVERAGE_PARSE_TIME
        logging.debug("Finished collecting statistics")
        return stats

    def get_database_statistics(self):
        """ Count all unique values for each metadata column.

        :return: dictionary with DB stats for each metadata column or None
        """
        if not self.dbutil.is_db_file_available():
            return None
        total_tracks = self.dbutil.run_query(f""" SELECT COUNT(*) FROM {self.dbutil.table_name} """)
        if total_tracks is None:  # query failed (e.g. no connection)
            return None
        collection_stats = {TOTAL_TRACKS: total_tracks[0][0]}
        # SUMMARY[2:] skips basefolder/originos which are not metadata columns
        for c in SUMMARY[2:]:
            r = self.dbutil.run_query(f""" SELECT COUNT(DISTINCT {c}) FROM {self.dbutil.table_name} WHERE LENGTH(TRIM({c})) > 0""")
            if r:
                collection_stats[c] = r[0][0]
        return collection_stats

    def get_file_metadata(self, folder, filename, ext, meta):
        """ Prepare one audio file metadata row

        :param folder: file folder relative to the base folder
        :param filename: file name
        :param ext: file extension
        :param meta: file metadata from mutagen or None
        :return: list of values ordered as ALL_METADATA, ready for insert
        """
        metadata = []
        if not folder:  # file in the base folder
            folder = os.sep
        metadata.append(folder)
        metadata.append(filename)
        metadata.append(ext)

        if meta is None:
            # parsing failed - pad the remaining columns with NULLs
            metadata.extend([None] * (len(ALL_METADATA) - len(metadata)))
            return metadata

        # MP4 containers use their own tag names
        keys = MP4_METADATA if filename.lower().endswith((".mp4", ".m4a")) else METADATA

        for key in keys:
            try:
                value = meta[key][0]
            except (KeyError, IndexError, TypeError):
                # missing tag, empty tag list, or an object with no tags at all
                value = None
            if value is None or len(value.replace(" ", "").strip()) == 0:
                metadata.append(None)  # treat whitespace-only tags as missing
            else:
                metadata.append(value.strip())

        if hasattr(meta, "info"):
            for key in INFO:
                metadata.append(getattr(meta.info, key, None))
        else:
            metadata.extend([None] * len(INFO))

        return metadata

    def collect_metadata(self, base_folder, total_folders, metadata_callback=None, progress_callback=None):
        """ Collect audio file metadata

        :param base_folder: base folder, current working folder when omitted
        :param total_folders: total number of subfolders
        :param metadata_callback: called with each batch of BATCH_SIZE metadata rows
        :param progress_callback: called for each scanned folder
        :return: dictionary with statistics or None if the folder doesn't exist
        """
        metadata = []
        errors = []
        num = 0
        total_files = 0
        scanned_folders = 0
        start = timer()

        if not base_folder:
            base_folder = os.getcwd()
        elif not os.path.isdir(base_folder):
            logging.debug(f"""Folder {base_folder} not found""")
            return

        for current_folder, _, files in os.walk(base_folder, followlinks=True):
            scanned_folders += 1
            for file in files:
                if not file.lower().endswith(EXTENSIONS):
                    continue
                ext = file[file.rfind('.') + 1:].lower()
                meta = self._parse_file(current_folder, file, ext, errors)
                # store the folder relative to the base folder
                meta_folder = current_folder[len(base_folder):]
                metadata.append(self.get_file_metadata(meta_folder, file, ext, meta))
                num += 1
                total_files += 1
                if num == BATCH_SIZE:
                    if metadata_callback:
                        metadata_callback(metadata)
                    metadata = []
                    num = 0
            if progress_callback:
                progress_callback(scanned_folders, total_folders)

        end = timer()
        if metadata_callback and metadata:
            metadata_callback(metadata)  # flush the last partial batch

        return {
            SCANNED_FOLDERS: scanned_folders,
            TOTAL_FILES: total_files,
            PARSING_TIME: timedelta(seconds=(end - start)),
            ERRORS: errors
        }

    def create_summary(self, base_folder):
        """ Create collection summary row in the summary table

        :param base_folder: base folder name
        """
        logging.debug("Creating summary...")
        summary = self.get_database_statistics()
        values = [
            base_folder,
            sys.platform,
            str(summary[GENRE]),
            str(summary[ARTIST]),
            str(summary[COMPOSER]),
            str(summary[ALBUM]),
            str(summary[TITLE]),
            str(summary[DATE]),
            str(summary[TYPE]),
            str(summary[FOLDER]),
            str(summary[FILENAME])
        ]
        self.dbutil.run_command(self.dbutil.INSERT_SUMMARY_DATA, values)
        logging.debug("Summary created")

    def create_collection(self, base_folder, total_folders, db_filename, progress_callback=None):
        """ Create the database collection with audio files metadata

        :param base_folder: collection base folder
        :param total_folders: total number of the subfolders in the base folder
        :param db_filename: collection database filename
        :param progress_callback: callback for reporting progress
        :return: dictionary with collection database statistics or None
        """
        if base_folder and not os.path.isdir(base_folder):
            logging.debug(f"""Folder {base_folder} not found""")
            return None
        self.dbutil.db_path = db_filename
        self.dbutil.connect()
        logging.debug("Creating collection")
        stats = self.collect_metadata(base_folder, total_folders, self.dbutil.run_batch_insert, progress_callback)
        logging.debug("Collection created")
        self.create_summary(base_folder)
        logging.debug("Creation process completed")
        return stats

    def update_collection(self, base_folder, total_folders, progress_callback=None):
        """ Go through the base folder/sub-folders and check if the files exist
        in the database. If not, add the audio files metadata to the database.

        :param base_folder: collection base folder, current working folder when omitted
        :param total_folders: total number of the subfolders in the base folder
        :param progress_callback: callback for reporting progress
        :return: dictionary with update statistics or None if the folder doesn't exist
        """
        metadata = []
        errors = []
        num = 0
        scanned_folders = 0
        new_files_added = 0
        start = timer()

        if not base_folder:
            base_folder = os.getcwd()
        elif not os.path.isdir(base_folder):
            logging.debug(f"""Folder {base_folder} not found""")
            return None

        query = f""" SELECT COUNT(DISTINCT ID) FROM {self.dbutil.table_name} WHERE FOLDER = ? AND FILENAME = ? """

        for current_folder, _, files in os.walk(base_folder, followlinks=True):
            scanned_folders += 1
            for file in files:
                if not file.lower().endswith(EXTENSIONS):
                    continue
                ext = file[file.rfind('.') + 1:].lower()
                folder = current_folder[len(base_folder):]
                # skip files which are already in the database
                result = self.dbutil.run_parameterized_query(query, (folder, file))
                if result and int(result[0][0]) == 1:
                    continue
                meta = self._parse_file(current_folder, file, ext, errors)
                metadata.append(self.get_file_metadata(folder, file, ext, meta))
                num += 1
                if num == BATCH_SIZE:
                    self.dbutil.run_batch_insert(metadata)
                    logging.debug(metadata)
                    new_files_added += len(metadata)
                    metadata = []
                    num = 0
            if progress_callback:
                progress_callback(scanned_folders, total_folders)

        end = timer()
        if metadata:
            self.dbutil.run_batch_insert(metadata)
            new_files_added += len(metadata)

        if new_files_added > 0:
            # refresh the summary so it reflects the newly added rows
            self.dbutil.delete_summary_data()
            self.create_summary(base_folder)

        return {
            TOTAL_FILES: new_files_added,
            PARSING_TIME: timedelta(seconds=(end - start)),
            ERRORS: errors
        }

    def print_files_statistics(self, stats):
        """ Log formatted files statistics

        :param stats: folder statistics dictionary
        """
        if not stats:
            return
        s = self._banner(FILES_STATISTICS)
        s += f"""\n\n{SCANNED_FOLDERS} {stats[SCANNED_FOLDERS]}"""
        s += f"""\n{TOTAL_FILES} {stats[TOTAL_FILES]}"""
        s += f"""\n{SCAN_TIME} {stats[SCAN_TIME]}"""
        parse_time = timedelta(seconds=(stats[TIME_TO_PARSE] * stats[TOTAL_FILES]))
        disk_space = str(EMPTY_COLLECTION_SIZE + (stats[TOTAL_FILES] * AVERAGE_METADATA_SIZE))
        s += f"""\n{ESTIMATED_TIME}{parse_time}"""
        s += f"""\n{ESTIMATED_DISK_SPACE}{disk_space}\n\n"""
        for k, num in stats[FILES].items():
            s += f"""{k.upper()}: \t {num}\n"""
        s += "\n" + "*" * STARS
        logging.debug(s)

    def print_metadata_statistics(self, stats):
        """ Log formatted metadata statistics

        :param stats: metadata statistics dictionary
        """
        if not stats:
            return
        s = self._banner(METADATA_STATISTICS)
        s += f"""\n\n{SCANNED_FOLDERS} {stats[SCANNED_FOLDERS]}"""
        s += f"""\n{TOTAL_FILES} {stats[TOTAL_FILES]}"""
        s += f"""\n{PARSING_TIME} {stats[PARSING_TIME]}"""
        s += f"""\n{ERRORS} {len(stats[ERRORS])}\n"""
        if stats[ERRORS]:
            s += f"""{ERRORS}\n"""
            for e in stats[ERRORS]:
                s += f"""{e}\n"""
        s += "\n"
        s += "*" * STARS
        logging.debug(s)

    def print_update_statistics(self, stats, header):
        """ Log formatted update statistics

        :param stats: update statistics
        :param header: stats header
        """
        s = self._banner(header)
        if not stats or int(stats[TOTAL_FILES]) == 0:
            s += f"""\n\n{UP_TO_DATE}\n"""
            s += "\n" + "*" * STARS
            logging.debug(s)
            return
        suffix = ADDED_SUFFIX + "s" if stats[TOTAL_FILES] > 1 else ADDED_SUFFIX
        s += f"""\n\n{ADDED_PREFIX} {stats[TOTAL_FILES]} {suffix}"""
        s += f"""\n{UPDATE_TIME} {stats[PARSING_TIME]}"""
        s += f"""\n{ERRORS} {len(stats[ERRORS])}\n"""
        s += "\n" + "*" * STARS
        logging.debug(s)

    def print_statistics(self, stats, header):
        """ Log generic key/value statistics

        :param stats: stats dictionary
        :param header: stats header
        """
        if not stats:
            return
        s = self._banner(header) + "\n"
        for k, v in stats.items():
            s += f"""\n{k}: {v}"""
        s += "\n\n"
        s += "*" * STARS
        logging.debug(s)

    def print_progress_bar(self, current_value, total):
        """ Show progress bar in the console.
        Modified version of the recipe found here:
        https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console

        :param current_value: current value
        :param total: total values
        """
        percents = f"""{100 * (current_value / float(total)):.2f}"""
        filled_length = int(round(PROGRESS_BAR_LENGTH * current_value / float(total)))
        bar = f"""{FULL_BLOCK_CHARACTER * filled_length}{"-" * (PROGRESS_BAR_LENGTH - filled_length)}"""
        print(f"""\r{PROGRESS_BAR_PREFIX} |{bar}| {percents}% {PROGRESS_BAR_SUFFIX}""", end="")
        if current_value == total:
            print()

    def count_folders(self, base_folder=None, print_log=True):
        """ Count all folders in the specified base folder

        :param base_folder: base folder, current working folder when omitted
        :param print_log: print a '*' progress tick for every FILE_BATCH_SIZE folders
        :return: tuple (folder count, elapsed timedelta) or None if the folder doesn't exist
        """
        count = 0
        start = timer()
        if not base_folder:
            base_folder = os.getcwd()
        elif not os.path.isdir(base_folder):
            logging.debug(f"""Folder {base_folder} not found""")
            return
        logging.debug(f"""Counting folders in {base_folder}""")
        for _ in os.walk(base_folder, followlinks=True):
            count += 1
            if print_log and count % FILE_BATCH_SIZE == 0:
                print("*", end="", flush=True)
        if print_log:
            print()
        end = timer()
        elapsed_time = timedelta(seconds=(end - start))
        logging.debug(f"""Found {count} folders""")
        logging.debug(f"""Time spent (h:mm:ss): {elapsed_time}""")
        return (count, elapsed_time)
def main():
    """ Command-line entry point.

    Supported commands: files (folder statistics), db (database statistics),
    create (build a new collection database), update (add missing files).
    """
    import argparse

    log_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.NOTSET,
        format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
        handlers=[log_handler]
    )

    usage = """python collector.py [args]"""
    # Backslashes in the Windows paths are doubled: a single one (e.g. "\p")
    # is an invalid escape sequence which triggers a SyntaxWarning on modern Python
    examples = """Examples (Windows):
python collector.py -h\t\t\t show this help
python collector.py files -h\t\t show help for specific command
python collector.py files -i c:\\music\t show statistics for files in specific folder
python collector.py db -i c:\\peppy.db\t show statistics for specific database file
python collector.py create -i c:\\music -o c:\\peppy.db create collection database using specified folder and database filename
python collector.py update -i c:\\music -o c:\\peppy.db update collection database using specified folder and database filename
"""
    parser = argparse.ArgumentParser(
        usage=usage,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples
    )
    subparsers = parser.add_subparsers(dest="command")

    p = subparsers.add_parser("files", help="show files statistics")
    p.add_argument("-i", help="audio files root folder", required=True)

    p = subparsers.add_parser("db", help="show collection database statistics")
    p.add_argument("-i", help="collection database file name", required=True)

    p = subparsers.add_parser("create", help="create collection database")
    p.add_argument("-i", help="audio files root folder", required=True)
    p.add_argument("-o", help="collection database filename", required=True)

    p = subparsers.add_parser("update", help="update collection database")
    p.add_argument("-i", help="audio files root folder", required=True)
    p.add_argument("-o", help="collection database filename", required=True)

    try:
        args = parser.parse_args()
    except Exception as e:
        # NOTE(review): argparse reports errors via SystemExit, which is not an
        # Exception subclass, so this handler is effectively dead code; kept for
        # backward compatibility
        logging.debug(e)
        sys.exit(1)

    command = args.command

    if command == "files":
        coll = Collector()
        base_folder = args.i
        n = coll.count_folders(base_folder)
        if n:
            stats = coll.get_files_statistics(base_folder, n[0], coll.print_progress_bar)
            coll.print_files_statistics(stats)
    elif command == "db":
        if args.i and not os.path.isfile(args.i):
            logging.debug(f"""Collection database file {args.i} not found""")
            sys.exit(1)
        coll = Collector(args.i)
        coll.dbutil.connect()
        stats = coll.dbutil.get_collection_summary()
        coll.print_statistics(stats, DATABASE_STATISTICS)
    elif command == "create":
        base_folder = args.i
        if base_folder.endswith(os.sep):
            base_folder = base_folder[:-1]  # normalize: drop the trailing separator
        db_filename = args.o
        # refuse to overwrite an existing database file
        if db_filename and os.path.isfile(db_filename):
            logging.debug(f"""Collection database file {db_filename} exists already""")
            sys.exit(1)
        coll = Collector(db_filename)
        coll.dbutil.connect()
        n = coll.count_folders(base_folder)
        if n:
            stats = coll.create_collection(base_folder, n[0], db_filename, coll.print_progress_bar)
            coll.print_metadata_statistics(stats)
    elif command == "update":
        base_folder = args.i
        if base_folder.endswith(os.sep):
            base_folder = base_folder[:-1]  # normalize: drop the trailing separator
        db_filename = args.o
        # update requires an existing database file
        if db_filename and not os.path.isfile(db_filename):
            logging.debug(f"""Collection database file {db_filename} not found""")
            sys.exit(1)
        coll = Collector(db_filename)
        coll.dbutil.connect()
        n = coll.count_folders(base_folder)
        if n:
            stats = coll.update_collection(base_folder, n[0], coll.print_progress_bar)
            coll.print_update_statistics(stats, UPDATE_STATISTICS)


if __name__ == '__main__':
    main()