#!/usr/bin/python3 # coding=utf-8 import _thread import argparse import os import platform import shutil import subprocess import sys import threading import time from typing import * _7zCompressable = ('.7z', '.xz', '.zip', '.tar', '.jar') _7zCompressArgs = '"{_7z}" a -sdel -mx9 -ssw -mmt{thread} -myx9 -md{dictsz}m -aoa -mfb273 ' \ '-ms=on "{dest}" "{src}"' _xzCompressArgs = '"{_7z}" a -sdel -txz -mx9 -ssw -mmt{thread} -myx9 -md{dictsz}m -aoa -mfb273 ' \ '-ms=on "{dest}" "{src}"' _zipCompressArgs = '"{_7z}" a -sdel -tzip -mx9 -mfb258 -mmt{thread} -ssw -aoa "{dest}" "{src}"' _7zExtractArgs = '"{_7z}" x "{src}" "-o{dest}" -p"{pw}" -y -r' _default7z = "C:\\Program Files\\7-Zip\\7z.exe" if platform.system() == 'Windows' and os.path.exists( "C:\\Program Files\\7-Zip\\7z.exe") else '7z' gb = {} # type:Dict[Union[str,int,list,tuple]] def terminal_width(): try: term_width, void = os.get_terminal_size() except Exception: term_width = 80 return min(max(20, term_width), 120) - 4 def compress(_7z = _default7z, source = '', dest = os.getcwd(), dictionary_size = 96, method = '7z') -> bool: if source == '': return False arg = gb['carguments'] if len(gb['carguments']) > 0 else '' if method == 'xz': command = _xzCompressArgs.format(_7z = _7z, src = os.path.join(source, '*'), dest = dest, dictsz = dictionary_size, thread = gb['ct']) + arg return execommand(command) elif method == 'zip' or method == 'jar': command = _zipCompressArgs.format(_7z = _7z, src = os.path.join(source, '*'), dest = dest, dictsz = dictionary_size, thread = gb['ct']) + arg return execommand(command) else: command = _7zCompressArgs.format(_7z = _7z, src = os.path.join(source, '*'), dest = dest, dictsz = dictionary_size, thread = gb['ct']) + arg return execommand(command) def extract(_7z = _default7z, source = '', dest = os.path.join(os.getcwd(), 'tmp')) -> bool: if source == '': return False command = _7zExtractArgs.format(_7z = _7z, src = source, dest = dest, pw = gb['password']) + ( gb['earguments'] if len(gb['earguments']) > 0 else '') return execommand(command, gb['timeout']) def test7z(file, pw = ''): return not execommand(r'"%s" t -p"%s" "%s"' % (gb['7z'], pw, file), exitCode = 2) def execommand(command = '', timeout = 2147483647, exitCode = 0) -> True | False: """ Execute following command \n :param exitCode: return true if exit-code equals this parameter :param command: a command
:param timeout: max running time
:return: True if process finished properly or False if reached timeout or exit code it not zero """ try: return exitCode == subprocess.call(command, shell = True, stdout = subprocess.DEVNULL, stderr = sys.stdout, timeout = timeout) except subprocess.TimeoutExpired: return False def get_extension(val = '', sensitive = False): if val.endswith('.jar'): return 'jar' if not sensitive: return '7z' if val.endswith('.xz'): return 'xz' elif val.endswith('.zip'): return 'zip' return '7z' class File: def __init__(self, path, sensitive = False): self.name = os.path.basename(path) self.type = get_extension(path, sensitive) # noinspection PyTypeChecker self.out = os.path.join(os.path.dirname(path), (self.name.rsplit('.', 1)[0] + '.' + self.type) if not sensitive else self.name) self.old = os.path.join(os.getcwd(), 'old', str(int(time.time())) + '-' + self.name) self.path = path self.status = 'idle' def backup(self): try: os.rename(self.path, self.old) except OSError: shutil.move(self.path, self.old) return self.old @staticmethod def deep_compress(folder): return execommand('"%s" "%s" "%s" -wt %s -ct %s -d %s -s %s -dc %s -7z "%s" -n yes' % ( sys.executable, sys.argv[0], folder, gb['wt'], gb['ct'], gb['deep'] - 1, 'yes', gb['dict'], gb['7z'])) def undo(self): try: os.rename(self.old, self.path) except OSError: shutil.move(self.old, self.path) # noinspection SpellCheckingInspection def recompress(self): if not test7z(self.path) and gb['skip']: print('[Skip] %s cause Locked' % self.path) return None try: cll() print("\rWorking on [%s]" % self) temp = os.path.join(os.getcwd(), 'tmp', str(int(time.time())) + '-' + self.name.replace('.', '-')) self.status = 'backing up' backup = self.backup() self.status = 'creating temp' if not os.path.exists(temp): os.mkdir(temp) self.status = 'extracting' if not extract(source = backup, dest = temp): cll() raise Error.ExtractError("\r[Error] when extract archive %s\n" % self) cll() stats = DirStats(temp) print("\rFound [%s] files | [%s] directories | size %0.6f Mbytes in [%s]" % ( stats.stats['file'], stats.stats['folder'], (stats.stats['size'] / 1024.0) / 1024.0, temp)) if gb['deep'] > 1 and deeper(temp): self.status = 'waiting deep compress' if not self.deep_compress(temp): cll() sys.stderr.write("\r[Error] when digging in archive %s\n" % self) self.status = 'compressing' if not compress(source = temp, dictionary_size = gb['dict'], dest = self.out, method = self.type): raise Error.CompressError("\r[Error] when compressing %s\n" % self) self.status = 'removing temp' try: os.rmdir(temp) except OSError: shutil.rmtree(temp) self.status = 'done' Manager.working.remove(self) if not gb['no_keep']: saveInfo(self.path, self.old) else: os.remove(self.old) cll() print("\r[done] %s" % self) except Exception as e: sys.stderr.write("\r[Error] %s | [Cause] %s\n" % (self, e)) self.undo() sys.stderr.write("\r[Info] %s undo successfully!\n" % self) def __str__(self): return self.path class Error: class ExtractError(IOError): def __init__(self, msg = ''): self.msg = msg def __str__(self): return self.msg class CompressError(IOError): def __init__(self, msg = ''): self.msg = msg def __str__(self): return self.msg class Manager: files = [] # type: List[File,...] working = [] # type: List[File,...] total = 0 def __init__(self, locations, sensitive = False, threads = 1): self.run(threads) if len(gb['only']) < 1: file_types = _7zCompressable if sensitive else (*_7zCompressable, '.rar', '.gz') if len(gb['exclude']) > 0: file_types = (x for x in (*file_types, *gb['include']) if not x.endswith((*gb['exclude'],))) else: file_types = (*gb['only'],) _thread.start_new_thread(Manager.walktop, (locations, lambda x: x.endswith((*file_types,)))) @staticmethod def walktop(path, only): for location in path: print("Scanning directory '%s'" % location) Manager.walk(location, only) print("Found %s files!" % Manager.total) gb['finished'] = True @staticmethod def walk(path, only): for f in os.listdir(path): f = os.path.join(path, f) if os.path.isdir(f): Manager.walk(f, only) elif os.path.isfile(f) and only(f): Manager.files.append(File(f, gb['sensitive'])) Manager.total += 1 @staticmethod def run(threads = 1): for i in range(threads): Work().start() @staticmethod def pop(): temp = Manager.files.pop() Manager.working.append(temp) return temp class Work(threading.Thread): def run(self): while len(Manager.files) < 1 and not gb['finished']: time.sleep(1) while 1: if len(Manager.files) < 1: break Manager.pop().recompress() class DirStats: def __init__(self, directory): self.dir = directory self.stats = {'size': 0, 'folder': 0, 'file': 0} # type: Dict[str:int] self.walk() def walk(self, path = None): if path is None: path = self.dir for f in os.listdir(path): f = os.path.join(path, f) if os.path.isdir(f): self.stats['folder'] += 1 self.walk(f) elif os.path.isfile(f): self.stats['size'] += os.stat(f).st_size self.stats['file'] += 1 def __str__(self): return "%s folder | %s files | size %s MBytes" % ( self.stats['folder'], self.stats['file'], ((self.stats['size'] / 1024.0) / 1024.0)) # clear line def cll(): print('\r', ' ' * (terminal_width() + 3), end = '', sep = '') def saveInfo(path, old): if not os.path.exists('info.txt'): f = open('info.txt', 'w') f.write('RE-COMPRESSED FILES\n') f.close() file = open('info.txt', 'a+') if not os.path.exists(old): old = '[REMOVED]' file.writelines('[OLD] %s -> [ORIGINAL] %s\n' % (old, path)) file.close() def print_status(): while len(Manager.files) < 1 and not gb['finished']: time.sleep(1) while len(Manager.working) > 0 or len(Manager.files) > 0: for work in Manager.working: cll() print("\r[%s] %s" % (work.status, wrap(work, -(len(work.status) + 3))), end = '') time.sleep(2) cll() print("\rRunning [%s] task | Remaining [%s] files" % (len(Manager.working), len(Manager.files)), end = '') time.sleep(2) print(gen_prog(terminal_width()), end = '') time.sleep(2) cll() print('\rDone!? | %s file has been re-compressed' % Manager.total) exit(0) def gen_prog(leng): total = Manager.total remaining = len(Manager.files) working = len(Manager.working) done = total - remaining - working str_rem = str(done) + '/' + str(total) + ' ' leng -= len(str_rem) return "\r%s[%s%s%s]" % (str_rem, '|' * int((done / total) * leng), ':' * int((working / total) * leng), '-' * int((remaining / total) * leng)) def wrap(msg, length = 0): if length < 0: length += terminal_width() if length == 0: length = terminal_width() msg = str(msg) return ('...' + msg[len(msg) - length:].strip(' \r\n\t')) if len(msg) > length else msg def deeper(path = ''): for root, dirs, files in os.walk(path): for f in files: if f.endswith(_7zCompressable): return True return False if __name__ == '__main__': args = argparse.ArgumentParser(description = 'Py7zReCompress help') args.add_argument(dest = 'directory', help = 'directory that contains archive files') args.add_argument('-wt', '--work_thread', default = 1, dest = 'work_thread', type = int, help = 'number of working thread(s)') args.add_argument('-ct', '--compress_thread', default = 3, dest = 'compress_thread', type = int, help = 'number of thread used by 7zip') args.add_argument('-d', '--deep', default = 2, dest = 'deep_count', type = int, help = 'number to dig into archive file') args.add_argument('-s', '--sensitive', default = 'no', dest = 's', type = str, help = 'use compress algorithm same with file extension', choices = ('yes', 'y', 'no', 'n')) args.add_argument('-dc', '--dictionary', default = 96, dest = 'dictionary_size', type = int, help = 'dictionary size (megabytes)') args.add_argument('-7z', '--executable', default = _default7z, dest = '_7z_path', type = str, help = '7zip executable path') args.add_argument('-e', '--exclude', default = '', dest = 'exclude', type = str, help = 'excluded file extension separator is ; like "-e 7z;xz"') args.add_argument('-i', '--include', default = '', dest = 'include', type = str, help = 'include file extension separator is ; like "-i iso;wim"') args.add_argument('-o', '--only', default = '', dest = 'only', type = str, help = 're-compress only file extension separator is ; like "-o zip"') args.add_argument('-n', '--no-keep', default = 'no', dest = 'no_keep', type = str, choices = ('y', 'yes', 'n', 'no'), help = 'if yes will keep old file') args.add_argument('-t', '--timeout', default = 900, dest = 'timeout', type = int, help = 'timeout in second if extract time is longer than the specified time program will abort this file') args.add_argument('-p', '--password', default = '', dest = 'password', type = str, help = 'password to extract archive file') args.add_argument('-ca', '--compress-arguments', default = '', dest = 'compress_arguments', type = str, help = 'additional argument when compress for 7zip') args.add_argument('-ea', '--extract-arguments', default = '', dest = 'extract_arguments', type = str, help = 'additional argument when extract for 7zip') args.add_argument('-skip', '--skip-locked', default = '', dest = 'skip', type = str, choices = ('y', 'yes', 'n', 'no'), help = 'skip locked file') arg = args.parse_args() gb['path'] = arg.directory if not isinstance(arg.directory, str) else [arg.directory] for d in gb['path']: if not os.path.exists(d): print("'{}' doesn't exist".format(d)) exit(-1) if not os.path.isdir(d): print("'{}' is not a directory".format(d)) exit(-1) gb['wt'] = arg.work_thread gb['ct'] = arg.compress_thread gb['deep'] = 2147483647 if arg.deep_count < 0 else arg.deep_count gb['sensitive'] = arg.s.startswith('y') gb['dict'] = arg.dictionary_size # noinspection PyProtectedMember gb['7z'] = arg._7z_path gb['finished'] = False gb['exclude'] = arg.exclude.split(';') gb['include'] = arg.include.split(';') gb['only'] = arg.only.split(';') gb['no_keep'] = arg.no_keep.startswith('y') gb['timeout'] = arg.timeout gb['carguments'] = arg.compress_arguments gb['earguments'] = arg.extract_arguments gb['password'] = arg.password gb['skip'] = arg.password == '' or arg.skip.startswith('y') if len(gb['include'][0]) < 1: gb['include'] = [] if len(gb['exclude'][0]) < 1: gb['exclude'] = [] if len(gb['only'][0]) < 1: gb['only'] = [] gb['include'] = ['.' + i if not i.startswith('.') else i for i in gb['include']] gb['only'] = ['.' + i if not i.startswith('.') else i for i in gb['only']] if not os.path.exists('tmp'): os.mkdir('tmp') if not os.path.exists('old'): os.mkdir('old') _thread.start_new_thread(print_status, ()) Manager(gb['path'], gb['sensitive'], gb['wt'])