#!/usr/bin/env python3 # /// script # dependencies = [ # "zstandard", # "filelock", # "tomli; python_version < '3.11'", # ] # /// """ VLFS - Vibecoded Large File Storage CLI Copyright 2026 UAA Software Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
""" import argparse import fnmatch import glob import hashlib from concurrent.futures import ThreadPoolExecutor, as_completed import json import logging import os import subprocess import sys import tempfile import time from pathlib import Path from typing import Any import zstandard from filelock import FileLock as _FileLock # Module-level logger logger = logging.getLogger("vlfs") # Enable ANSI escape sequences on Windows 10+ if os.name == "nt": os.system("") _RCLONE_CONFIG_PATH: Path | None = None _LAST_INPLACE_LEN: int = 0 # ============================================================================= # Exceptions # ============================================================================= class RcloneError(Exception): """Error from rclone subprocess.""" def __init__(self, message: str, returncode: int, stdout: str, stderr: str): super().__init__(message) self.returncode = returncode self.stdout = stdout self.stderr = stderr class ConfigError(Exception): """Error in configuration.""" class VLFSIndexError(Exception): """Error in index operations.""" # ============================================================================= # Output Helpers # ============================================================================= class Colours: """ANSI colour codes for terminal output.""" RESET = "\033[0m" BOLD = "\033[1m" RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" GRAY = "\033[90m" def use_colour() -> bool: """Check if colour output should be used. Colours are disabled if: - NO_COLOR environment variable is set - CI environment variable is set - stdout is not a TTY Returns: True if colour should be used """ if os.environ.get("NO_COLOR"): return False if os.environ.get("CI"): return False if not sys.stdout.isatty(): return False return True def colourize(text: str, colour: str, force: bool = False) -> str: """Wrap text in ANSI colour codes if appropriate. 
Args: text: Text to colourize colour: Colour name (e.g., 'RED', 'GREEN') force: Force colour even if normally disabled Returns: Colourized text or plain text """ if not force and not use_colour(): return text colour_code = getattr(Colours, colour.upper(), "") if colour_code: return f"{colour_code}{text}{Colours.RESET}" return text def print_inplace(text: str) -> None: """Print text on the current line, overwriting previous content.""" if not sys.stdout.isatty(): print(text) return # \r moves to start of line, \033[K clears to the end sys.stdout.write(f"\r{text}\033[K") sys.stdout.flush() def clear_inplace() -> None: """Clear the current inplace line and move cursor back to start.""" if sys.stdout.isatty(): sys.stdout.write("\r\033[K") sys.stdout.flush() class ProgressTracker: """Small helper for concise CLI progress output.""" def __init__(self, total: int, verbose: bool = False): self.total = total self.verbose = verbose self.current = 0 self._use_inplace = not verbose and sys.stdout.isatty() def advance(self, message: str) -> None: """Advance progress by one item.""" if self.total <= 0: return self.current += 1 line = f"[{self.current}/{self.total}] {message}" if self._use_inplace: print_inplace(line) elif self.verbose: print(f" {line}") else: print(line) def clear(self) -> None: """Clear any inplace output.""" if self._use_inplace: clear_inplace() def done(self, summary: str, success: bool = True) -> None: """Clear progress output and print a final summary line.""" self.clear() marker = colourize("✓" if success else "✗", "GREEN" if success else "RED") print(f"{marker} {summary}") def format_bytes(size: int) -> str: """Format byte size as human-readable string.""" for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024: return f"{size:.1f}{unit}" size /= 1024 return f"{size:.1f}PB" def pluralize(count: int, singular: str, plural: str | None = None) -> str: """Return the singular or plural form for count.""" if count == 1: return singular return plural if plural 
is not None else f"{singular}s" def format_compression_summary(original: int, compressed: int) -> str: """Format an original/compressed size summary.""" if original <= 0: return format_bytes(compressed) if original == compressed: return format_bytes(original) saved = max(0.0, 100.0 - (compressed / original * 100.0)) return f"{format_bytes(original)} → {format_bytes(compressed)}, {saved:.0f}%" def die(message: str, hint: str | None = None, exit_code: int = 1) -> int: """Print error message and exit with optional hint. Args: message: Error message to display hint: Optional remediation hint exit_code: Exit code to return Returns: Exit code (for testing purposes) """ prefix = colourize("Vlfs Error:", "RED") print(f"{prefix} {message}", file=sys.stderr) if hint: hint_prefix = colourize("Hint:", "YELLOW") print(f" {hint_prefix} {hint}", file=sys.stderr) logger.error(f"Exited with code {exit_code}: {message}") if hint: logger.error(f"Hint: {hint}") return exit_code # ============================================================================= # Logging # ============================================================================= def setup_logging(verbosity: int = 0, log_file: bool = True) -> None: """Set up logging with console and file handlers. 
Args: verbosity: 0=INFO, 1=DEBUG, 2=TRACE (mapped to DEBUG with more detail) log_file: Whether to write to log file """ # Determine log level if verbosity >= 2: level = logging.DEBUG fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" elif verbosity >= 1: level = logging.DEBUG fmt = "%(asctime)s - %(levelname)s - %(message)s" else: level = logging.INFO fmt = "%(asctime)s - %(levelname)s - %(message)s" # Clear existing handlers logger.handlers = [] logger.setLevel(level) # Console handler (only warnings and above for non-verbose) console_handler = logging.StreamHandler(sys.stderr) console_handler.setLevel(logging.WARNING if verbosity == 0 else level) console_handler.setFormatter(logging.Formatter(fmt)) logger.addHandler(console_handler) # File handler if log_file: log_dir = Path.home() / ".vlfs" log_dir.mkdir(parents=True, exist_ok=True) log_path = log_dir / "vlfs.log" file_handler = logging.FileHandler(log_path, mode="a") file_handler.setLevel(logging.DEBUG) # Always log DEBUG to file file_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) logger.addHandler(file_handler) logger.debug(f"Logging initialized (verbosity={verbosity})") # ============================================================================= # Low-level Utilities # ============================================================================= def with_file_lock(path: Path, timeout: float = 10.0): """Context manager for cross-platform file locking. Uses the filelock package for robust locking with timeout support. 
Args: path: Path to lock file timeout: Seconds to wait for lock (default 10s, -1 for infinite) Returns: Context manager that acquires/releases lock """ path.parent.mkdir(parents=True, exist_ok=True) return _FileLock(path, timeout=timeout) def atomic_write_bytes(dest: Path, data: bytes) -> None: """Write bytes to dest atomically via temp file + rename.""" dest.parent.mkdir(parents=True, exist_ok=True) fd, temp_path = tempfile.mkstemp(dir=dest.parent) try: with os.fdopen(fd, "wb") as f: f.write(data) os.replace(temp_path, dest) except Exception: try: os.close(fd) except OSError: pass if os.path.exists(temp_path): os.unlink(temp_path) raise def atomic_write_text(dest: Path, text: str, encoding: str = "utf-8") -> None: """Write text to dest atomically.""" atomic_write_bytes(dest, text.encode(encoding)) def retry( callable_fn, *, attempts: int = 3, base_delay: float = 1.0, max_delay: float = 30.0, exceptions: tuple = (RcloneError,), ): """Retry a callable with exponential backoff. Args: callable_fn: Function to call attempts: Maximum number of attempts base_delay: Initial delay between retries in seconds max_delay: Maximum delay between retries exceptions: Tuple of exceptions to catch and retry Returns: Result of callable_fn Raises: Last exception if all attempts fail """ last_exception = None for attempt in range(attempts): try: return callable_fn() except exceptions as e: last_exception = e if attempt < attempts - 1: delay = min(base_delay * (2**attempt), max_delay) time.sleep(delay) raise last_exception # ============================================================================= # Hashing & Compression # ============================================================================= def hash_file(path: Path, verbose: bool = True) -> tuple[str, int, float]: """Compute SHA256 hash of file, return (hex_digest, size, mtime).""" if verbose: print_inplace(f" Hashing {path.name}...") sha256 = hashlib.sha256() size = 0 with path.open("rb") as f: while True: chunk = 
f.read(65536) # 64KB chunks if not chunk: break sha256.update(chunk) size += len(chunk) mtime = path.stat().st_mtime return sha256.hexdigest().lower(), size, mtime def hash_files_parallel( paths: list[Path], max_workers: int | None = None, verbose: bool = True ) -> tuple[dict[Path, tuple[str, int, float]], dict[Path, Exception]]: """Hash files in parallel using a thread pool. Returns: Tuple of (results, errors) where results maps Path -> (hash, size, mtime) and errors maps Path -> Exception. """ if not paths: return {}, {} if max_workers is None: cpu_count = os.cpu_count() or 4 max_workers = min(32, cpu_count * 2) results: dict[Path, tuple[str, int, float]] = {} errors: dict[Path, Exception] = {} tracker = ProgressTracker(len(paths), verbose=False) if verbose else None with ThreadPoolExecutor(max_workers=max_workers) as executor: # Suppress internal hash_file printing to manage it ourselves future_map = {executor.submit(hash_file, path, verbose=False): path for path in paths} for future in as_completed(future_map): path = future_map[future] if tracker: tracker.advance(path.name) try: results[path] = future.result() except (OSError, IOError) as exc: errors[path] = exc if tracker: tracker.clear() return results, errors def shard_path(hex_digest: str) -> str: """Convert hex digest to sharded path (ab/cd/abcdef...).""" hex_lower = hex_digest.lower() if len(hex_lower) < 4: return hex_lower return f"{hex_lower[:2]}/{hex_lower[2:4]}/{hex_lower}" def compress_bytes(data: bytes, level: int = 3) -> bytes: """Compress data using zstandard.""" cctx = zstandard.ZstdCompressor(level=level) return cctx.compress(data) def decompress_bytes(data: bytes) -> bytes: """Decompress zstandard data.""" dctx = zstandard.ZstdDecompressor() return dctx.decompress(data) # ============================================================================= # Cache Operations # ============================================================================= def store_object(src_path: Path, cache_dir: 
Path, compression_level: int = 3) -> str: """Store file in cache, return object key.""" hex_digest, _, _ = hash_file(src_path, verbose=False) object_key = shard_path(hex_digest) object_path = cache_dir / "objects" / object_key # If already exists, skip if object_path.exists(): return object_key # Read, compress, and store atomically data = src_path.read_bytes() compressed = compress_bytes(data, level=compression_level) ratio = (len(compressed) / len(data)) * 100 if len(data) > 0 else 100 # print(f" Stored in cache ({format_bytes(len(data))} -> {format_bytes(len(compressed))}, {ratio:.1f}%)") atomic_write_bytes(object_path, compressed) return object_key def load_object(object_key: str, cache_dir: Path) -> bytes: """Load and decompress object from cache.""" object_path = cache_dir / "objects" / object_key compressed = object_path.read_bytes() return decompress_bytes(compressed) # ============================================================================= # Index Operations # ============================================================================= def read_index(vlfs_dir: Path) -> dict[str, Any]: """Read index.json, return entries dict.""" index_path = vlfs_dir / "index.json" if not index_path.exists(): return {"version": 1, "entries": {}} with index_path.open("r") as f: data = json.load(f) # Version guard if data.get("version") != 1: raise VLFSIndexError(f"Unsupported index version: {data.get('version')}") return data def write_index(vlfs_dir: Path, data: dict[str, Any]) -> None: """Write index.json atomically.""" index_path = vlfs_dir / "index.json" atomic_write_text(index_path, json.dumps(data, indent=2)) def update_index_entries(vlfs_dir: Path, updates: dict[str, dict[str, Any]]) -> None: """Update index entries and write once atomically.""" if not updates: return with with_file_lock(vlfs_dir / "index.lock"): index = read_index(vlfs_dir) index_entries = index.get("entries", {}) index_entries.update(updates) index["entries"] = index_entries 
write_index(vlfs_dir, index) # ============================================================================= # Configuration # ============================================================================= def get_user_config_dir() -> Path: """Return ~/.config/vlfs/, creating if needed.""" env_override = os.environ.get("VLFS_USER_CONFIG") if env_override: config_dir = Path(env_override) elif os.name == "nt": base = Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming")) config_dir = base / "vlfs" else: base = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) config_dir = base / "vlfs" config_dir.mkdir(parents=True, exist_ok=True) return config_dir def resolve_paths(repo_root: Path) -> tuple[Path, Path]: """Resolve VLFS directories, honoring environment overrides.""" config_path = os.environ.get("VLFS_CONFIG") if config_path: vlfs_dir = Path(config_path).parent else: vlfs_dir = repo_root / ".vlfs" cache_path = os.environ.get("VLFS_CACHE") if cache_path: cache_dir = Path(cache_path) else: cache_dir = repo_root / ".vlfs-cache" return vlfs_dir, cache_dir def ensure_dirs(vlfs_dir: Path, cache_dir: Path) -> None: """Create required directory structure if missing.""" vlfs_dir.mkdir(parents=True, exist_ok=True) (cache_dir / "objects").mkdir(parents=True, exist_ok=True) def ensure_gitignore(repo_root: Path) -> None: """Ensure .gitignore has required entries.""" gitignore = repo_root / ".gitignore" required_entries = [ ".vlfs-cache/", ] existing_content = "" if gitignore.exists(): existing_content = gitignore.read_text() entries_to_add = [] for entry in required_entries: if entry not in existing_content: entries_to_add.append(entry) if entries_to_add: with gitignore.open("a") as f: if existing_content and not existing_content.endswith("\n"): f.write("\n") for entry in entries_to_add: f.write(f"{entry}\n") def load_config(vlfs_dir: Path) -> dict[str, Any]: """Load configuration from TOML file.""" config_file = vlfs_dir / "config.toml" if not 
config_file.exists(): return {} try: import tomllib except ImportError: import tomli as tomllib with config_file.open("rb") as f: return tomllib.load(f) def deep_merge(target: dict, source: dict) -> dict: """Deep merge two dictionaries.""" result = target.copy() for key, value in source.items(): if key in result and isinstance(result[key], dict) and isinstance(value, dict): result[key] = deep_merge(result[key], value) else: result[key] = value return result def load_merged_config(vlfs_dir: Path) -> dict[str, Any]: """Load repo config, then overlay user config.""" repo_config = load_config(vlfs_dir) user_config_path = get_user_config_dir() / "config.toml" user_config = {} if user_config_path.exists(): import tomllib with user_config_path.open("rb") as f: user_config = tomllib.load(f) return deep_merge(repo_config, user_config) def warn_if_secrets_in_repo(vlfs_dir: Path) -> None: """Warn if secrets detected in repo config.""" config_path = vlfs_dir / "config.toml" if not config_path.exists(): return content = config_path.read_text() if "client_secret" in content or "secret_access_key" in content: print( colourize("Warning: Secrets detected in .vlfs/config.toml", "YELLOW"), file=sys.stderr, ) print("Move secrets to ~/.config/vlfs/config.toml", file=sys.stderr) # ============================================================================= # Rclone # ============================================================================= def set_rclone_config_path(path: Path | None) -> None: """Set global rclone config path for this run.""" global _RCLONE_CONFIG_PATH if path and path.exists(): _RCLONE_CONFIG_PATH = path else: _RCLONE_CONFIG_PATH = None def get_rclone_config_path() -> Path | None: """Get the current rclone config path.""" return _RCLONE_CONFIG_PATH def run_rclone( args: list[str], *, env: dict[str, str] | None = None, cwd: Path | str | None = None, timeout: float | None = None, capture_output: bool = True, ) -> tuple[int, str, str]: """Run rclone subprocess and 
return (returncode, stdout, stderr). Args: args: Command line arguments for rclone (not including 'rclone') env: Optional environment variables to add/override cwd: Optional working directory timeout: Optional timeout in seconds capture_output: If True, capture stdout/stderr. If False, output to terminal. Returns: Tuple of (returncode, stdout, stderr) Raises: RcloneError: If returncode is non-zero """ cmd = ["rclone"] + args config_path = get_rclone_config_path() if config_path: cmd += ["--config", str(config_path)] # Add global flags for stability with restricted keys # --s3-no-check-bucket prevents HeadBucket calls which fail on scoped keys cmd += ["--s3-no-check-bucket"] if logger.isEnabledFor(logging.DEBUG): print(f" Running: {' '.join(cmd)}") run_env = None if env: run_env = os.environ.copy() run_env.update(env) try: if capture_output: result = subprocess.run( cmd, capture_output=True, text=True, env=run_env, cwd=cwd, timeout=timeout ) else: result = subprocess.run( cmd, capture_output=False, text=True, env=run_env, cwd=cwd, timeout=timeout ) except FileNotFoundError as exc: raise RcloneError( "rclone not found in PATH. 
Install from https://rclone.org/downloads/", 127, "", str(exc), ) if result.returncode != 0: raise RcloneError( f"rclone failed with code {result.returncode}: {result.stderr or ''}", result.returncode, result.stdout or "", result.stderr or "", ) return result.returncode, result.stdout or "", result.stderr or "" def list_remote_objects(remote: str, bucket: str = "vlfs") -> set[str]: """List all objects in a remote bucket using rclone lsjson.""" logger.info(f"Listing all objects on {remote}:{bucket}") try: # Use rclone lsjson to get all objects recursively # This is much faster than checking each object individually rc, stdout, stderr = run_rclone(["lsjson", f"{remote}:{bucket}", "--recursive"]) if rc != 0: logger.error(f"Failed to list remote objects: {stderr}") return set() objects = json.loads(stdout) # Standardize paths to use forward slashes (rclone already does this) return {obj["Path"] for obj in objects if not obj["IsDir"]} except Exception as e: logger.error(f"Error listing remote objects: {e}") return set() def rclone_config_has_section(path: Path, section: str) -> bool: """Check if rclone config file has a specific section.""" import configparser if not path.exists(): return False parser = configparser.ConfigParser() try: parser.read(str(path)) return parser.has_section(section) except Exception: return False def write_rclone_drive_config(config_dir: Path, config: dict[str, str]) -> None: """Write rclone config for Google Drive. Args: config_dir: Directory to write rclone.conf to (typically user config dir) config: Dict with client_id, client_secret, etc. 
""" config_path = config_dir / "rclone.conf" config_lines = ["[gdrive]", "type = drive"] for key, value in config.items(): config_lines.append(f"{key} = {value}") atomic_write_text(config_path, "\n".join(config_lines) + "\n") def write_rclone_r2_config(dest_dir: Path) -> None: """Generate rclone.conf with R2 settings in dest_dir.""" # Load config from repo (for provider settings etc) # We assume we are in a repo, so find .vlfs try: vlfs_dir, _ = resolve_paths(Path.cwd()) repo_config = load_merged_config(vlfs_dir) except: repo_config = {} config_path = dest_dir / "rclone.conf" # Read existing config if present existing_lines = [] if config_path.exists(): existing_lines = config_path.read_text().splitlines() new_lines = [] in_r2 = False # Copy everything except [r2] section for line in existing_lines: stripped = line.strip() if stripped == "[r2]": in_r2 = True continue if in_r2 and stripped.startswith("["): in_r2 = False if not in_r2: new_lines.append(line) # Add/Update [r2] section r2_config = repo_config.get("remotes", {}).get("r2", {}) # Get secrets from env try: env_config = get_r2_config_from_env() except ConfigError: # If pushing, we need creds. But this might be called just to ensure config exists. 
env_config = {} if env_config: new_lines.append("") new_lines.append("[r2]") new_lines.append("type = s3") # Add provider specific settings provider = r2_config.get("provider", "Cloudflare") new_lines.append(f"provider = {provider}") endpoint = r2_config.get("endpoint") if not endpoint and "endpoint" in env_config: endpoint = env_config["endpoint"] if endpoint: new_lines.append(f"endpoint = {endpoint}") new_lines.append(f"access_key_id = {env_config['access_key_id']}") new_lines.append(f"secret_access_key = {env_config['secret_access_key']}") # Write back atomic_write_text(config_path, "\n".join(new_lines) + "\n") # ============================================================================= # R2 Operations # ============================================================================= def get_r2_config_from_env() -> dict[str, str]: """Get R2 configuration from environment variables. Required env vars: RCLONE_CONFIG_R2_ACCESS_KEY_ID RCLONE_CONFIG_R2_SECRET_ACCESS_KEY RCLONE_CONFIG_R2_ENDPOINT Returns: Dict with rclone config keys Raises: ConfigError: If required env vars are missing """ required = [ "RCLONE_CONFIG_R2_ACCESS_KEY_ID", "RCLONE_CONFIG_R2_SECRET_ACCESS_KEY", "RCLONE_CONFIG_R2_ENDPOINT", ] config = {} missing = [] for var in required: value = os.environ.get(var) if not value: missing.append(var) else: # Convert env var name to rclone config key # RCLONE_CONFIG_R2_ACCESS_KEY_ID -> access_key_id key = var.replace("RCLONE_CONFIG_R2_", "").lower() config[key] = value if missing: raise ConfigError( f"Missing R2 credentials. Set these environment variables:\n" + "\n".join(f" {var}" for var in missing) ) return config def ensure_r2_auth() -> int: """Ensure R2 authentication is available via env vars or config file. Returns: 0 on success, calls die() on failure. 
""" user_dir = get_user_config_dir() config_path = user_dir / "rclone.conf" # Check environment variables first (legacy/CI priority) try: # If env vars are present, generate config from them get_r2_config_from_env() write_rclone_r2_config(user_dir) set_rclone_config_path(config_path) return 0 except ConfigError: pass # Check for existing config file with [r2] section if rclone_config_has_section(config_path, "r2"): set_rclone_config_path(config_path) return 0 return die( "R2 credentials required for push", hint="Set RCLONE_CONFIG_R2_* env vars or create ~/.config/vlfs/rclone.conf with [r2] section", ) def validate_r2_connection(bucket: str = "vlfs") -> bool: """Validate R2 connection by listing bucket. Args: bucket: Bucket name to test Returns: True if connection succeeds Raises: RcloneError: If connection fails ConfigError: If credentials missing """ # Ensure config is available (will raise ConfigError if not) if not get_rclone_config_path(): try: get_r2_config_from_env() except ConfigError: # If no env vars and no config path set, check if we can set it from default user location user_config = get_user_config_dir() / "rclone.conf" if rclone_config_has_section(user_config, "r2"): set_rclone_config_path(user_config) else: # Re-raise original error to prompt for env vars or config raise if logger.isEnabledFor(logging.DEBUG): print(f" Verifying R2 connection to bucket '{bucket}'...") # Test with ls command (less strict permissions than lsd) # We check for a dummy file. Even if it doesn't exist, ls returns 0. # We only care if it fails due to auth/connection errors. try: run_rclone(["ls", f"r2:{bucket}/.vlfs-check", "--max-depth", "1"], capture_output=False) except RcloneError as e: # If it's a "directory not found" (which rclone might emit for empty buckets on some remotes), # we can ignore it. But usually ls returns 0 with empty output. # If it's an auth error, re-raise. 
if "directory not found" in str(e).lower(): return True raise return True def remote_object_exists(object_key: str, bucket: str = "vlfs") -> bool: """Check if object exists in remote R2 bucket. Args: object_key: The object key (sharded path) bucket: Bucket name Returns: True if object exists """ try: # Use rclone ls to check existence # ls returns 0 with empty output if path doesn't exist returncode, stdout, _ = run_rclone(["ls", f"r2:{bucket}/{object_key}"]) return returncode == 0 and stdout.strip() != "" except RcloneError: return False def delete_from_remote( remote: str, bucket: str, object_key: str, dry_run: bool = False ) -> bool: """Delete object from remote storage. Args: remote: Remote name ("r2" or "gdrive") bucket: Bucket name object_key: Object key to delete dry_run: If True, don't actually delete Returns: True if deletion succeeded """ if dry_run: print(f"[DRY-RUN] Would delete {remote}:{bucket}/{object_key}") return True if logger.isEnabledFor(logging.DEBUG): print(f" Deleting from {remote}...") try: run_rclone( ["deletefile", f"{remote}:{bucket}/{object_key}"], capture_output=False ) return True except RcloneError as e: print(f"Error deleting from {remote}: {e}", file=sys.stderr) return False def upload_to_r2( local_path: Path, object_key: str, bucket: str = "vlfs", dry_run: bool = False, verbose: bool = False, ) -> bool: """Upload a local file to R2. 
Args: local_path: Path to local file object_key: Destination object key in R2 bucket: Bucket name dry_run: If True, don't actually upload Returns: True if upload succeeded or object already exists """ if dry_run: print(f"[DRY-RUN] Would upload {local_path} -> r2:{bucket}/{object_key}") return True # Check if already exists if remote_object_exists(object_key, bucket): return True # Upload using rclone copyto remote_path = f"r2:{bucket}/{object_key}" def do_upload(): cmd = ["copyto", str(local_path), remote_path] if verbose: cmd.append("-P") run_rclone(cmd, capture_output=not verbose) retry(do_upload, attempts=3, base_delay=1.0) return True def download_from_r2( object_keys: list[str], cache_dir: Path, bucket: str = "vlfs", dry_run: bool = False, force: bool = False, verbose: bool = False, ) -> int: """Download multiple objects from R2 to cache. Args: object_keys: List of object keys to download cache_dir: Local cache directory bucket: Bucket name dry_run: If True, don't actually download force: If True, ignore existing files in cache """ if not object_keys: return 0 # Validate R2 credentials before attempting download if not dry_run: get_r2_config_from_env() if dry_run: for key in object_keys: print(f"[DRY-RUN] Would download r2:{bucket}/{key}") return len(object_keys) # Build files-from list for batch download files_list = object_keys # Write to temp file with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: f.write("\n".join(files_list)) files_from_path = f.name try: # Download with rclone copy using --files-from def do_download(): cmd = [ "copy", f"r2:{bucket}", str(cache_dir / "objects"), "--files-from", files_from_path, "--transfers", "8", ] if verbose: cmd.append("-P") if force: cmd.append("--ignore-times") run_rclone(cmd, capture_output=not verbose) retry(do_download, attempts=3, base_delay=1.0) return len(object_keys) finally: os.unlink(files_from_path) def download_http(url: str, dest: Path, timeout: float = 60, verbose: bool = True) 
-> None:
    """Download URL to dest atomically.

    Writes into a temp file created alongside dest so the final os.replace
    is an atomic rename on the same filesystem.
    """
    import urllib.request
    import urllib.error

    if verbose:
        print_inplace(f" Downloading {url.split('/')[-1]}...")

    # Use a browser-like User-Agent to avoid 403 Forbidden from CDNs like Cloudflare
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    req = urllib.request.Request(url, headers=headers)
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Temp file must live in dest's directory so os.replace stays atomic.
    fd, temp_path = tempfile.mkstemp(dir=dest.parent)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            # Stream in 64 KiB chunks rather than buffering the whole body.
            while chunk := resp.read(65536):
                os.write(fd, chunk)
        os.close(fd)
        os.replace(temp_path, dest)
    except Exception:
        # fd may already be closed (e.g. os.replace failed); a second close
        # raises OSError, which we deliberately ignore.
        try:
            os.close(fd)
        except OSError:
            pass
        if os.path.exists(temp_path):
            os.unlink(temp_path)
        raise


def download_from_r2_http(
    object_keys: list[str],
    cache_dir: Path,
    base_url: str,
    dry_run: bool = False,
    force: bool = False,
    verbose: bool = False,
    tracker: ProgressTracker | None = None,
) -> int:
    """Download objects via HTTP (no auth required).

    Args:
        object_keys: Object keys to fetch under base_url.
        cache_dir: Local cache root; objects land in cache_dir/objects/<key>.
        base_url: Public base URL of the bucket.
        dry_run: If True, only print what would be downloaded.
        force: If True, re-download even if the object is already cached.
        verbose: Print a line per downloaded object when no tracker is given.
        tracker: Optional progress tracker advanced per completed object.

    Returns:
        Number of objects downloaded (dry-run counts as downloaded).
    """
    downloaded = 0

    # Use ThreadPool for parallel downloads
    def _download_one(key: str) -> bool:
        dest = cache_dir / "objects" / key
        if dest.exists() and not force:
            return False
        url = f"{base_url.rstrip('/')}/{key}"
        if dry_run:
            print(f"[DRY-RUN] Would download {url}")
            return True
        try:
            download_http(url, dest, verbose=False)
            return True
        except Exception as e:
            # Report and keep going; one failed object must not abort the batch.
            print(f"Error downloading {url}: {e}", file=sys.stderr)
            return False

    with ThreadPoolExecutor(max_workers=8) as executor:
        future_map = {executor.submit(_download_one, key): key for key in object_keys}
        for future in as_completed(future_map):
            key = future_map[future]
            if future.result():
                downloaded += 1
                if tracker:
                    tracker.advance(key)
                elif verbose:
                    print(f" Downloaded {key}")
    if tracker:
        tracker.clear()
    return downloaded


# =============================================================================
# Google Drive Operations
# =============================================================================


def has_drive_token() -> bool:
    """Check if Google Drive token exists.

    In CI environments (CI env var or VLFS_NO_DRIVE set) a missing token is
    treated as a hard error rather than a soft False.

    Returns:
        True if token file exists

    Raises:
        RuntimeError: In CI environments without token
    """
    user_dir = get_user_config_dir()
    token_path = user_dir / "gdrive-token.json"

    # Check for CI environment
    if os.environ.get("CI") or os.environ.get("VLFS_NO_DRIVE"):
        if not token_path.exists():
            raise RuntimeError(
                "Google Drive is not available in CI environments.\n"
                "Use R2 for CI-friendly storage or set up Drive auth locally first."
            )
    return token_path.exists()


def auth_gdrive(vlfs_dir: Path) -> int:
    """Interactive Google Drive authentication using rclone's built-in OAuth.

    Args:
        vlfs_dir: Path to .vlfs directory (unused, kept for compat)

    Returns:
        Exit code (0 for success)
    """
    user_dir = get_user_config_dir()
    config_file = user_dir / "rclone.conf"
    token_file = user_dir / "gdrive-token.json"

    print(f"{colourize('Vlfs', 'CYAN')} Setting up Google Drive authentication...")
    print(" A browser will open for you to authorise access to Google Drive.")
    print()

    try:
        # Use rclone config create with built-in OAuth
        # This creates a remote named 'gdrive' of type 'drive'
        print(" Launching browser for authentication...")
        subprocess.run(
            [
                "rclone",
                "config",
                "create",
                "gdrive",
                "drive",
                "--config",
                str(config_file),
            ],
            check=True,
        )

        # Extract token from generated config
        import configparser

        parser = configparser.ConfigParser()
        parser.read(str(config_file))
        token = parser.get("gdrive", "token", fallback="")
        if not token:
            return die(
                "Drive token not found in rclone.conf",
                hint="Complete rclone auth and retry",
            )
        # If token is quoted JSON, unquote it
        if token.startswith('"') and token.endswith('"'):
            token = json.loads(token)

        # Write token file atomically (raw JSON content)
        print("Saving authentication token...")
        atomic_write_text(token_file, token)

        print()
        print("Google Drive authentication complete!")
        print(f"Token saved to: {token_file}")
        return 0
    except subprocess.CalledProcessError as e:
        return die("rclone auth failed", hint=str(e))
    except FileNotFoundError:
        # rclone binary itself is missing from PATH.
        return die(
            "rclone not found in PATH",
            hint="Install from https://rclone.org/downloads/",
        )


def upload_to_drive(
    local_path: Path,
    object_key: str,
    bucket: str = "vlfs",
    dry_run: bool = False,
    verbose: bool = False,
) -> bool:
    """Upload a local file to Google Drive with rate limiting.

    Args:
        local_path: Path to local file
        object_key: Destination object key in Drive
        bucket: Bucket/path name in Drive
        dry_run: If True, don't actually upload
        verbose: Show rclone progress output.

    Returns:
        True if upload succeeded

    Raises:
        RcloneError: Propagated after retries are exhausted or on
            non-rate-limit failures.
    """
    if dry_run:
        print(f"[DRY-RUN] Would upload {local_path} -> gdrive:{bucket}/{object_key}")
        return True

    remote_path = f"gdrive:{bucket}/{object_key}"

    def do_upload():
        if verbose:
            print(f" Uploading {local_path.name} to Drive...")
        cmd = [
            "copyto",
            str(local_path),
            remote_path,
            "--transfers",
            "1",
            "--drive-chunk-size",
            "8M",
        ]
        if verbose:
            cmd.append("-P")
        run_rclone(
            cmd,
            capture_output=not verbose,
        )

    # Use more retries for Drive due to rate limiting
    def do_upload_with_drive_retry():
        last_exception = None
        for attempt in range(5):
            try:
                do_upload()
                return
            except RcloneError as e:
                last_exception = e
                # Check for rate limit errors (403/429)
                if (
                    "403" in e.stderr
                    or "429" in e.stderr
                    or "rateLimitExceeded" in e.stderr
                ):
                    # Exponential backoff: 2, 4, 8, 16, 32 seconds.
                    delay = min(2**attempt * 2, 60)  # Max 60s delay
                    print(f"Rate limited, waiting {delay}s...")
                    time.sleep(delay)
                else:
                    raise
        raise last_exception

    do_upload_with_drive_retry()
    return True


def download_from_drive(
    object_keys: list[str],
    cache_dir: Path,
    bucket: str = "vlfs",
    dry_run: bool = False,
    force: bool = False,
    verbose: bool = False,
) -> int:
    """Download multiple objects from Drive to cache with rate limiting.
    Args:
        object_keys: List of object keys to download
        cache_dir: Local cache directory
        bucket: Bucket/path name in Drive
        dry_run: If True, don't actually download
        force: If True, ignore existing files in cache

    Returns:
        Number of objects requested (rclone decides what actually transfers).
    """
    if not object_keys:
        return 0
    if dry_run:
        for key in object_keys:
            print(f"[DRY-RUN] Would download gdrive:{bucket}/{key}")
        return len(object_keys)

    # Build files-from list for batch download
    files_list = object_keys
    # Write to temp file; delete=False because rclone must be able to open it
    # by path after this handle is closed (required on Windows).
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write("\n".join(files_list))
        files_from_path = f.name
    try:
        # Download with rclone copy using --files-from, limited to 1 transfer
        def do_download():
            cmd = [
                "copy",
                f"gdrive:{bucket}",
                str(cache_dir / "objects"),
                "--files-from",
                files_from_path,
                "--transfers",
                "1",
                "--drive-chunk-size",
                "8M",
            ]
            if verbose:
                cmd.append("-P")
            if force:
                cmd.append("--ignore-times")
            run_rclone(cmd, capture_output=not verbose)

        retry(do_download, attempts=5, base_delay=2.0)
        return len(object_keys)
    finally:
        # Always remove the temp list file, even if retries failed.
        os.unlink(files_from_path)


# =============================================================================
# Workspace Operations
# =============================================================================


def compute_missing_objects(index: dict[str, Any], cache_dir: Path) -> list[str]:
    """Compute list of objects that need to be downloaded.
    Args:
        index: Index dict with entries
        cache_dir: Local cache directory

    Returns:
        List of object keys not in local cache
    """
    missing = []
    entries = index.get("entries", {})
    for rel_path, entry in entries.items():
        object_key = entry.get("object_key")
        if not object_key:
            continue
        object_path = cache_dir / "objects" / object_key
        if not object_path.exists():
            missing.append(object_key)
    # Remove duplicates while preserving order
    # (several index entries may share one object key via dedup).
    seen = set()
    unique_missing = []
    for key in missing:
        if key not in seen:
            seen.add(key)
            unique_missing.append(key)
    return unique_missing


def materialize_workspace(
    index: dict[str, Any],
    repo_root: Path,
    cache_dir: Path,
    force: bool = False,
    dry_run: bool = False,
    verbose: int = 0,
) -> tuple[int, int, list[str]]:
    """Decompress objects from cache into workspace.

    Args:
        index: Index dict with entries
        repo_root: Repository root path
        cache_dir: Local cache directory
        force: If True, overwrite modified files
        dry_run: If True, don't actually write files
        verbose: Verbosity level controlling progress output.

    Returns:
        Tuple of (files_written, bytes_written, skipped_files)
    """
    entries = index.get("entries", {})
    files_written = 0
    bytes_written = 0
    skipped_files = []
    to_write: list[tuple[str, Path, str]] = []
    for rel_path, entry in entries.items():
        object_key = entry.get("object_key")
        if not object_key:
            continue
        # Target path in workspace
        file_path = repo_root / rel_path.replace("/", os.sep)
        # Check if file exists
        if file_path.exists() and not force:
            try:
                hex_digest, _, _ = hash_file(file_path, verbose=False)
                # If matches target, we are good (already up to date)
                if hex_digest == entry.get("hash"):
                    continue
                # If different, and NOT force, skip
                # NOTE(review): this inner `not force` is always True here
                # (the enclosing branch already requires `not force`).
                if not force:
                    skipped_files.append(rel_path)
                    continue
            except (OSError, IOError):
                pass  # Will overwrite if we can't read/hash
        to_write.append((rel_path, file_path, object_key))

    tracker = ProgressTracker(len(to_write), verbose=bool(verbose)) if to_write else None
    for rel_path, file_path, object_key in to_write:
        # Load from cache
        try:
            data = load_object(object_key, cache_dir)
        except (OSError, IOError):
            continue  # Will be missing
        if tracker:
            prefix = "[DRY-RUN] " if dry_run else ""
            tracker.advance(f"{prefix}{rel_path}")
        if dry_run:
            files_written += 1
            bytes_written += len(data)
            continue
        # Write atomically
        atomic_write_bytes(file_path, data)
        files_written += 1
        bytes_written += len(data)
    if tracker:
        tracker.clear()
    return files_written, bytes_written, skipped_files


def _find_untracked_files(
    repo_root: Path, entries: dict[str, Any], patterns: list[str]
) -> list[str]:
    """Find files matching patterns that are not in the index.

    Args:
        repo_root: Repository root to walk.
        entries: Index entries keyed by forward-slash relative path.
        patterns: fnmatch patterns tested against the bare filename.

    Returns:
        Forward-slash relative paths of matching untracked files.
    """
    extra = []
    # Directories that never contain user payload files.
    ignored_dirs = {
        ".git",
        ".vlfs",
        ".vlfs-cache",
        "__pycache__",
        "node_modules",
        "venv",
        ".env",
    }
    for root, dirs, files in os.walk(repo_root):
        # Prune ignored dirs in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d not in ignored_dirs]
        for file in files:
            file_path = Path(root) / file
            rel_path = file_path.relative_to(repo_root)
            rel_str = str(rel_path).replace(os.sep, "/")
            # Skip if already in index
            if rel_str in entries:
                continue
            # Check if matches tracked patterns
            if any(fnmatch.fnmatch(file, p) for p in patterns):
                extra.append(rel_str)
    return extra


def compute_status(
    index: dict[str, Any], repo_root: Path, verbose: int = 0
) -> dict[str, list[str]]:
    """Compare workspace against index, return categorized lists.

    Returns:
        Dict with keys "missing", "modified", "extra", each a list of
        forward-slash relative paths.
    """
    if verbose:
        print("Analyzing workspace status...")
    entries = index.get("entries", {})
    missing = []
    modified = []
    # Check indexed files
    to_hash: list[tuple[str, Path, dict[str, Any]]] = []
    for rel_path, entry in entries.items():
        file_path = repo_root / rel_path.replace("/", os.sep)
        if not file_path.exists():
            missing.append(rel_path)
            continue
        # Check if modified (size or mtime changed) — cheap pre-filter before
        # doing any hashing.
        stat = file_path.stat()
        if stat.st_size != entry.get("size") or stat.st_mtime != entry.get("mtime"):
            to_hash.append((rel_path, file_path, entry))
    if to_hash:
        paths_to_hash = [item[1] for item in to_hash]
        # Parallel hashing only pays off for larger batches.
        if len(paths_to_hash) >= 8:
            results, errors = hash_files_parallel(paths_to_hash, verbose=bool(verbose))
        else:
            results = {}
            errors = {}
            for path in paths_to_hash:
                try:
                    results[path] = hash_file(path, verbose=False)
                except (OSError, IOError) as exc:
                    errors[path] = exc
        for rel_path, file_path, entry in to_hash:
            # Unreadable files are reported as modified.
            if file_path in errors:
                modified.append(rel_path)
                continue
            current_hash = results[file_path][0]
            if current_hash != entry.get("hash"):
                modified.append(rel_path)
    # Find extra files
    # Load config for patterns
    config = load_config(repo_root / ".vlfs")  # Assuming standard location
    tracking = config.get("tracking", {})
    patterns = tracking.get("patterns", [])
    # Default to common large file types if no patterns configured
    if not patterns:
        patterns = ["*.psd", "*.zip", "*.exe", "*.dll", "*.lib", "*.iso", "*.mp4"]
    if verbose:
        print(" Scanning for untracked files...")
    extra = _find_untracked_files(repo_root, entries, patterns)
    return {"missing": missing, "modified": modified, "extra": extra}


def group_objects_by_remote(index: dict[str, Any]) -> dict[str, list[tuple[str, str]]]:
    """Group index entries by remote backend.

    Args:
        index: Index dict with entries

    Returns:
        Dict mapping remote name -> list of (object_key, rel_path) tuples
    """
    groups: dict[str, list[tuple[str, str]]] = {}
    entries = index.get("entries", {})
    for rel_path, entry in entries.items():
        object_key = entry.get("object_key")
        remote = entry.get("remote", "r2")  # Default to r2 for backwards compatibility
        if not object_key:
            continue
        if remote not in groups:
            groups[remote] = []
        groups[remote].append((object_key, rel_path))
    return groups


# =============================================================================
# Commands
# =============================================================================


def resolve_targets(path_arg: str) -> list[Path]:
    """Resolve a path argument (which may be a glob) to a list of existing files/dirs.

    Returns an empty list when a glob matches nothing; for a non-glob path the
    resolved path is returned even if it does not exist (caller handles error).
    """
    if not path_arg:
        return []
    # Check for glob characters
    if any(c in path_arg for c in "*?[]"):
        # Expand glob relative to CWD
        # glob.glob handles both rel and abs patterns
        matches = glob.glob(path_arg, recursive=True)
        return [Path(p).resolve() for p in matches]
    else:
        # Exact path
        p = Path(path_arg).resolve()
        # Return it even if not exists, caller handles error
        return [p]


def cmd_list(
    repo_root: Path,
    vlfs_dir: Path,
    long_format: bool = False,
    remote_filter: str | None = None,
    json_output: bool = False,
    pattern: str | None = None,
) -> int:
    """Execute list command.

    Args:
        repo_root: Repository root path.
        vlfs_dir: Path to the .vlfs directory holding the index.
        long_format: Show hash/size/remote columns.
        remote_filter: Only list entries stored on this remote.
        json_output: Emit entries as a JSON array instead of text.
        pattern: Optional fnmatch pattern, interpreted relative to CWD.

    Returns:
        Exit code (0 for success).
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    entries = index.get("entries", {})
    if not entries:
        if json_output:
            print("[]")
        return 0
    # Normalize pattern if provided: make it relative to repo root so it can
    # be matched against index keys.
    full_pattern = None
    if pattern:
        cwd = Path.cwd()
        try:
            rel_cwd = cwd.relative_to(repo_root)
            prefix = str(rel_cwd).replace(os.sep, "/")
            if prefix != ".":
                full_pattern = f"{prefix}/{pattern}"
            else:
                full_pattern = pattern
        except ValueError:
            # CWD is outside the repo; use the pattern as given.
            full_pattern = pattern
        # Ensure forward slashes
        full_pattern = full_pattern.replace(os.sep, "/")
    filtered_entries = []
    for rel_path, entry in entries.items():
        if remote_filter and entry.get("remote", "r2") != remote_filter:
            continue
        if full_pattern:
            if not fnmatch.fnmatch(rel_path, full_pattern):
                continue
        filtered_entries.append((rel_path, entry))
    # Sort by path
    filtered_entries.sort(key=lambda x: x[0])
    if json_output:
        output_list = []
        for rel_path, entry in filtered_entries:
            item = entry.copy()
            item["path"] = rel_path
            output_list.append(item)
        print(json.dumps(output_list, indent=2))
        return 0
    print(f"{colourize('Vlfs', 'CYAN')} Tracked files")
    if long_format:
        # Calculate column widths
        # Hash (8 chars), Size (10 chars), Remote (8 chars), Path (remainder)
        header = f" {'HASH':<8} {'SIZE':<10} {'REMOTE':<8} {'PATH'}"
        print(colourize(header, "GRAY"))
        print(colourize(" " + "-" * (len(header) + 20), "GRAY"))
        for rel_path, entry in filtered_entries:
            h = colourize(entry.get("hash", "")[:8], "CYAN")
            s = entry.get("size", 0)
            # Use gray for size units if possible, but keep it simple
            s_str = format_bytes(s)
            r = entry.get("remote", "r2")
            print(f" {h} {s_str:<10} {r:<8} {rel_path}")
    else:
        for rel_path, _ in filtered_entries:
            print(f" {rel_path}")
    return 0


def cmd_status(
    repo_root: Path,
    vlfs_dir: Path,
    dry_run: bool = False,
    json_output: bool = False,
    force_color: bool = False,
    verbose: int = 0,
) -> int:
    """Execute status command with enhanced output.

    Returns:
        Exit code (0 for success; 1 on index read failure).
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    status = compute_status(index, repo_root, verbose=verbose)
    if json_output:
        print(json.dumps(status, indent=2))
        return 0
    if verbose:
        print(f"{colourize('Vlfs', 'CYAN', force_color)} status")
    total_logical = 0
    total_compressed = 0
    entries = index.get("entries", {})
    for entry in entries.values():
        total_logical += entry.get("size", 0)
        total_compressed += entry.get("compressed_size", 0)
    total_changes = len(status["missing"]) + len(status["modified"])
    if total_changes == 0:
        marker = colourize("✓", "GREEN", force_color)
        print(
            f"{marker} Workspace is up to date ({len(entries)} {pluralize(len(entries), 'file')}, {format_bytes(total_logical)})"
        )
    else:
        if status["missing"]:
            print(
                f"{colourize('Missing:', 'RED', force_color)} {len(status['missing'])} {pluralize(len(status['missing']), 'file')}"
            )
            for path in status["missing"][:10]:  # Show first 10
                print(f" {colourize(path, 'RED', force_color)}")
            if len(status["missing"]) > 10:
                print(f" ... and {len(status['missing']) - 10} more")
        if status["modified"]:
            print(
                f"{colourize('Modified:', 'YELLOW', force_color)} {len(status['modified'])} {pluralize(len(status['modified']), 'file')}"
            )
            for path in status["modified"][:10]:
                print(f" {colourize(path, 'YELLOW', force_color)}")
            if len(status["modified"]) > 10:
                print(f" ... and {len(status['modified']) - 10} more")
    if verbose:
        # Compression ratio as a percentage; 100 when nothing is tracked.
        ratio = (total_compressed / total_logical * 100) if total_logical > 0 else 100
        print()
        print(f"{colourize('Cache Statistics:', 'CYAN')}")
        print(f" Tracked files: {len(entries)}")
        print(f" Logical size: {format_bytes(total_logical)}")
        print(f" Physical size: {format_bytes(total_compressed)} ({ratio:.1f}% ratio)")
    return 0


def cmd_pull(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    force: bool = False,
    dry_run: bool = False,
    pattern: str | None = None,
    restore: bool = False,
    verbose: int = 0,
) -> int:
    """Execute pull command with support for mixed remotes.

    Args:
        repo_root: Repository root path.
        vlfs_dir: Path to .vlfs directory.
        cache_dir: Local object cache directory.
        force: Overwrite locally modified files.
        dry_run: Report actions without downloading or writing.
        pattern: Optional fnmatch filter, interpreted relative to CWD.
        restore: Materialize from cache only, skipping remote downloads.
        verbose: Verbosity level.

    Returns:
        Exit code (0 for success).
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    if not index.get("entries"):
        print("No files to pull (index is empty)")
        return 0
    # Filter entries if pattern provided
    if pattern:
        cwd = Path.cwd()
        try:
            rel_cwd = cwd.relative_to(repo_root)
            prefix = str(rel_cwd).replace(os.sep, "/")
            if prefix != ".":
                full_pattern = f"{prefix}/{pattern}"
            else:
                full_pattern = pattern
        except ValueError:
            full_pattern = pattern
        full_pattern = full_pattern.replace(os.sep, "/")
        filtered_entries = {}
        for k, v in index["entries"].items():
            if fnmatch.fnmatch(k, full_pattern):
                filtered_entries[k] = v
        if not filtered_entries:
            print(f"No tracked files match pattern: {pattern}")
            return 0
        # NOTE(review): mutates the in-memory index snapshot; all downstream
        # steps (grouping, materialize) see only the filtered subset.
        index["entries"] = filtered_entries
        if verbose and restore:
            print(f"{colourize('Vlfs', 'CYAN')} Restoring {len(filtered_entries)} files matching '{pattern}' from cache...")
        elif verbose:
            print(f"{colourize('Vlfs', 'CYAN')} Pulling {len(filtered_entries)} files matching '{pattern}' from remote...")
    # Load merged config
    config = load_merged_config(vlfs_dir)
    total_downloaded = 0
    total_objects = 0
    skipped_private_files = 0
    if not restore:
        if verbose and not pattern:
            print(f"{colourize('Vlfs', 'CYAN')} Pulling all files from remote...")
        # Check if we can use HTTP download for R2
        r2_public_url = config.get("remotes", {}).get("r2", {}).get("public_base_url")
        r2_bucket = config.get("remotes", {}).get("r2", {}).get("bucket", "vlfs")
        drive_bucket = config.get("remotes", {}).get("gdrive", {}).get("bucket", "vlfs")
        # Update rclone config with R2 settings only if needed (push or no public URL)
        # But for pull, if we have public URL, we don't strictly need rclone config
        # Group objects by remote
        remote_groups = group_objects_by_remote(index)
        # Validate connection only if not using HTTP
        if not dry_run and "r2" in remote_groups and not r2_public_url:
            try:
                validate_r2_connection(bucket=r2_bucket)
            except (RcloneError, ConfigError) as e:
                print(f"Error: {e}", file=sys.stderr)
                if isinstance(e, ConfigError):
                    print(
                        "Hint: Set R2 credentials via RCLONE_CONFIG_R2_* env vars",
                        file=sys.stderr,
                    )
                return 1
        # Build map of object key -> compressed size for progress reporting
        key_sizes = {}
        for entry in index.get("entries", {}).values():
            k = entry.get("object_key")
            s = entry.get("compressed_size", 0)
            if k:
                key_sizes[k] = s
        for remote, objects in remote_groups.items():
            object_keys = [obj[0] for obj in objects]
            total_objects += len(object_keys)
            # Check Google Drive auth before attempting download
            if remote == "gdrive":
                can_access_drive = False
                try:
                    can_access_drive = has_drive_token()
                except RuntimeError:
                    can_access_drive = False  # Treat CI-restriction as "no token"
                if not can_access_drive:
                    # No auth available - skip private files (summary will be shown at end)
                    skipped_private_files += len(object_keys)
                    continue
            try:
                downloaded = _download_remote_group(
                    remote,
                    object_keys,
                    cache_dir,
                    key_sizes,
                    r2_public_url,
                    dry_run,
                    r2_bucket=r2_bucket,
                    drive_bucket=drive_bucket,
                    force=force,
                    verbose=verbose,
                )
            except (RcloneError, ConfigError) as e:
                print(f"Error downloading from {remote}: {e}", file=sys.stderr)
                if isinstance(e, ConfigError):
                    print(
                        "Hint: Set R2 credentials via RCLONE_CONFIG_R2_* env vars",
                        file=sys.stderr,
                    )
                return 1
            total_downloaded += downloaded
    # Materialize workspace; --restore implies overwriting local files.
    files_written, bytes_written, skipped = materialize_workspace(
        index, repo_root, cache_dir, force or restore, dry_run, verbose=verbose
    )
    if skipped:
        print(
            f" {colourize('Skipped:', 'YELLOW')} {len(skipped)} files due to local modifications (use --force to overwrite):"
        )
        for path in skipped[:10]:
            print(f" {path}")
        if len(skipped) > 10:
            print(f" ... and {len(skipped) - 10} more")
    if dry_run:
        print(
            f"{colourize('[DRY-RUN]', 'YELLOW')} Would write {files_written} files ({format_bytes(bytes_written)})"
        )
    else:
        if files_written > 0:
            marker = colourize("✓", "GREEN")
            print(f"{marker} Restored {files_written} files ({format_bytes(bytes_written)})")
        else:
            print(f"{colourize('✓', 'GREEN')} All files already up to date.")
    # Report skipped private files due to missing Google Drive auth
    if skipped_private_files > 0:
        print(
            f" {colourize('Note:', 'YELLOW')} Skipped {skipped_private_files} private files (Google Drive auth required)."
        )
    return 0


def _run_push_batch(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    files_to_push: list[Path],
    private: bool,
    dry_run: bool,
    verbose: int = 0,
) -> int:
    """Push a prepared batch of files with concise progress output.

    Args:
        repo_root: Repository root path.
        vlfs_dir: Path to .vlfs directory.
        cache_dir: Local object cache directory.
        files_to_push: Absolute paths of files to push.
        private: Push to Google Drive (private) instead of R2 (public).
        dry_run: Report actions without uploading or updating the index.
        verbose: Verbosity level.

    Returns:
        Exit code (0 for success, 1 if any file failed).
    """
    if not files_to_push:
        print("No files processed.")
        return 0
    config = load_merged_config(vlfs_dir)
    compression_level = config.get("defaults", {}).get("compression_level", 3)
    r2_bucket = config.get("remotes", {}).get("r2", {}).get("bucket", "vlfs")
    drive_bucket = config.get("remotes", {}).get("gdrive", {}).get("bucket", "vlfs")
    # R2 auth/connectivity is only needed for public (R2) pushes.
    if not dry_run and not private:
        if ensure_r2_auth() != 0:
            return 1
        try:
            validate_r2_connection(bucket=r2_bucket)
        except (RcloneError, ConfigError) as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    tracker = ProgressTracker(len(files_to_push), verbose=bool(verbose))
    failed: list[str] = []
    updates: dict[str, dict[str, Any]] = {}
    total_original = 0
    total_compressed = 0
    for file_path in files_to_push:
        try:
            rel_path = str(file_path.relative_to(repo_root)).replace(os.sep, "/")
        except ValueError:
            # Outside the repo; fall back to the absolute path for display.
            rel_path = str(file_path)
        size = file_path.stat().st_size if file_path.exists() else 0
        tracker.advance(f"{rel_path} ({format_bytes(size)})")
        result, entry = _push_single_file_collect(
            repo_root,
            vlfs_dir,
            cache_dir,
            file_path,
            private,
            dry_run,
            compression_level,
            r2_bucket=r2_bucket,
            drive_bucket=drive_bucket,
            verbose=verbose,
        )
        if result != 0:
            failed.append(rel_path)
            continue
        if entry:
            updates.update(entry)
            entry_data = next(iter(entry.values()))
            if isinstance(entry_data, dict):
                total_original += entry_data.get("size", 0)
                total_compressed += entry_data.get("compressed_size", 0)
    if failed:
        tracker.done(
            f"Failed to push {len(failed)} {pluralize(len(failed), 'file')}",
            success=False,
        )
        return 1
    # Index is written once for the whole batch, not per file.
    if not dry_run and updates:
        update_index_entries(vlfs_dir, updates)
    action = "Would push" if dry_run else "Pushed"
    tracker.done(
        f"{action} {len(files_to_push)} {pluralize(len(files_to_push), 'file')} ({format_compression_summary(total_original, total_compressed)})"
    )
    return 0


def cmd_push(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    paths: list[str],
    private: bool,
    dry_run: bool = False,
    verbose: int = 0,
) -> int:
    """Execute push command.
    Handles both files and directories."""
    # Resolve target paths (supports globs)
    all_targets = []
    for path in paths:
        targets = resolve_targets(path)
        if targets:
            all_targets.extend(targets)
        else:
            die(f"No files found matching: {path}")
    if not all_targets:
        return 1
    # Filter out non-existent files
    valid_targets = []
    for t in all_targets:
        if not t.exists():
            print(f"Error: File not found: {t}", file=sys.stderr)
            continue
        valid_targets.append(t)
    if not valid_targets:
        return 1
    # Expand directories and dedupe while preserving order.
    files_to_push: list[Path] = []
    seen: set[Path] = set()
    for src_path in valid_targets:
        if src_path.is_dir():
            if verbose:
                print(f"Scanning directory {src_path}...")
            files = _find_files_recursive(repo_root, src_path)
            if not files:
                print(f"No files found in directory: {src_path}")
                continue
            for file_path in files:
                if file_path not in seen:
                    files_to_push.append(file_path)
                    seen.add(file_path)
        else:
            if src_path not in seen:
                files_to_push.append(src_path)
                seen.add(src_path)
    if verbose:
        print(
            f"{colourize('Vlfs', 'CYAN')} Pushing files to {'private' if private else 'public'} storage..."
        )
    return _run_push_batch(
        repo_root, vlfs_dir, cache_dir, files_to_push, private, dry_run, verbose
    )


def cmd_push_all(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    private: bool,
    dry_run: bool,
    verbose: int = 0,
) -> int:
    """Push all new or modified files.

    Returns:
        Exit code (0 for success).
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    # Find modified files
    if verbose:
        print("Scanning for modified files...")
    status = compute_status(index, repo_root, verbose=verbose)
    # "missing" entries may still exist on disk in edge cases; re-check.
    files_to_push = [
        repo_root / rel_path.replace("/", os.sep)
        for rel_path in status["missing"] + status["modified"]
        if (repo_root / rel_path.replace("/", os.sep)).exists()
    ]
    if not files_to_push:
        print(f"{colourize('✓', 'GREEN')} All files are up to date")
        return 0
    return _run_push_batch(
        repo_root, vlfs_dir, cache_dir, files_to_push, private, dry_run, verbose
    )


def cmd_push_glob(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    pattern: str,
    private: bool,
    dry_run: bool,
    verbose: int = 0,
) -> int:
    """Push files matching a glob pattern.

    Returns:
        Exit code (0 for success).
    """
    if verbose:
        print(f"Searching for files matching '{pattern}'...")
    matched_files = _collect_glob_matches(repo_root, pattern)
    if not matched_files:
        print(f"No files match pattern: {pattern}")
        return 0
    if verbose:
        print(f"Found {len(matched_files)} files matching '{pattern}'")
    return _run_push_batch(
        repo_root, vlfs_dir, cache_dir, matched_files, private, dry_run, verbose
    )


def cmd_lookup(repo_root: Path, vlfs_dir: Path, query: str) -> int:
    """Find files in index by partial hash or object key.

    Args:
        repo_root: Repository root path (unused in matching).
        vlfs_dir: Path to .vlfs directory.
        query: Case-insensitive substring of an object key or hash.

    Returns:
        0 when at least one match is found, 1 otherwise.
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    entries = index.get("entries", {})
    matches = []
    query = query.lower()
    for rel_path, entry in entries.items():
        obj_key = entry.get("object_key", "").lower()
        file_hash = entry.get("hash", "").lower()
        if query in obj_key or query in file_hash:
            matches.append((rel_path, entry))
    if not matches:
        print(f"No matches found for '{query}'")
        return 1
    print(f"Found {len(matches)} matches:")
    for rel_path, entry in matches:
        remote = entry.get("remote", "unknown")
        size = entry.get("size", 0)
        obj_key = entry.get("object_key", "n/a")
        print(f" {colourize(rel_path, 'CYAN')}")
        print(f" Hash: {entry.get('hash')}")
        print(f" Object: {obj_key} ({remote})")
        print(f" Size: {format_bytes(size)}")
    return 0


def cmd_verify(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    remote: bool = False,
    fix: bool = False,
    dry_run: bool = False,
    json_output: bool = False,
    verbose: int = 0,
) -> int:
    """Execute verify command that re-hashes workspace files.

    Args:
        repo_root: Repository root path.
        vlfs_dir: Path to .vlfs directory.
        cache_dir: Local object cache directory.
        remote: Also verify that objects exist on their remotes.
        fix: Re-upload remotely-missing objects from the local cache.
        dry_run: Pass dry-run through to any fix uploads.
        json_output: Emit a JSON report instead of text.
        verbose: Verbosity level.

    Returns:
        0 when no issues found, 1 otherwise.
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    if verbose and not json_output:
        print("Verifying workspace integrity...")
    entries = index.get("entries", {})
    corrupted = []
    missing_local = []
    valid = []
    to_hash: list[tuple[str, Path, dict[str, Any]]] = []
    for rel_path, entry in entries.items():
        file_path = repo_root / rel_path.replace("/", os.sep)
        if not file_path.exists():
            missing_local.append(rel_path)
            continue
        # Check size and mtime first (shortcut)
        stat = file_path.stat()
        indexed_size = entry.get("size", 0)
        indexed_mtime = entry.get("mtime", 0)
        # If size and mtime match, assume unchanged
        if stat.st_size == indexed_size and stat.st_mtime == indexed_mtime:
            valid.append(rel_path)
            continue
        to_hash.append((rel_path, file_path, entry))
    if to_hash:
        paths_to_hash = [item[1] for item in to_hash]
        if len(paths_to_hash) >= 8 and not json_output:
            logger.debug("Hashing %d files in parallel", len(paths_to_hash))
            results, errors = hash_files_parallel(paths_to_hash)
        else:
            results = {}
            errors = {}
            # Sequential path shows per-file progress unless emitting JSON.
            tracker = ProgressTracker(len(paths_to_hash), verbose=bool(verbose)) if not json_output else None
            for rel_path, path, _ in to_hash:
                if tracker:
                    tracker.advance(rel_path)
                try:
                    results[path] = hash_file(path, verbose=False)
                except (OSError, IOError) as exc:
                    errors[path] = exc
            if tracker:
                tracker.clear()
        for rel_path, file_path, entry in to_hash:
            # Unreadable files are classified as missing locally.
            if file_path in errors:
                missing_local.append(rel_path)
                continue
            current_hash = results[file_path][0]
            if current_hash != entry.get("hash"):
                corrupted.append(rel_path)
            else:
                valid.append(rel_path)
    missing_remote = []
    if remote:
        if verbose and not json_output:
            print("Verifying remote object existence...")
        # Group by remote
        remote_groups = {}
        for rel_path, entry in entries.items():
            r = entry.get("remote", "r2")
            if r not in remote_groups:
                remote_groups[r] = []
            remote_groups[r].append((rel_path, entry))
        for r, items in remote_groups.items():
            if r == "r2":
                cloud_keys = list_remote_objects("r2")
                for rel_path, entry in items:
                    obj_key = entry.get("object_key")
                    if obj_key and obj_key not in cloud_keys:
                        missing_remote.append(rel_path)
            elif r == "gdrive":
                # Drive is trickier to list_json efficiently, skip for now or check one by one
                # For now, let's just focus on R2 which is where 404s usually happen
                pass
    if fix and missing_remote:
        print(f"Attempting to fix {len(missing_remote)} missing remote objects...")
        fixed_count = 0
        for rel_path in missing_remote:
            entry = entries[rel_path]
            obj_key = entry.get("object_key")
            local_obj_path = cache_dir / "objects" / obj_key
            if local_obj_path.exists():
                print(f" Re-uploading {rel_path} ({obj_key})...")
                try:
                    if entry.get("remote") == "r2":
                        upload_to_r2(local_obj_path, obj_key, dry_run=dry_run)
                        fixed_count += 1
                except Exception as e:
                    print(f" Error: {e}", file=sys.stderr)
            else:
                print(f" Cannot fix {rel_path}: Object {obj_key} missing from local cache.")
        print(f"Fixed {fixed_count} missing remote objects.")
        # Remove fixed from missing_remote for reporting
        # (This is simplified, in a real scenario we'd re-verify)
        # NOTE(review): fixed objects are still counted as issues below.
    if json_output:
        result = {
            "valid": valid,
            "corrupted": corrupted,
            "missing_local": missing_local,
            "missing_remote": missing_remote,
            "total": len(entries),
            "issues": len(corrupted) + len(missing_local) + len(missing_remote),
        }
        print(json.dumps(result, indent=2))
    else:
        total = len(entries)
        issues = len(corrupted) + len(missing_local) + len(missing_remote)
        if issues == 0:
            print(f"{colourize('✓', 'GREEN')} All {total} files verified OK")
        else:
            print(
                f"Verification: {colourize(f'{total - issues} OK', 'GREEN')}, "
                + f"{colourize(f'{len(corrupted)} corrupted', 'RED')}, "
                + f"{colourize(f'{len(missing_local)} missing local', 'YELLOW')}, "
                + f"{colourize(f'{len(missing_remote)} missing remote', 'RED')}"
            )
            for path in corrupted[:5]:
                print(f" {colourize('CORRUPTED', 'RED')} {path}")
            for path in missing_local[:5]:
                print(f" {colourize('MISSING LOCAL', 'YELLOW')} {path}")
            for path in missing_remote[:5]:
                print(f" {colourize('MISSING REMOTE', 'RED')} {path}")
            # 15 = 5 shown from each of the three categories above.
            combined_count = len(corrupted) + len(missing_local) + len(missing_remote)
            if combined_count > 15:
                print(f" ... and {combined_count - 15} more")
    return 1 if issues > 0 else 0


def cmd_remove(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    paths: list[str],
    force: bool = False,
    dry_run: bool = False,
    delete_file: bool = False,
    verbose: int = 0,
) -> int:
    """Execute remove command.

    Args:
        repo_root: Repository root path.
        vlfs_dir: Path to .vlfs directory.
        cache_dir: Local object cache directory.
        paths: Files, directories, or glob patterns to untrack.
        force: Skip the confirmation prompt.
        dry_run: Report actions without deleting anything.
        delete_file: Also delete the workspace copy of removed files.
        verbose: Verbosity level.

    Returns:
        Exit code (0 for success).
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    entries = index.get("entries", {})
    if not entries:
        print("Index is empty")
        return 0
    # Resolve target paths
    all_targets = []
    # We must handle paths individually or collectively?
    # If user provides multiple paths, we iterate.
    # But `resolve_targets` returns paths.
    # We also need to keep track of missing globs for fallback matching.
    filesystem_targets = []
    failed_glob_paths = []
    for path in paths:
        targets = resolve_targets(path)
        if targets:
            filesystem_targets.extend(targets)
        else:
            # Maybe it's a missing file or index-only glob
            failed_glob_paths.append(path)
    # Identify files to remove
    to_remove = []
    # 1. Check resolved filesystem targets
    for target_path in filesystem_targets:
        # ... logic ...
        try:
            target_path.relative_to(repo_root)
        except ValueError:
            print(f"Warning: {target_path} is outside repository", file=sys.stderr)
            continue
        if target_path.is_dir():
            # Directory: remove every index entry under it.
            target_rel = str(target_path.relative_to(repo_root)).replace(os.sep, "/")
            for rel_path in entries:
                if rel_path == target_rel or rel_path.startswith(target_rel + "/"):
                    if rel_path not in to_remove:
                        to_remove.append(rel_path)
        else:
            target_rel = str(target_path.relative_to(repo_root)).replace(os.sep, "/")
            if target_rel in entries:
                if target_rel not in to_remove:
                    to_remove.append(target_rel)
            # No warn here, handled by fallback logic?
    # 2. Check failed glob paths (index matching)
    for path in failed_glob_paths:
        matched = False
        if any(c in path for c in "*?[]"):
            # It's a glob, try matching against index keys
            # NOTE(review): fnmatch is already imported at module level;
            # this local import is redundant.
            import fnmatch
            cwd = Path.cwd()
            try:
                rel_cwd = cwd.relative_to(repo_root)
                prefix = str(rel_cwd).replace(os.sep, "/")
                if prefix == ".":
                    search_pattern = path
                else:
                    search_pattern = f"{prefix}/{path}"
            except ValueError:
                search_pattern = path
            search_pattern = search_pattern.replace(os.sep, "/")
            for rel_path in entries:
                if fnmatch.fnmatch(rel_path, search_pattern):
                    if rel_path not in to_remove:
                        to_remove.append(rel_path)
                    matched = True
        # 3. Handle exact path to missing file
        if not matched and not any(c in path for c in "*?[]"):
            target_path = Path(path).resolve()
            try:
                target_rel = str(target_path.relative_to(repo_root)).replace(os.sep, "/")
                if target_rel in entries:
                    if target_rel not in to_remove:
                        to_remove.append(target_rel)
            except ValueError:
                pass
    if not to_remove:
        print(f"No tracked files found matching: {paths}")
        return 0
    print(f"Found {len(to_remove)} files to remove.")
    # Confirmation
    if not force and not dry_run:
        response = input("Remove these files from VLFS (index, cache, cloud)? [y/N] ").strip().lower()
        if response not in ("y", "yes"):
            print("Aborted")
            return 0
    # Count object references (to avoid deleting shared objects)
    object_ref_counts = {}
    for entry in entries.values():
        key = entry.get("object_key")
        if key:
            object_ref_counts[key] = object_ref_counts.get(key, 0) + 1
    # Load config for buckets
    config = load_merged_config(vlfs_dir)
    r2_bucket = config.get("remotes", {}).get("r2", {}).get("bucket", "vlfs")
    drive_bucket = config.get("remotes", {}).get("gdrive", {}).get("bucket", "vlfs")
    removed_count = 0
    # We need to modify 'entries' in place, but iterating over it while modifying is bad.
    # So we'll iterate over 'to_remove' and pop from 'entries'.
    # But wait, we need to write index atomically.
    # Let's modify a copy or just modify the loaded dict and write it back at end.
    tracker = ProgressTracker(len(to_remove), verbose=bool(verbose))
    for rel_path in to_remove:
        tracker.advance(rel_path)
        entry = entries[rel_path]
        object_key = entry.get("object_key")
        remote = entry.get("remote", "r2")
        # Decrement ref count
        if object_key:
            object_ref_counts[object_key] -= 1
            remaining_refs = object_ref_counts[object_key]
            if remaining_refs == 0:
                if verbose:
                    print(f" Object {object_key} is unreferenced.")
                # Delete from cache
                cache_obj_path = cache_dir / "objects" / object_key
                if cache_obj_path.exists():
                    if dry_run:
                        print(f"[DRY-RUN] Would delete local cache object {object_key}")
                    else:
                        if verbose:
                            print(" Deleting from cache...")
                        try:
                            cache_obj_path.unlink()
                        except OSError as e:
                            print(f"Warning: Failed to delete cache object: {e}", file=sys.stderr)
                # Delete from remote
                if remote == "gdrive":
                    # Check token? Maybe skip if no token but warn?
                    # The delete_from_remote function handles the call, but we should probably check auth globally once if possible.
                    # For now rely on individual calls.
                    delete_from_remote("gdrive", drive_bucket, object_key, dry_run)
                else:  # r2
                    delete_from_remote("r2", r2_bucket, object_key, dry_run)
            else:
                if verbose:
                    print(
                        f" Object {object_key} referenced by {remaining_refs} other files, keeping in storage."
                    )
        # Remove from index structure (in memory)
        if not dry_run:
            del entries[rel_path]
        # Optionally delete workspace file
        if delete_file:
            ws_file = repo_root / rel_path.replace("/", os.sep)
            if ws_file.exists():
                if dry_run:
                    print(f"[DRY-RUN] Would delete workspace file {ws_file}")
                else:
                    if verbose:
                        print(" Deleting workspace file...")
                    try:
                        ws_file.unlink()
                    except OSError as e:
                        print(f"Warning: Failed to delete workspace file: {e}", file=sys.stderr)
        removed_count += 1
    if not dry_run:
        # Write updated index
        # We need to wrap this in lock? update_index_entries does locking.
        # But we are doing a bulk delete.
        # We should use with_file_lock manually here or create a helper `remove_index_entries`.
        # For now, let's just use with_file_lock and write_index.
        with with_file_lock(vlfs_dir / "index.lock"):
            # Re-read index to be safe? Or just overwrite?
            # Ideally re-read and apply changes, but we did a lot of logic based on the snapshot.
            # If we re-read, we might miss new files added concurrently?
            # Or if someone else deleted, we might error.
            # Given single-user CLI nature, overwriting our modified 'entries' is probably acceptable risk,
            # but strictly we should lock *before* reading if we want transaction.
            # But the user interaction (prompt) makes holding lock bad.
            # Let's just write back what we have.
            # Persist the modified entries snapshot back to disk.
            # NOTE(review): the index was read before the confirmation prompt
            # (the lock is not held across user interaction), so a concurrent
            # writer could be overwritten here — accepted risk for a
            # single-user CLI, per the discussion in the comments above.
            index["entries"] = entries
            write_index(vlfs_dir, index)

        # Cleanup empty dirs in cache left behind by the deleted objects
        _cleanup_empty_dirs(cache_dir / "objects")

    action = "Would remove" if dry_run else "Removed"
    tracker.done(f"{action} {removed_count} {pluralize(removed_count, 'file')}")
    return 0


def cmd_clean(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    dry_run: bool = False,
    yes: bool = False,
    verbose: int = 0,
) -> int:
    """Execute clean command to remove unreferenced cache objects.

    Scans ``<cache_dir>/objects`` and deletes every file whose key is not
    referenced by any entry in the index ("orphans").

    Args:
        repo_root: Repository root (unused here; kept for a uniform
            command signature).
        vlfs_dir: Directory holding the VLFS index.
        cache_dir: Local object cache directory.
        dry_run: If True, only report what would be deleted.
        yes: If True, skip the interactive confirmation prompt.
        verbose: Verbosity level; non-zero enables progress output.

    Returns:
        Process exit code: 0 on success, 1 if the index cannot be read.
    """
    try:
        index = read_index(vlfs_dir)
    except VLFSIndexError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1

    # Get all referenced object keys
    entries = index.get("entries", {})
    referenced_keys = set()
    for entry in entries.values():
        object_key = entry.get("object_key")
        if object_key:
            referenced_keys.add(object_key)

    # Scan cache directory for all objects
    objects_dir = cache_dir / "objects"
    if not objects_dir.exists():
        print("Cache directory is empty")
        return 0

    if verbose:
        print("Scanning cache for orphaned objects...")

    to_delete = []
    total_size = 0
    for obj_path in objects_dir.rglob("*"):
        if obj_path.is_file():
            # Compute relative path from objects dir; keys always use "/"
            rel_key = str(obj_path.relative_to(objects_dir)).replace(os.sep, "/")
            if rel_key not in referenced_keys:
                to_delete.append(obj_path)
                total_size += obj_path.stat().st_size

    if not to_delete:
        print(f"{colourize('✓', 'GREEN')} No orphaned cache objects found")
        return 0

    if dry_run:
        print(
            f"[DRY-RUN] Would delete {len(to_delete)} orphaned objects ({format_bytes(total_size)})"
        )
        # Preview only the first handful to keep output readable
        for obj_path in to_delete[:10]:
            print(f"  {obj_path.relative_to(objects_dir)}")
        if len(to_delete) > 10:
            print(f"  ... and {len(to_delete) - 10} more")
        return 0

    # Confirmation prompt
    if not yes:
        print(f"Found {len(to_delete)} orphaned objects ({format_bytes(total_size)})")
        response = input("Delete these files? [y/N] ").strip().lower()
        if response not in ("y", "yes"):
            print("Aborted")
            return 0

    # Delete files
    deleted_count = 0
    freed_bytes = 0
    tracker = ProgressTracker(len(to_delete), verbose=bool(verbose))
    for obj_path in to_delete:
        try:
            tracker.advance(obj_path.name)
            size = obj_path.stat().st_size
            obj_path.unlink()
            deleted_count += 1
            freed_bytes += size
        except (OSError, IOError) as e:
            # Best-effort: continue deleting the rest even if one file fails
            print(f"Warning: Failed to delete {obj_path}: {e}", file=sys.stderr)

    # Clean up empty directories
    _cleanup_empty_dirs(objects_dir)

    tracker.done(
        f"Deleted {deleted_count} {pluralize(deleted_count, 'object')}, freed {format_bytes(freed_bytes)}"
    )
    return 0


def cmd_repair(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    dry_run: bool = False,
    verbose: int = 0,
) -> int:
    """Execute repair command that fixes common issues.

    Runs three phases: (1) prune orphaned cache objects, (2) verify remote
    objects and re-upload anything missing (``fix=True``), and (3) a final
    ``rclone sync`` of the whole object cache to R2 as a safety net.

    Args:
        repo_root: Repository root directory.
        vlfs_dir: Directory holding the VLFS index.
        cache_dir: Local object cache directory.
        dry_run: If True, report actions without performing them.
        verbose: Verbosity level; non-zero enables progress output.

    Returns:
        Process exit code (always 0; phase failures are reported as warnings).
    """
    # 1. Clean local cache (remove orphans); yes=True skips the prompt
    print("[1/3] Cleaning local cache...")
    cmd_clean(repo_root, vlfs_dir, cache_dir, dry_run=dry_run, yes=True, verbose=verbose)

    # 2. Verify remote objects and fix 404s
    print("[2/3] Verifying remote objects...")
    cmd_verify(
        repo_root,
        vlfs_dir,
        cache_dir,
        remote=True,
        fix=True,
        dry_run=dry_run,
        verbose=verbose,
    )

    # 3. Synchronize cache to remote (Final safety net)
    print("[3/3] Syncing to cloud...")
    if dry_run:
        print("[DRY-RUN] Would run: rclone sync .vlfs-cache/objects r2:vlfs")
    else:
        try:
            cmd = ["sync", str(cache_dir / "objects"), "r2:vlfs"]
            if verbose:
                # -P shows rclone's own progress display
                cmd.append("-P")
            run_rclone(cmd, capture_output=not verbose)
            print(f"{colourize('✓', 'GREEN')} Synced cache to R2")
        except Exception as e:
            # Deliberately broad: repair is best-effort, never fatal here
            print(f"Warning: Final sync failed: {e}", file=sys.stderr)

    print(f"{colourize('✓', 'GREEN')} Repair complete!")
    return 0


# =============================================================================
# Private Helpers
# =============================================================================


def _push_single_file_collect(
    repo_root: Path,
    vlfs_dir: Path,
    cache_dir: Path,
    src_path: Path,
    private: bool,
    dry_run: bool,
    compression_level: int = 3,
    r2_bucket: str = "vlfs",
    drive_bucket: str = "vlfs",
    verbose: int = 0,
) -> tuple[int, dict[str, dict[str, Any]] | None]:
    """Push a single file to remote and return index entry update.

    Stores the file in the local object cache, uploads it to the selected
    remote (Drive when ``private`` is True, otherwise R2), and returns the
    index entry describing it.

    Args:
        repo_root: Repository root; ``src_path`` must be inside it.
        vlfs_dir: VLFS metadata directory (unused here; uniform signature).
        cache_dir: Local object cache directory.
        src_path: File to push.
        private: Upload to Google Drive instead of R2.
        dry_run: If True, cache the object but skip the upload.
        compression_level: zstd compression level passed to store_object.
        r2_bucket: Bucket name for R2 uploads.
        drive_bucket: Bucket name for Drive uploads.
        verbose: Verbosity level; non-zero enables verbose upload output.

    Returns:
        ``(0, {rel_path: entry})`` on success, ``(1, None)`` on failure.
    """
    # Ensure file is within repo
    try:
        rel_path = str(src_path.relative_to(repo_root)).replace(os.sep, "/")
    except ValueError:
        print(f"Error: File must be within repository: {src_path}", file=sys.stderr)
        return 1, None

    logger.info(f"Pushing file: {rel_path}")
    logger.debug(f"Source path: {src_path}")

    # Store in local cache. NOTE(review): this happens even under dry_run —
    # only the remote upload is skipped, the local cache is still populated.
    object_key = store_object(src_path, cache_dir, compression_level=compression_level)
    logger.debug(f"Stored in cache with key: {object_key}")

    # Compute hash and size
    hex_digest, size, mtime = hash_file(src_path, verbose=False)
    compressed_size = (cache_dir / "objects" / object_key).stat().st_size
    logger.debug(f"Hash: {hex_digest}, Size: {size}, Compressed: {compressed_size}")

    # Determine remote
    remote = "gdrive" if private else "r2"
    logger.debug(f"Target remote: {remote}")

    if dry_run:
        logger.info(f"[DRY-RUN] Would upload {rel_path} to {remote}")
    else:
        # Upload to remote
        try:
            if private:
                # Check Drive token before attempting the upload
                if not has_drive_token():
                    logger.error("Google Drive token not found")
                    print("Error: Google Drive token not found.", file=sys.stderr)
                    print("Set up Drive auth with: vlfs auth gdrive", file=sys.stderr)
                    return 1, None
                upload_to_drive(
                    cache_dir / "objects" / object_key,
                    object_key,
                    # Only forward verbose=True when requested, so the
                    # callee's default stays in effect otherwise.
                    **(
                        {"bucket": drive_bucket, "dry_run": False, "verbose": True}
                        if verbose
                        else {"bucket": drive_bucket, "dry_run": False}
                    ),
                )
                logger.info(f"Uploaded to Drive: {rel_path}")
            else:
                upload_to_r2(
                    cache_dir / "objects" / object_key,
                    object_key,
                    **(
                        {"bucket": r2_bucket, "dry_run": False, "verbose": True}
                        if verbose
                        else {"bucket": r2_bucket, "dry_run": False}
                    ),
                )
                logger.info(f"Uploaded to R2: {rel_path}")
        except RcloneError as e:
            print(f"Error uploading {rel_path}: {e}", file=sys.stderr)
            return 1, None
        except ConfigError as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1, None

    entry = {
        rel_path: {
            "hash": hex_digest,
            "size": size,
            "compressed_size": compressed_size,
            "mtime": mtime,
            "object_key": object_key,
            "remote": remote,
        }
    }
    return 0, entry


def _download_remote_group(
    remote: str,
    object_keys: list[str],
    cache_dir: Path,
    key_sizes: dict[str, int],
    r2_public_url: str | None,
    dry_run: bool,
    r2_bucket: str = "vlfs",
    drive_bucket: str = "vlfs",
    force: bool = False,
    verbose: int = 0,
) -> int:
    """Download missing objects for a remote group and return count.

    Args:
        remote: Remote name for this group ("r2" or "gdrive").
        object_keys: Object keys belonging to this remote.
        cache_dir: Local object cache directory.
        key_sizes: Map of object key -> expected size, for progress totals.
        r2_public_url: Public HTTP base URL for R2, if configured.
        dry_run: If True, only report what would be downloaded.
        r2_bucket: Bucket name for R2 (rclone) downloads.
        drive_bucket: Bucket name for Drive downloads.
        force: Re-download even objects already present in the cache.
        verbose: Verbosity level; non-zero enables progress output.

    Returns:
        Number of objects downloaded (or that would be, under dry_run).
    """
    if force:
        to_download = object_keys
    else:
        # Skip objects already present in the local cache
        to_download = [key for key in object_keys if not (cache_dir / "objects" / key).exists()]

    if not to_download:
        return 0

    missing_size = sum(key_sizes.get(k, 0) for k in to_download)

    # Prefer plain HTTP for R2 when a public URL is configured (no rclone needed)
    if remote == "r2" and r2_public_url:
        if dry_run:
            print(
                f"[DRY-RUN] Would download {len(to_download)} objects ({format_bytes(missing_size)}) via HTTP from {r2_public_url}"
            )
            return len(to_download)
        if verbose:
            print(
                f"Downloading {len(to_download)} objects ({format_bytes(missing_size)}) via HTTP..."
            )
        # Progress tracking only makes sense with verbose output
        tracker = ProgressTracker(len(to_download), verbose=bool(verbose)) if verbose else None
        return download_from_r2_http(
            to_download,
            cache_dir,
            r2_public_url,
            dry_run,
            force=force,
            **({"verbose": True, "tracker": tracker} if verbose else {}),
        )

    if dry_run:
        print(
            f"[DRY-RUN] Would download {len(to_download)} objects ({format_bytes(missing_size)}) from {remote}"
        )
        return len(to_download)

    if verbose:
        print(
            f"Downloading {len(to_download)} objects ({format_bytes(missing_size)}) from {remote}..."
        )

    if remote == "gdrive":
        try:
            if not has_drive_token():
                print(
                    "Skipping Drive files (no auth). Run: vlfs auth gdrive",
                    file=sys.stderr,
                )
                return 0
        except RuntimeError:
            # has_drive_token raises RuntimeError in CI/no-auth environments
            # — TODO confirm against its definition
            print(
                "Skipping Drive files (CI environment/no auth).",
                file=sys.stderr,
            )
            return 0
        return download_from_drive(
            to_download,
            cache_dir,
            bucket=drive_bucket,
            dry_run=False,
            force=force,
            **({"verbose": True} if verbose else {}),
        )

    # Default to R2 (rclone)
    if not r2_public_url:
        return download_from_r2(
            to_download,
            cache_dir,
            bucket=r2_bucket,
            dry_run=False,
            force=force,
            **({"verbose": True} if verbose else {}),
        )

    # Fallback to HTTP if configured
    return download_from_r2_http(
        to_download,
        cache_dir,
        r2_public_url,
        dry_run,
        force=force,
        **({"verbose": True} if verbose else {}),
    )


def _match_recursive_glob(rel_path: str, pattern: str) -> bool:
    """Match a path against a ** glob pattern.

    Only patterns containing a single ``**/`` separator are supported; the
    suffix is matched against the filename component alone.

    Examples:
        "tools/compiler.exe" matches "tools/**/*.exe"
        "tools/sub/linker.exe" matches "tools/**/*.exe"

    Args:
        rel_path: Forward-slash-separated path relative to the repo root.
        pattern: Glob pattern, optionally containing ``**/``.

    Returns:
        True if the path matches the pattern.
    """
    # Handle patterns like "tools/**/*.exe"
    if "**" not in pattern:
        return fnmatch.fnmatch(rel_path, pattern)

    # Split pattern by **; anything other than exactly one "**/" is unsupported
    parts = pattern.split("**/")
    if len(parts) != 2:
        return False

    prefix = parts[0].rstrip("/")  # e.g., "tools"
    suffix = parts[1]  # e.g., "*.exe"

    # Path must start with prefix
    if prefix and not rel_path.startswith(prefix + "/"):
        return False

    # Path must end with suffix match — only the filename part is tested,
    # so any directory depth below the prefix is accepted
    filename = rel_path.split("/")[-1]
    return fnmatch.fnmatch(filename, suffix)


def _collect_glob_matches(repo_root: Path, pattern: str) -> list[Path]:
    """Collect files matching a glob pattern.

    Supports recursive ``**`` patterns (via a manual walk that skips VLFS
    and git metadata directories) and simple globs (via glob.glob).

    Args:
        repo_root: Repository root to search under.
        pattern: Glob pattern, relative to the repo root.

    Returns:
        List of matching file paths.
    """
    matched_files = []

    # Normalize pattern separators for the OS-level glob fallback
    pattern_normalized = pattern.replace("/", os.sep)

    # Support both ** recursive patterns and simple globs
    if "**" in pattern:
        # For recursive patterns like "tools/**/*.exe", we need special handling
        # Split the pattern: prefix = "tools", suffix = "*.exe"
        parts = pattern.split("**/")
        if len(parts) == 2:
            prefix = parts[0].rstrip("/")  # "tools"
            suffix = parts[1]  # "*.exe"

            # Walk the directory tree starting from prefix
            start_dir = repo_root / prefix if prefix else repo_root
            if start_dir.exists():
                for root, dirs, files in os.walk(start_dir):
                    # Skip ignored directories (prune in place so os.walk
                    # never descends into them)
                    dirs[:] = [
                        d for d in dirs if d not in (".vlfs", ".vlfs-cache", ".git")
                    ]
                    for file in files:
                        if fnmatch.fnmatch(file, suffix):
                            file_path = Path(root) / file
                            matched_files.append(file_path)
        else:
            # Fallback: simple recursive walk with pattern matching
            for root, dirs, files in os.walk(repo_root):
                dirs[:] = [d for d in dirs if d not in (".vlfs", ".vlfs-cache", ".git")]
                for file in files:
                    file_path = Path(root) / file
                    rel_path = file_path.relative_to(repo_root)
                    rel_str = str(rel_path).replace(os.sep, "/")
                    # Convert ** pattern to a simpler check:
                    # "tools/**/*.exe" should match "tools/compiler.exe",
                    # "tools/sub/linker.exe"
                    if _match_recursive_glob(rel_str, pattern):
                        matched_files.append(file_path)
    else:
        # Simple glob - use glob.glob
        import glob as glob_module

        search_path = repo_root / pattern_normalized
        for file_path in glob_module.glob(str(search_path), recursive=False):
            file_path = Path(file_path)
            if file_path.is_file():
                matched_files.append(file_path)

    return matched_files


def _find_files_recursive(repo_root: Path, directory: Path) -> list[Path]:
    """Find all files recursively, skipping ignored directories.

    Args:
        repo_root: Repository root (unused here; uniform helper signature).
        directory: Directory to walk.

    Returns:
        All file paths under ``directory``, excluding VLFS/git metadata.
    """
    files = []
    ignore_dirs = {".vlfs", ".vlfs-cache", ".git"}
    for root, dirs, filenames in os.walk(directory):
        # Filter out ignored directories (in-place prune)
        dirs[:] = [d for d in dirs if d not in ignore_dirs]
        for filename in filenames:
            files.append(Path(root) / filename)
    return files


def _cleanup_empty_dirs(directory: Path) -> None:
    """Remove empty directories recursively.

    Walks bottom-up so parents emptied by child removal are also removed;
    ``directory`` itself is never deleted.

    Args:
        directory: Root of the tree to prune. No-op if it does not exist.
    """
    if not directory.exists():
        return
    for root, dirs, files in os.walk(str(directory), topdown=False):
        for dir_name in dirs:
            dir_path = Path(root) / dir_name
            try:
                if dir_path.exists() and not any(dir_path.iterdir()):
                    dir_path.rmdir()
            except (OSError, IOError):
                # Best-effort cleanup; ignore races and permission errors
                pass


# =============================================================================
# CLI Entry Point
# =============================================================================


def main(argv: list[str] | None = None) -> int:
    """Main entry point.

    Builds the argument parser, resolves repository paths, and dispatches
    to the per-command handlers.

    Args:
        argv: Argument list (defaults to sys.argv[1:] when None).

    Returns:
        Process exit code.
    """
    parser = argparse.ArgumentParser(
        prog="vlfs", description="Vibecoded Large File Storage", exit_on_error=False
    )

    # Global options
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (use -v for DEBUG, -vv for TRACE)",
    )

    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # auth command
    auth_parser = subparsers.add_parser("auth", help="Authentication commands")
    auth_subparsers = auth_parser.add_subparsers(
        dest="auth_command", help="Auth subcommands"
    )
    auth_gdrive_parser = auth_subparsers.add_parser(
        "gdrive", help="Authenticate with Google Drive"
    )

    # pull command
    pull_parser = subparsers.add_parser("pull", help="Download files from remote")
    pull_parser.add_argument(
        "path", nargs="?", help="Path or glob pattern to pull"
    )
    pull_parser.add_argument(
        "--force", action="store_true", help="Overwrite locally modified files"
    )
    pull_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without doing it",
    )
    pull_parser.add_argument(
        "--restore",
        action="store_true",
        help="Restore files from local cache only (no downloading)",
    )

    # push command
    push_parser = subparsers.add_parser("push", help="Upload file(s) to remote")
    push_parser.add_argument(
        "paths", nargs="*", help="Path(s) to file or directory to push"
    )
    push_parser.add_argument(
        "--private", action="store_true", help="Upload to private storage (Drive)"
    )
    push_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without doing it",
    )
    push_parser.add_argument("--glob", help="Push files matching glob pattern")
    push_parser.add_argument(
        "--all", action="store_true", help="Push all new or modified files"
    )

    # remove command
    remove_parser = subparsers.add_parser("remove", help="Remove file(s) from VLFS tracking and storage")
    remove_parser.add_argument(
        "paths", nargs="+", help="Path(s) to file or directory to remove"
    )
    remove_parser.add_argument(
        "--force", "-f", action="store_true", help="Skip confirmation prompt"
    )
    remove_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without doing it",
    )
    remove_parser.add_argument(
        "--delete-file",
        action="store_true",
        help="Also delete the local workspace file",
    )

    # list command
    list_parser = subparsers.add_parser("ls", help="List tracked files")
    list_parser.add_argument(
        "pattern", nargs="?", help="Glob pattern to filter listing"
    )
    list_parser.add_argument(
        "--long", "-l", action="store_true", help="Show detailed information"
    )
    list_parser.add_argument(
        "--json", action="store_true", help="Output in JSON format"
    )
    list_parser.add_argument(
"--remote", help="Filter by remote (e.g., r2, gdrive)" ) # status command status_parser = subparsers.add_parser("status", help="Show workspace status") status_parser.add_argument( "--dry-run", action="store_true", help="Show what would be done without doing it", ) status_parser.add_argument( "--json", action="store_true", help="Output in JSON format" ) status_parser.add_argument( "--color", action="store_true", help="Force color output" ) # verify command verify_parser = subparsers.add_parser( "verify", help="Verify workspace files against index" ) verify_parser.add_argument( "--json", action="store_true", help="Output in JSON format" ) verify_parser.add_argument( "--dry-run", action="store_true", help="Show what would be done without doing it", ) verify_parser.add_argument( "--remote", action="store_true", help="Also verify existence of objects in cloud storage", ) verify_parser.add_argument( "--fix", action="store_true", help="Attempt to fix missing remote objects by re-uploading from local cache", ) # clean command clean_parser = subparsers.add_parser( "clean", help="Remove unreferenced cache objects" ) clean_parser.add_argument( "--dry-run", action="store_true", help="Show what would be deleted without deleting", ) clean_parser.add_argument( "--yes", "-y", action="store_true", help="Skip confirmation prompt" ) # lookup command lookup_parser = subparsers.add_parser( "lookup", help="Find files by partial hash or object key" ) lookup_parser.add_argument( "query", help="Partial hash or object key to search for" ) # repair command repair_parser = subparsers.add_parser( "repair", help="Automatically fix common issues (orphans, 404s)" ) repair_parser.add_argument( "--dry-run", action="store_true", help="Show what would be repaired without doing it", ) try: args = parser.parse_args(argv) except SystemExit as e: # argparse calls sys.exit() on --help or errors return e.code if isinstance(e.code, int) else 1 except Exception: return 1 # Setup logging based on verbosity 
setup_logging(verbosity=args.verbose, log_file=True) if args.command is None: parser.print_help() return 0 # Handle auth command separately (doesn't need repo structure) if args.command == "auth": if args.auth_command == "gdrive": repo_root = Path.cwd() vlfs_dir, _ = resolve_paths(repo_root) ensure_dirs(vlfs_dir, repo_root / ".vlfs-cache") return auth_gdrive(vlfs_dir) else: auth_parser.print_help() return 0 dry_run = getattr(args, "dry_run", False) json_output = getattr(args, "json", False) # Resolve paths and ensure structure repo_root = Path.cwd() vlfs_dir, cache_dir = resolve_paths(repo_root) ensure_dirs(vlfs_dir, cache_dir) ensure_gitignore(repo_root) warn_if_secrets_in_repo(vlfs_dir) # Set rclone config path if available # Check user dir first, then legacy user_config_path = get_user_config_dir() / "rclone.conf" legacy_config_path = vlfs_dir / "rclone.conf" if user_config_path.exists(): set_rclone_config_path(user_config_path) else: set_rclone_config_path(None) if args.command == "status": return cmd_status(repo_root, vlfs_dir, dry_run, json_output, args.color, args.verbose) elif args.command == "ls": return cmd_list( repo_root, vlfs_dir, getattr(args, "long", False), getattr(args, "remote", None), json_output, getattr(args, "pattern", None), ) elif args.command == "verify": return cmd_verify( repo_root, vlfs_dir, cache_dir, getattr(args, "remote", False), getattr(args, "fix", False), dry_run, json_output, args.verbose, ) elif args.command == "clean": return cmd_clean(repo_root, vlfs_dir, cache_dir, dry_run, args.yes, args.verbose) elif args.command == "lookup": return cmd_lookup(repo_root, vlfs_dir, args.query) elif args.command == "repair": return cmd_repair(repo_root, vlfs_dir, cache_dir, dry_run, args.verbose) elif args.command == "remove": return cmd_remove( repo_root, vlfs_dir, cache_dir, args.paths, args.force, dry_run, args.delete_file, args.verbose, ) elif args.command == "pull": return cmd_pull( repo_root, vlfs_dir, cache_dir, getattr(args, "force", 
False), dry_run, getattr(args, "path", None), getattr(args, "restore", False), args.verbose, ) elif args.command == "push": if args.all: return cmd_push_all(repo_root, vlfs_dir, cache_dir, args.private, dry_run, args.verbose) elif args.glob: return cmd_push_glob( repo_root, vlfs_dir, cache_dir, args.glob, args.private, dry_run, args.verbose ) elif args.paths: return cmd_push( repo_root, vlfs_dir, cache_dir, args.paths, args.private, dry_run, args.verbose ) else: print("Error: push requires a path, --glob, or --all", file=sys.stderr) return 1 return 0 if __name__ == "__main__": sys.exit(main())