#!/usr/bin/env python3
"""
Author: Michal Szymanski

v2.4

OSINT tool implementing real-time tracking of GitHub users activities including profile and repositories changes:
https://github.com/misiektoja/github_monitor/

Python pip3 requirements:

PyGithub
requests
python-dateutil
pytz
tzlocal (optional)
python-dotenv (optional)
"""

VERSION = "2.4"

# ---------------------------
# CONFIGURATION SECTION START
# ---------------------------

CONFIG_BLOCK = """

# Get your GitHub personal access token (classic) by visiting:
# https://github.com/settings/apps
#
# Then go to: Personal access tokens -> Tokens (classic) -> Generate new token (classic)
#
# Provide the GITHUB_TOKEN secret using one of the following methods:
# - Pass it at runtime with -t / --github-token
# - Set it as an environment variable (e.g. export GITHUB_TOKEN=...)
# - Add it to ".env" file (GITHUB_TOKEN=...) for persistent use
# - Fallback: hard-code it in the code or config file
GITHUB_TOKEN = "your_github_classic_personal_access_token"

# The URL of the GitHub API
#
# For Public Web GitHub use the default: https://api.github.com
# For GitHub Enterprise change to: https://{your_hostname}/api/v3
#
# Can also be set using the -x flag
GITHUB_API_URL = "https://api.github.com"

# The base URL of the GitHub web interface
# Required to check if the profile is public or private
#
# For public GitHub use the default: https://github.com
# For GitHub Enterprise change to: https://{your_hostname}
GITHUB_HTML_URL = "https://github.com"

# SMTP settings for sending email notifications
# If left as-is, no notifications will be sent
#
# Provide the SMTP_PASSWORD secret using one of the following methods:
# - Set it as an environment variable (e.g. export SMTP_PASSWORD=...)
# - Add it to ".env" file (SMTP_PASSWORD=...) for persistent use
# Fallback:
# - Hard-code it in the code or config file
SMTP_HOST = "your_smtp_server_ssl"
SMTP_PORT = 587
SMTP_USER = "your_smtp_user"
SMTP_PASSWORD = "your_smtp_password"
SMTP_SSL = True
SENDER_EMAIL = "your_sender_email"
RECEIVER_EMAIL = "your_receiver_email"

# Whether to send an email when user's profile changes
# Can also be enabled via the -p flag
PROFILE_NOTIFICATION = False

# Whether to send an email when new GitHub events appear
# Can also be enabled via the -s flag
EVENT_NOTIFICATION = False

# Whether to send an email when user's repositories change (stargazers, watchers, forks, issues,
# PRs, description etc., except for update date)
# Requires TRACK_REPOS_CHANGES to be enabled
# Can also be enabled via the -q flag
REPO_NOTIFICATION = False

# Whether to send an email when user's repositories update date changes
# Can also be enabled via the -u flag
REPO_UPDATE_DATE_NOTIFICATION = False

# Whether to send an email when user's daily contributions count changes
# Requires TRACK_CONTRIB_CHANGES to be enabled
# Can also be enabled via the -y flag
CONTRIB_NOTIFICATION = False

# Whether to send an email on errors
# Can also be disabled via the -e flag
ERROR_NOTIFICATION = True

# How often to check for user profile changes / activities; in seconds
# Can also be set using the -c flag
GITHUB_CHECK_INTERVAL = 1800  # 30 mins
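# Note: with the default 30-minute interval a single monitored user performs only a
# handful of API calls per cycle (e.g. 3600 / 1800 = 2 checks per hour), far below
# GitHub's documented limit of 5000 authenticated requests per hour; very low
# intervals combined with TRACK_REPOS_CHANGES on large accounts may exhaust it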
# Set your local time zone so that GitHub API timestamps are converted accordingly (e.g. 'Europe/Warsaw')
# Use this command to list all time zones supported by pytz:
# python3 -c "import pytz; print('\\n'.join(pytz.all_timezones))"
# If set to 'Auto', the tool will try to detect your local time zone automatically (requires tzlocal)
LOCAL_TIMEZONE = 'Auto'

# Events to monitor
# Use 'ALL' to monitor all available event types
EVENTS_TO_MONITOR = [
    'ALL',
    'PushEvent',
    'PullRequestEvent',
    'PullRequestReviewEvent',
    'PullRequestReviewCommentEvent',
    'IssueCommentEvent',
    'IssuesEvent',
    'CommitCommentEvent',
    'CreateEvent',
    'DeleteEvent',
    'ForkEvent',
    'PublicEvent',
    'GollumEvent',
    'MemberEvent',
    'WatchEvent',
    'ReleaseEvent',
    'DeploymentEvent',
    'CheckRunEvent',
    'WorkflowRunEvent',
]

# Number of recent events to fetch when a change in the last event ID is detected
# Note: if more than EVENTS_NUMBER events occur between two checks,
# any events older than the most recent EVENTS_NUMBER will be missed
EVENTS_NUMBER = 30  # 1 page

# If True, track user's repository changes (changed stargazers, watchers, forks, description, update date etc.)
# Can also be enabled using the -j flag
TRACK_REPOS_CHANGES = False

# Repositories to monitor when TRACK_REPOS_CHANGES is enabled
# Use 'ALL' to monitor all repositories (default behavior)
# Use 'user/repo_name' format to monitor specific repositories for specific users
# If the current user matches the user in the list, that repository will be monitored
# Example: ['user1/repo1', 'user2/repo2', 'user1/repo3']
# Can also be set using the --repos flag (comma-separated repo names only, without user prefix)
# Example: --repos "repo1,repo2,repo3"
# Note: When using a specific list (not 'ALL'), newly created repositories will NOT be
# automatically monitored - only repositories explicitly listed here will be monitored.
REPOS_TO_MONITOR = ['ALL']

# If True, disable event monitoring
# Can also be disabled using the -k flag
DO_NOT_MONITOR_GITHUB_EVENTS = False

# If True, fetch all user repos (owned, forks, collaborations); otherwise, fetch only owned repos
GET_ALL_REPOS = False

# Alert about blocked (403 - TOS violation and 451 - DMCA block) repos in the console output (in monitoring mode)
# In listing mode (-r), blocked repos are always shown
BLOCKED_REPOS = False

# If True, track and log user's daily contributions count changes
# Can also be enabled using the -m flag
TRACK_CONTRIB_CHANGES = False

# How often to print a "liveness check" message to the output; in seconds
# Set to 0 to disable
LIVENESS_CHECK_INTERVAL = 43200  # 12 hours

# URL used to verify internet connectivity at startup
CHECK_INTERNET_URL = GITHUB_API_URL

# Timeout used when checking initial internet connectivity; in seconds
CHECK_INTERNET_TIMEOUT = 5

# CSV file to write new events & profile changes
# Can also be set using the -b flag
CSV_FILE = ""

# Location of the optional dotenv file which can keep secrets
# If not specified it will try to auto-search for .env files
# To disable auto-search, set this to the literal string "none"
# Can also be set using the --env-file flag
DOTENV_FILE = ""
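# Example ".env" file contents (illustrative placeholders, not real secrets):
#
#   GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#   SMTP_PASSWORD=your_smtp_password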
# Base name for the log file. Output will be saved to github_monitor_<username>.log
# Can include a directory path to specify the location, e.g. ~/some_dir/github_monitor
GITHUB_LOGFILE = "github_monitor"

# Whether to disable logging to github_monitor_<username>.log
# Can also be disabled via the -d flag
DISABLE_LOGGING = False

# Width of main horizontal line
HORIZONTAL_LINE1 = 105

# Width of horizontal line for repositories list output
HORIZONTAL_LINE2 = 80

# Whether to clear the terminal screen after starting the tool
CLEAR_SCREEN = True

# Maximum number of times to retry a failed GitHub API/network call
NET_MAX_RETRIES = 5

# Base number of seconds to wait before each retry, multiplied by the attempt count
NET_BASE_BACKOFF_SEC = 5

# Value used by signal handlers increasing/decreasing profile/user activity check (GITHUB_CHECK_INTERVAL); in seconds
GITHUB_CHECK_SIGNAL_VALUE = 60  # 1 minute
"""

# -------------------------
# CONFIGURATION SECTION END
# -------------------------

# Default dummy values so linters shut up
# Do not change values below - modify them in the configuration section or config file instead
GITHUB_TOKEN = ""
GITHUB_API_URL = ""
GITHUB_HTML_URL = ""
SMTP_HOST = ""
SMTP_PORT = 0
SMTP_USER = ""
SMTP_PASSWORD = ""
SMTP_SSL = False
SENDER_EMAIL = ""
RECEIVER_EMAIL = ""
PROFILE_NOTIFICATION = False
EVENT_NOTIFICATION = False
REPO_NOTIFICATION = False
REPO_UPDATE_DATE_NOTIFICATION = False
CONTRIB_NOTIFICATION = False
ERROR_NOTIFICATION = False
GITHUB_CHECK_INTERVAL = 0
LOCAL_TIMEZONE = ""
EVENTS_TO_MONITOR = []
EVENTS_NUMBER = 0
TRACK_REPOS_CHANGES = False
REPOS_TO_MONITOR = []
DO_NOT_MONITOR_GITHUB_EVENTS = False
GET_ALL_REPOS = False
BLOCKED_REPOS = False
TRACK_CONTRIB_CHANGES = False
LIVENESS_CHECK_INTERVAL = 0
CHECK_INTERNET_URL = ""
CHECK_INTERNET_TIMEOUT = 0
CSV_FILE = ""
DOTENV_FILE = ""
GITHUB_LOGFILE = ""
DISABLE_LOGGING = False
HORIZONTAL_LINE1 = 0
HORIZONTAL_LINE2 = 0
CLEAR_SCREEN = False
NET_MAX_RETRIES = 0
NET_BASE_BACKOFF_SEC = 0
GITHUB_CHECK_SIGNAL_VALUE = 0

exec(CONFIG_BLOCK, globals())

# Default name for the optional config file
DEFAULT_CONFIG_FILENAME = "github_monitor.conf"

# List of secret keys to load from env/config
SECRET_KEYS = ("GITHUB_TOKEN", "SMTP_PASSWORD")

LIVENESS_CHECK_COUNTER = LIVENESS_CHECK_INTERVAL / GITHUB_CHECK_INTERVAL

stdout_bck = None
csvfieldnames = ['Date', 'Type', 'Name', 'Old', 'New']

CLI_CONFIG_PATH = None
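# How the layered configuration resolves at runtime (a sketch, not executed here;
# the actual override logic lives in the CLI/startup code and the SIGHUP handler):
#
#   from dotenv import load_dotenv          # optional python-dotenv dependency
#   load_dotenv(DOTENV_FILE or None)        # ".env" values become environment variables
#   for key in SECRET_KEYS:
#       if os.getenv(key):
#           globals()[key] = os.getenv(key)  # env var wins over the CONFIG_BLOCK default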
# Maximum length for event body text (issue bodies, comment bodies, etc.) before truncation
# Text longer than this will be truncated with safe HTML tag closing
MAX_EVENT_BODY_LENGTH = 3500

# to solve the issue: 'SyntaxError: f-string expression part cannot include a backslash'
nl_ch = "\n"

import sys

if sys.version_info < (3, 10):
    print("* Error: Python version 3.10 or higher required !")
    sys.exit(1)

import time
import string
import os
from datetime import datetime, timezone, date
from dateutil import relativedelta
from dateutil.parser import isoparse
import calendar
import requests as req
import signal
import smtplib
import ssl
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import argparse
import csv
try:
    import pytz
except ModuleNotFoundError:
    raise SystemExit("Error: Couldn't find the pytz library !\n\nTo install it, run:\n pip3 install pytz\n\nOnce installed, re-run this tool")
try:
    from tzlocal import get_localzone
except ImportError:
    get_localzone = None
import platform
import re
import ipaddress
import html
try:
    from github import Github, Auth, GithubException, UnknownObjectException
    from github.GithubException import RateLimitExceededException
    from github.GithubException import BadCredentialsException
except ModuleNotFoundError:
    raise SystemExit("Error: Couldn't find the PyGitHub library !\n\nTo install it, run:\n pip3 install PyGithub\n\nOnce installed, re-run this tool. For more help, visit:\nhttps://github.com/PyGithub/PyGithub")
from itertools import islice
import textwrap
import urllib3
import socket
from typing import Any, Callable
import shutil
from pathlib import Path
from typing import Optional
import datetime as dt
import requests

NET_ERRORS = (
    req.exceptions.RequestException,
    urllib3.exceptions.HTTPError,
    socket.gaierror,
    GithubException,
)


# Logger class to output messages to stdout and log file
class Logger(object):
    def __init__(self, filename):
        self.terminal = sys.stdout
        self.logfile = open(filename, "a", buffering=1, encoding="utf-8")

    def write(self, message):
        self.terminal.write(message)
        self.logfile.write(message)
        self.terminal.flush()
        self.logfile.flush()

    def flush(self):
        pass


# Signal handler when user presses Ctrl+C
def signal_handler(sig, frame):
    sys.stdout = stdout_bck
    print('\n* You pressed Ctrl+C, tool is terminated.')
    sys.exit(0)


# Checks internet connectivity
def check_internet(url=CHECK_INTERNET_URL, timeout=CHECK_INTERNET_TIMEOUT):
    try:
        _ = req.get(url, timeout=timeout)
        return True
    except req.RequestException as e:
        print(f"* No connectivity, please check your network:\n\n{e}")
        return False


# Clears the terminal screen
def clear_screen(enabled=True):
    if not enabled:
        return
    try:
        if platform.system() == 'Windows':
            os.system('cls')
        else:
            os.system('clear')
    except Exception:
        print("* Cannot clear the screen contents")


# Converts absolute value of seconds to human readable format
def display_time(seconds, granularity=2):
    intervals = (
        ('years', 31556952),   # approximation
        ('months', 2629746),   # approximation
        ('weeks', 604800),     # 60 * 60 * 24 * 7
        ('days', 86400),       # 60 * 60 * 24
        ('hours', 3600),       # 60 * 60
        ('minutes', 60),
        ('seconds', 1),
    )
    result = []

    if seconds > 0:
        for name, count in intervals:
            value = seconds // count
            if value:
                seconds -= value * count
                if value == 1:
                    name = name.rstrip('s')
                result.append(f"{value} {name}")
        return ', '.join(result[:granularity])
    else:
        return '0 seconds'


# Calculates time span between two timestamps, accepts timestamp integers, floats and datetime objects
def calculate_timespan(timestamp1, timestamp2, show_weeks=True,
show_hours=True, show_minutes=True, show_seconds=True, granularity=3): result = [] intervals = ['years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds'] ts1 = timestamp1 ts2 = timestamp2 if isinstance(timestamp1, str): try: timestamp1 = isoparse(timestamp1) except Exception: return "" if isinstance(timestamp1, int): dt1 = datetime.fromtimestamp(int(ts1), tz=timezone.utc) elif isinstance(timestamp1, float): ts1 = int(round(ts1)) dt1 = datetime.fromtimestamp(ts1, tz=timezone.utc) elif isinstance(timestamp1, datetime): dt1 = timestamp1 if dt1.tzinfo is None: dt1 = pytz.utc.localize(dt1) else: dt1 = dt1.astimezone(pytz.utc) ts1 = int(round(dt1.timestamp())) else: return "" if isinstance(timestamp2, str): try: timestamp2 = isoparse(timestamp2) except Exception: return "" if isinstance(timestamp2, int): dt2 = datetime.fromtimestamp(int(ts2), tz=timezone.utc) elif isinstance(timestamp2, float): ts2 = int(round(ts2)) dt2 = datetime.fromtimestamp(ts2, tz=timezone.utc) elif isinstance(timestamp2, datetime): dt2 = timestamp2 if dt2.tzinfo is None: dt2 = pytz.utc.localize(dt2) else: dt2 = dt2.astimezone(pytz.utc) ts2 = int(round(dt2.timestamp())) else: return "" if ts1 >= ts2: ts_diff = ts1 - ts2 else: ts_diff = ts2 - ts1 dt1, dt2 = dt2, dt1 if ts_diff > 0: date_diff = relativedelta.relativedelta(dt1, dt2) years = date_diff.years months = date_diff.months days_total = date_diff.days if show_weeks: weeks = days_total // 7 days = days_total % 7 else: weeks = 0 days = days_total hours = date_diff.hours if show_hours or ts_diff <= 86400 else 0 minutes = date_diff.minutes if show_minutes or ts_diff <= 3600 else 0 seconds = date_diff.seconds if show_seconds or ts_diff <= 60 else 0 date_list = [years, months, weeks, days, hours, minutes, seconds] for index, interval in enumerate(date_list): if interval > 0: name = intervals[index] if interval == 1: name = name.rstrip('s') result.append(f"{interval} {name}") return ', '.join(result[:granularity]) else: return '0 seconds' # Sanitizes HTML content, preserving safe tags while removing dangerous ones def sanitize_and_preserve_html(text, convert_line_breaks=True, repo_url=None): if not text: return "" safe_tags = { 'details': ['open'], 'summary': [], 'ul': [], 'ol': [], 'li': [], 'a': ['href', 'title'], 'code': [], 'pre': [], 'p': [], 'br': [], 'strong': [], 'b': [], 'em': [], 'i': [], 's': [], 'strike': [], 'del': [], 'img': ['src', 'alt', 'title'], 'blockquote': [], 'hr': [], } code_blocks = [] code_block_pattern = r'```([\s\S]*?)```' code_block_counter = 0 def replace_code_block(match): nonlocal code_block_counter code_content = match.group(1) placeholder = f"__CODE_BLOCK_{code_block_counter}__" code_blocks.append(('
<pre><code>' + html.escape(code_content) + '</code></pre>
', placeholder)) code_block_counter += 1 return placeholder text = re.sub(code_block_pattern, replace_code_block, text) # Pattern to match HTML tags including multiline (use [\s\S]*? to match any char including newlines) tag_pattern = r'<(/)?([a-z][a-z0-9]*)([\s\S]*?)>' def sanitize_tag(match): closing = match.group(1) == '/' tag_name = match.group(2).lower() attrs_str = match.group(3) if match.group(3) else '' if closing: return f'' if tag_name in safe_tags else '' if tag_name not in safe_tags: return '' allowed_attrs = safe_tags[tag_name] if not allowed_attrs and attrs_str: return f'<{tag_name}>' attr_pattern = r'(\w+)=["\']([^"\']*)["\']' safe_attrs = [] for attr_match in re.finditer(attr_pattern, attrs_str): attr_name = attr_match.group(1).lower() attr_value = attr_match.group(2) if attr_name in allowed_attrs: if attr_name == 'href' or attr_name == 'src': if attr_value.startswith(('http://', 'https://', 'mailto:', '#')): safe_attrs.append(f'{attr_name}="{html.escape(attr_value)}"') else: safe_attrs.append(f'{attr_name}="{html.escape(attr_value)}"') if safe_attrs: return f'<{tag_name} {" ".join(safe_attrs)}>' else: return f'<{tag_name}>' sanitized = re.sub(tag_pattern, sanitize_tag, text, flags=re.IGNORECASE) temp_markers = [] for idx, (code_html, placeholder) in enumerate(code_blocks): temp_marker = f"__TEMP_CODE_{idx}__" temp_markers.append((temp_marker, code_html)) sanitized = sanitized.replace(placeholder, temp_marker) protected_tags = [] tag_counter = 0 valid_tag_pattern = r']*)?>' def protect_tag(match): nonlocal tag_counter protected_tags.append(match.group(0)) result = f"__PROTECTED_TAG_{tag_counter}__" tag_counter += 1 return result sanitized = re.sub(valid_tag_pattern, protect_tag, sanitized, flags=re.IGNORECASE) sanitized = sanitized.replace('<', '<').replace('>', '>') for idx, tag in enumerate(protected_tags): sanitized = sanitized.replace(f"__PROTECTED_TAG_{idx}__", tag) for temp_marker, code_html in temp_markers: sanitized = sanitized.replace(temp_marker, code_html) if convert_line_breaks: lines = sanitized.split('\n') result_lines = [] prev_was_block = False prev_was_empty = False for i, line in enumerate(lines): stripped = line.strip() is_block = bool(re.search(r'<(details|summary|ul|ol|li|pre|blockquote|hr|p)[\s>]', stripped, re.IGNORECASE)) if not stripped: if not prev_was_empty and not prev_was_block: result_lines.append('
<br>') prev_was_empty = True prev_was_block = False else: if is_block: result_lines.append(line) prev_was_block = True prev_was_empty = False else: if not prev_was_block and result_lines and not prev_was_empty: result_lines.append('<br>
') result_lines.append(line) prev_was_block = False prev_was_empty = False sanitized = ''.join(result_lines) return sanitized # Sanitizes a single HTML tag def sanitize_single_html_tag(html_tag): safe_tags = { 'details': ['open'], 'summary': [], 'ul': [], 'ol': [], 'li': [], 'a': ['href', 'title'], 'code': [], 'pre': [], 'p': [], 'br': [], 'strong': [], 'b': [], 'em': [], 'i': [], 's': [], 'strike': [], 'del': [], 'img': ['src', 'alt', 'title'], 'blockquote': [], 'hr': [], } # Pattern to match HTML tags including multiline (use [\s\S]*? to match any char including newlines) tag_pattern = r'<(/)?([a-z][a-z0-9]*)([\s\S]*?)>' match = re.match(tag_pattern, html_tag, re.IGNORECASE) if not match: return html.escape(html_tag) closing = match.group(1) == '/' tag_name = match.group(2).lower() attrs_str = match.group(3) if match.group(3) else '' if closing: return f'' if tag_name in safe_tags else '' if tag_name not in safe_tags: return '' allowed_attrs = safe_tags[tag_name] if not allowed_attrs and attrs_str: return f'<{tag_name}>' # Extract attributes, handling whitespace/newlines before attribute names attr_pattern = r'\s*(\w+)=["\']([^"\']*)["\']' safe_attrs = [] for attr_match in re.finditer(attr_pattern, attrs_str): attr_name = attr_match.group(1).lower() attr_value = attr_match.group(2) if attr_name in allowed_attrs: if attr_name == 'href' or attr_name == 'src': if attr_value.startswith(('http://', 'https://', 'mailto:', '#')): safe_attrs.append(f'{attr_name}="{html.escape(attr_value)}"') else: safe_attrs.append(f'{attr_name}="{html.escape(attr_value)}"') if safe_attrs: return f'<{tag_name} {" ".join(safe_attrs)}>' else: return f'<{tag_name}>' # Converts markdown text to HTML def markdown_to_html(text, convert_line_breaks=True, repo_url=None): if not text: return "" # Pattern to match HTML tags (both opening and closing), including those that span multiple lines # Matches: or with attributes that may span lines html_tag_pattern = r'' has_html = bool(re.search(html_tag_pattern, text, re.IGNORECASE)) # Protect code blocks first code_blocks = [] code_block_pattern = r'```([\s\S]*?)```' code_block_counter = 0 def replace_code_block(match): nonlocal code_block_counter code_content = match.group(1) placeholder = f"__CODE_BLOCK_{code_block_counter}__" code_blocks.append(('
<pre><code>' + html.escape(code_content) + '</code></pre>
', placeholder)) code_block_counter += 1 return placeholder text = re.sub(code_block_pattern, replace_code_block, text) # If HTML is present, protect HTML tags during markdown processing, then sanitize them html_tags = [] if has_html: html_tag_counter = 0 def protect_html_tag(match): nonlocal html_tag_counter html_tags.append(match.group(0)) # Use a placeholder that won't be processed by markdown (no underscores, asterisks, etc.) result = f"PROTECTEDHTMLTAG{html_tag_counter}PROTECTED" html_tag_counter += 1 return result text = re.sub(html_tag_pattern, protect_html_tag, text, flags=re.IGNORECASE) # Protect markdown link/image patterns from HTML escaping and italic processing markdown_pattern_placeholders = [] markdown_pattern_counter = 0 def protect_markdown_pattern(match): nonlocal markdown_pattern_counter markdown_pattern_placeholders.append(match.group(0)) result = f"__MARKDOWN_PATTERN_{markdown_pattern_counter}__" markdown_pattern_counter += 1 return result # Protect image links, images, and links before HTML escaping text = re.sub(r'\[!\[([^\]]*)\]\(([^\)]+)\)\]\(([^\)]+)\)', protect_markdown_pattern, text) text = re.sub(r'!\[([^\]]*)\]\(([^\)]+)\)', protect_markdown_pattern, text) text = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', protect_markdown_pattern, text) # Escape HTML (but code blocks, protected HTML tags, and markdown patterns are already protected) html_text = html.escape(text) # Restore code blocks for code_html, placeholder in code_blocks: html_text = html_text.replace(placeholder, code_html) # Process block-level elements line by line lines = html_text.split('\n') processed_lines = [] in_list = False list_type = None # 'ul' or 'ol' list_items = [] i = 0 while i < len(lines): line = lines[i] stripped = line.strip() # Skip empty lines for now (we'll add them back later) if not stripped: if in_list: # Close current list if list_type == 'ul': processed_lines.append('
<ul>' + ''.join(list_items) + '</ul>') else: processed_lines.append('<ol>' + ''.join(list_items) + '</ol>') in_list = False list_type = None list_items = [] processed_lines.append('') i += 1 continue # Horizontal rules (must be at least 3 dashes/asterisks) if re.match(r'^[-*_]{3,}$', stripped): processed_lines.append('<hr>') i += 1 continue # Headers (# ## ### etc.) header_match = re.match(r'^(#{1,6})\s+(.+)$', stripped) if header_match: level = len(header_match.group(1)) header_text = header_match.group(2) processed_lines.append(f'<h{level}>{header_text}</h{level}>') i += 1 continue # Blockquotes (>) if stripped.startswith('>'): quote_text = stripped[1:].strip() processed_lines.append(f'<blockquote>{quote_text}</blockquote>
') i += 1 continue # Lists - be careful not to match "- label:" patterns # Check for unordered list (- or *) list_match = re.match(r'^(\s*)([-*])\s+(.+)$', line) if list_match: list_content = list_match.group(3) # Don't treat as list if it looks like a label pattern: # - Ends with just a colon (with optional whitespace), OR # - Matches pattern like "Word:" or "Words:" followed by tabs/spaces (typical label format) is_label = (re.search(r':\s*$', list_content) or re.match(r'^[A-Z][a-zA-Z\s]+:\s+\S', list_content)) if not is_label: if not in_list or list_type != 'ul': if in_list: # Close previous list if list_type == 'ol': processed_lines.append('
<ol>' + ''.join(list_items) + '</ol>
') list_items = [] in_list = True list_type = 'ul' list_items.append(f'
<li>{list_content}</li>') i += 1 continue # Check for ordered list (1. 2. etc.) ordered_match = re.match(r'^(\s*)(\d+)\.\s+(.+)$', line) if ordered_match: list_content = ordered_match.group(3) if not in_list or list_type != 'ol': if in_list: # Close previous list if list_type == 'ul': processed_lines.append('<ul>' + ''.join(list_items) + '</ul>') list_items = [] in_list = True list_type = 'ol' list_items.append(f'<li>{list_content}</li>') i += 1 continue # Not a list item, so close any open list if in_list: if list_type == 'ul': processed_lines.append('<ul>' + ''.join(list_items) + '</ul>') else: processed_lines.append('<ol>' + ''.join(list_items) + '</ol>') in_list = False list_type = None list_items = [] # Regular line processed_lines.append(line) i += 1 # Close any remaining open list if in_list: if list_type == 'ul': processed_lines.append('<ul>' + ''.join(list_items) + '</ul>') else: processed_lines.append('<ol>' + ''.join(list_items) + '</ol>
    ') html_text = '\n'.join(processed_lines) # Process inline elements (but skip code blocks) # First, restore and process protected markdown patterns for idx, original_pattern in enumerate(markdown_pattern_placeholders): placeholder = f"__MARKDOWN_PATTERN_{idx}__" escaped_placeholder = html.escape(placeholder) # Check both escaped and unescaped placeholders if placeholder in html_text or escaped_placeholder in html_text: # Process the original pattern (unescaped) # Image links image_link_match = re.match(r'\[!\[([^\]]*)\]\(([^\)]+)\)\]\(([^\)]+)\)', original_pattern) if image_link_match: alt_text = image_link_match.group(1) image_url = image_link_match.group(2) link_url = image_link_match.group(3) if repo_url: if image_url and not image_url.startswith(('http://', 'https://', 'data:', '#')): if image_url.startswith('/'): image_url = repo_url.rstrip('/') + '/blob/HEAD' + image_url else: image_url = repo_url.rstrip('/') + '/blob/HEAD/' + image_url if link_url and not link_url.startswith(('http://', 'https://', 'mailto:', '#')): if link_url.startswith('/'): link_url = repo_url.rstrip('/') + '/blob/HEAD' + link_url else: link_url = repo_url.rstrip('/') + '/blob/HEAD/' + link_url replacement = f'{html.escape(alt_text)}' html_text = html_text.replace(placeholder, replacement) html_text = html_text.replace(escaped_placeholder, replacement) continue # Images image_match = re.match(r'!\[([^\]]*)\]\(([^\)]+)\)', original_pattern) if image_match: alt_text = image_match.group(1) image_url = image_match.group(2) if repo_url and image_url and not image_url.startswith(('http://', 'https://', 'data:', '#')): if image_url.startswith('/'): absolute_url = repo_url.rstrip('/') + '/blob/HEAD' + image_url else: absolute_url = repo_url.rstrip('/') + '/blob/HEAD/' + image_url replacement = f'{html.escape(alt_text)}' else: replacement = f'{html.escape(alt_text)}' html_text = html_text.replace(placeholder, replacement) html_text = html_text.replace(escaped_placeholder, replacement) continue # Links link_match = re.match(r'\[([^\]]+)\]\(([^\)]+)\)', original_pattern) if link_match: link_text = link_match.group(1) link_url = link_match.group(2) if repo_url and link_url and not link_url.startswith(('http://', 'https://', 'mailto:', '#')): if link_url.startswith('/'): absolute_url = repo_url.rstrip('/') + '/blob/HEAD' + link_url else: absolute_url = repo_url.rstrip('/') + '/blob/HEAD/' + link_url replacement = f'{html.escape(link_text)}' else: replacement = f'{html.escape(link_text)}' html_text = html_text.replace(placeholder, replacement) html_text = html_text.replace(escaped_placeholder, replacement) continue # Process remaining markdown patterns that weren't protected (shouldn't happen, but for safety) image_link_pattern = r'\[!\[([^\]]*)\]\(([^\)]+)\)\]\(([^\)]+)\)' def convert_image_link(match): alt_text = html.unescape(match.group(1)) image_url = html.unescape(match.group(2)) link_url = html.unescape(match.group(3)) if repo_url: if image_url and not image_url.startswith(('http://', 'https://', 'data:', '#')): if image_url.startswith('/'): image_url = repo_url.rstrip('/') + '/blob/HEAD' + image_url else: image_url = repo_url.rstrip('/') + '/blob/HEAD/' + image_url if link_url and not link_url.startswith(('http://', 'https://', 'mailto:', '#')): if link_url.startswith('/'): link_url = repo_url.rstrip('/') + '/blob/HEAD' + link_url else: link_url = repo_url.rstrip('/') + '/blob/HEAD/' + link_url return f'{html.escape(alt_text)}' html_text = re.sub(image_link_pattern, convert_image_link, html_text) # Images 
image_pattern = r'!\[([^\]]*)\]\(([^\)]+)\)' def convert_image(match): alt_text = html.unescape(match.group(1)) image_url = html.unescape(match.group(2)) # Convert relative image URLs to absolute if repo_url is provided if repo_url and image_url and not image_url.startswith(('http://', 'https://', 'data:', '#')): # Use blob/HEAD/ for relative image URLs (GitHub requires branch in path) if image_url.startswith('/'): absolute_url = repo_url.rstrip('/') + '/blob/HEAD' + image_url else: absolute_url = repo_url.rstrip('/') + '/blob/HEAD/' + image_url return f'{html.escape(alt_text)}' return f'{html.escape(alt_text)}' html_text = re.sub(image_pattern, convert_image, html_text) # Links link_pattern = r'\[([^\]]+)\]\(([^\)]+)\)' def convert_link(match): link_text = html.unescape(match.group(1)) link_url = html.unescape(match.group(2)) # Convert relative links to absolute if repo_url is provided if repo_url and link_url and not link_url.startswith(('http://', 'https://', 'mailto:', '#')): # Use blob/HEAD/ for relative links (GitHub requires branch in path) if link_url.startswith('/'): absolute_url = repo_url.rstrip('/') + '/blob/HEAD' + link_url else: absolute_url = repo_url.rstrip('/') + '/blob/HEAD/' + link_url return f'{link_text}' return f'{link_text}' html_text = re.sub(link_pattern, convert_link, html_text) # Strikethrough html_text = re.sub(r'~~([^~]+)~~', r'\1', html_text) # Bold html_text = re.sub(r'\*\*([^*]+)\*\*', r'\1', html_text) html_text = re.sub(r'__([^_]+)__', r'\1', html_text) # Italic (must not be part of bold, and not inside HTML attributes) # Protect HTML attributes from italic processing attr_pattern_placeholders = [] attr_pattern_counter = 0 def protect_attr_pattern(match): nonlocal attr_pattern_counter attr_pattern_placeholders.append(match.group(0)) # Use placeholder without underscores to avoid italic processing result = f"ATTRPATTERN{attr_pattern_counter}ATTR" attr_pattern_counter += 1 return result # Protect URLs in HTML attributes (href="...", src="...", alt="...", title="...") # This prevents italic processing from affecting URLs inside HTML attributes html_text = re.sub(r'(href|src|alt|title)=["\']([^"\']+)["\']', protect_attr_pattern, html_text, flags=re.IGNORECASE) html_text = re.sub(r'(?\1', html_text) # Only match italic underscores when they're clearly markdown (word boundaries or spaces/punctuation) html_text = re.sub(r'(?\1', html_text) # Restore protected patterns for idx, pattern in enumerate(attr_pattern_placeholders): html_text = html_text.replace(f"ATTRPATTERN{idx}ATTR", pattern) # Inline code (but not inside code blocks) html_text = re.sub(r'`([^`]+)`', r'\1', html_text) # Protect URLs inside HTML attributes before converting plain URLs to links # This prevents convert_urls_to_links from converting URLs that are already in href/src/alt attributes attr_url_placeholders = [] attr_url_counter = 0 def protect_attr_url(match): nonlocal attr_url_counter full_match = match.group(0) attr_url_placeholders.append(full_match) result = f"__ATTR_URL_{attr_url_counter}__" attr_url_counter += 1 return result # Match URLs inside HTML attributes (href="...", src="...", alt="...", title="...") attr_url_pattern = r'(href|src|alt|title)=["\'](https?://[^"\']+)["\']' html_text = re.sub(attr_url_pattern, protect_attr_url, html_text, flags=re.IGNORECASE) # Convert plain URLs to links (avoid double-converting URLs already in tags) html_text = convert_urls_to_links(html_text) # Restore protected attribute URLs for idx, attr_url in enumerate(attr_url_placeholders): html_text = 
html_text.replace(f"__ATTR_URL_{idx}__", attr_url) # If HTML was detected, restore and sanitize the protected HTML tags BEFORE line break conversion # This allows the line break logic to properly detect HTML block elements if has_html and html_tags: for idx, original_html_tag in enumerate(html_tags): # Use the same placeholder format that was used during protection placeholder = f"PROTECTEDHTMLTAG{idx}PROTECTED" if placeholder in html_text: sanitized_tag = sanitize_single_html_tag(original_html_tag) # Only replace if sanitization returned a valid tag (not empty string) if sanitized_tag: html_text = html_text.replace(placeholder, sanitized_tag) else: # If tag was filtered out, escape the original tag instead html_text = html_text.replace(placeholder, html.escape(original_html_tag)) if convert_line_breaks: # Line-break handling that respects both text paragraphs and HTML blocks. # Rules: # - text -> blank -> text => one
    # - text -> blank -> HTML block =>

    (extra space before big block like
    ) # - HTML -> blank -> text => one
    # - HTML -> blank -> HTML block => no extra
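# For example (per the rules above): "para1\n\npara2" becomes "para1<br><br>para2",
# "para1\n\n<details>..." becomes "para1<br><br><details>...", and "</ul>\n\ntext"
# gets no extra <br> after the list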
lines = html_text.split('\n') result_lines = [] prev_type = None # 'text' or 'html' prev_html_tag = None # last HTML tag name (e.g., 'ul', 'details', 'a') prev_html_had_image = False # whether previous HTML line contained an <img> prev_text_line = None # stripped previous text line pending_blank = False for line in lines: stripped = line.strip() if not stripped: # Defer decision until we see what comes after the blank(s) pending_blank = True continue is_html = stripped.startswith('<') if is_html: # Detect tag name for smarter spacing rules tag_match = re.match(r'<\s*/?\s*([a-zA-Z0-9]+)', stripped) tag_name = tag_match.group(1).lower() if tag_match else None if prev_type == 'text' and pending_blank: # Text paragraph followed by blank line then HTML block # Only use double break before certain heavy blocks like <details> if tag_name == 'details': result_lines.append('<br><br>') else: result_lines.append('<br>') # Always keep HTML lines as-is result_lines.append(line) prev_type = 'html' prev_html_tag = tag_name prev_html_had_image = ('<img' in stripped) pending_blank = False else: # Plain text line (previous HTML may have been <img>, <a>, etc.) if prev_type == 'text': # Consecutive text paragraphs if pending_blank: # Blank line between text paragraphs -> full empty line result_lines.append('<br><br>') else: result_lines.append('<br>') elif prev_type == 'html' and pending_blank: # HTML block followed by blank line then text # Avoid extra break after lists (<ul> / <ol>) which already have spacing if prev_html_tag not in ('ul', 'ol'): # If previous HTML line contained an image (e.g., badge), use double break if prev_html_had_image: result_lines.append('<br><br>') else: result_lines.append('<br>') result_lines.append(line) prev_type = 'text' prev_text_line = stripped pending_blank = False html_text = ''.join(result_lines) return html_text # Converts URLs in HTML-escaped text to clickable links def convert_urls_to_links(text): if not text: return text bracket_pattern = r'\[\s*(https?://[^\s\]]+)\s*\]' text = re.sub(bracket_pattern, r'<a href="\1">\1</a>', text)
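# e.g. (per the substitution above): "[ https://github.com/misiektoja/github_monitor ]"
# -> '<a href="https://github.com/misiektoja/github_monitor">https://github.com/misiektoja/github_monitor</a>'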
# Match URLs but not those inside HTML attributes (href="...", src="...", etc.) # This pattern avoids matching URLs that are already inside quotes after = (attribute values) url_pattern = r'(?<!">)(?<!=")(https?://[^\s<>"]+)' text = re.sub(url_pattern, r'<a href="\1">\1</a>', text) return text # Converts issue/PR list items to HTML with clickable titles def convert_issue_pr_items_to_html(text, already_escaped=False): if not text: return text pattern = r'(- )?#(\d+)\s+([^(]+?)\s+\(([^)]+)\)\s+(?:\[\s*)?(https?://[^\s\]]+)(?:\s*\])?' def replace_item(match): prefix = match.group(1) or "" number = match.group(2) title = match.group(3).strip() user = match.group(4) url = match.group(5) if already_escaped: escaped_title = title escaped_user = user escaped_url = url else: escaped_title = html.escape(title) escaped_user = html.escape(user) escaped_url = html.escape(url) return f'{prefix}<a href="{escaped_url}">#{number} {escaped_title}</a> ({escaped_user})' return re.sub(pattern, replace_item, text) # Converts plain text to HTML, preserving line breaks and formatting def text_to_html(text, preserve_newlines=True, convert_urls=True, convert_issue_pr=True): if not text: return "" html_text = html.escape(text) if convert_issue_pr: html_text = convert_issue_pr_items_to_html(html_text, already_escaped=True) if convert_urls: html_text = convert_urls_to_links(html_text) if preserve_newlines: html_text = html_text.replace('\n', '<br>
        ') return html_text # Formats email body text to HTML def format_email_body_html(body_text, bold_keys=None): if not body_text: return "" html_text = text_to_html(body_text, preserve_newlines=True) if bold_keys: for key in bold_keys: if key: escaped_key = html.escape(key) html_text = re.sub( re.escape(escaped_key), lambda m: f'{m.group(0)}', html_text, flags=re.IGNORECASE ) return html_text # Converts event text to HTML, handling markdown in specific fields def event_text_to_html(event_text, event_type=None, event_payload=None): if not event_text: return "" # Extract repo URL from event text if available repo_url = None for line in event_text.split('\n'): if 'Repo URL:' in line: # Extract URL from line like "Repo URL: https://github.com/user/repo" match = re.search(r'Repo URL:\s*(https?://[^\s]+)', line) if match: repo_url = match.group(1) break lines = event_text.split('\n') html_lines = [] commit_message_style = ( "background-color: #f8f8f8; padding: 3px 3px; border-radius: 4px; font-size: 1.00em;" ) def style_commit_html(message_html): if not message_html: return "" return f'{message_html}' def remove_closing_quote(text): idx = text.rfind("'") if idx == -1: return text return text[:idx] + text[idx + 1:] def extract_quoted_message(start_index, initial_fragment): fragments = [] fragment = initial_fragment if fragment: fragment_stripped = fragment.rstrip() if fragment_stripped.endswith("'"): fragments.append(remove_closing_quote(fragment)) return '\n'.join(fragments), start_index + 1 fragments.append(fragment) idx = start_index + 1 while idx < len(lines): current_line = lines[idx] stripped = current_line.strip() if stripped.endswith("'") and (stripped == "'" or not stripped.startswith("'")): fragments.append(remove_closing_quote(current_line)) idx += 1 break else: fragments.append(current_line) idx += 1 return '\n'.join(fragments), idx i = 0 while i < len(lines): line = lines[i] if "Release notes:" in line: if event_payload and event_payload.get("release"): release_body = event_payload["release"].get("body", "") if release_body: release_html = markdown_to_html(release_body, convert_line_breaks=True, repo_url=repo_url) prefix = line.split("Release notes:")[0].replace('\t', ' ') html_lines.append(f"{html.escape(prefix)}Release notes:
<br><br>
        '{release_html}'") i += 1 # Skip all lines until we find the closing quote (end of release notes in plaintext) while i < len(lines): if lines[i].strip().endswith("'") and not lines[i].strip().startswith("'"): i += 1 break i += 1 continue if "Commit message:" in line or "- Commit message:" in line or "Commit full message:" in line or "- Commit full message:" in line: if ("Commit full message:" in line or "- Commit full message:" in line) and "'" not in line: prefix = line.replace('\t', ' ').strip() is_dash_variant = "- Commit full message:" in prefix if is_dash_variant: prefix_clean = prefix.split("- Commit full message:")[0] label_html = "- Commit full message:" else: prefix_clean = prefix.split("Commit full message:")[0] label_html = "Commit full message:" i += 1 while i < len(lines) and not lines[i].strip(): i += 1 if i < len(lines) and "'" in lines[i]: message_line = lines[i] parts = message_line.split("'", 1) if len(parts) > 1: message_text, next_index = extract_quoted_message(i, parts[1]) message_html = markdown_to_html(message_text, convert_line_breaks=True, repo_url=repo_url) styled_message = style_commit_html(message_html) html_lines.append(f"{html.escape(prefix_clean)}{label_html}
<br><br>
        '{styled_message}'") i = next_index continue elif "'" in line: parts = line.split("'", 1) if len(parts) > 1: prefix = parts[0].replace('\t', ' ') message_text, next_index = extract_quoted_message(i, parts[1]) message_html = markdown_to_html(message_text, convert_line_breaks=True, repo_url=repo_url) styled_message = style_commit_html(message_html) if "- Commit full message:" in prefix: prefix_clean = prefix.split("- Commit full message:")[0] html_lines.append(f"{html.escape(prefix_clean)}- Commit full message:
<br><br>
        '{styled_message}'") elif "Commit full message:" in prefix: prefix_clean = prefix.split("Commit full message:")[0] html_lines.append(f"{html.escape(prefix_clean)}Commit full message:
<br><br>
        '{styled_message}'") elif "- Commit message:" in prefix: prefix_clean = prefix.split("- Commit message:")[0] html_lines.append(f"{html.escape(prefix_clean)}- Commit message: {styled_message}") else: prefix_clean = prefix.split("Commit message:")[0] html_lines.append(f"{html.escape(prefix_clean)}Commit message: {styled_message}") i = next_index continue if "PR description:" in line: # Case 1: label and description on the same line if "'" in line: parts = line.split("'", 1) if len(parts) > 1: prefix = parts[0].replace('\t', ' ') description = parts[1] if description.endswith("'"): description = description.rstrip("'") description_html = markdown_to_html(description, convert_line_breaks=True, repo_url=repo_url) html_lines.append(f"{html.escape(prefix)}PR description: '{description_html}'") else: description_lines = [description] i += 1 while i < len(lines) and not lines[i].strip().endswith("'"): description_lines.append(lines[i]) i += 1 if i < len(lines): description_lines.append(lines[i].rstrip("'")) description = '\n'.join(description_lines) description_html = markdown_to_html(description, convert_line_breaks=True, repo_url=repo_url) html_lines.append(f"{html.escape(prefix)}PR description: '{description_html}'") i += 1 continue else: # Case 2: label on its own line, description in following quoted block prefix = line.split("PR description:")[0].replace('\t', ' ') i += 1 # Skip empty lines while i < len(lines) and not lines[i].strip(): i += 1 if i < len(lines) and "'" in lines[i]: first = lines[i] parts = first.split("'", 1) if len(parts) > 1: description_text, next_index = extract_quoted_message(i, parts[1]) description_html = markdown_to_html(description_text, convert_line_breaks=True, repo_url=repo_url) html_lines.append(f"{html.escape(prefix)}PR description:
<br><br>
        '{description_html}'") i = next_index continue # Handle body-like blocks, bolding the label and interpreting markdown inside the quoted body body_keyword_matched = False # Special handling for "Previous comment:" which has an intermediate "↳ In reply to..." line if "Previous comment:" in line: line_stripped = line.strip() if line_stripped == "Previous comment:" or line_stripped.startswith("Previous comment:"): prefix = line.split("Previous comment:")[0].replace('\t', ' ') i += 1 # Skip blank lines while i < len(lines) and not lines[i].strip(): i += 1 # Collect all non-empty lines until we hit the quoted body reply_lines = [] while i < len(lines) and not (lines[i].strip().startswith("'") or (lines[i] and lines[i][0] in [' ', '\t'] and "'" in lines[i])): if lines[i].strip(): # Only collect non-empty lines reply_lines.append(lines[i].strip()) i += 1 # Skip blank lines before body while i < len(lines) and not lines[i].strip(): i += 1 # Now look for the quoted body if i < len(lines) and "'" in lines[i]: first = lines[i] parts = first.split("'", 1) if len(parts) > 1: body_text, next_index = extract_quoted_message(i, parts[1]) body_html = markdown_to_html(body_text, convert_line_breaks=True, repo_url=repo_url) # Build the output with reply lines if present if reply_lines: reply_html = "
        ".join(html.escape(rl) for rl in reply_lines) html_lines.append(f"{html.escape(prefix)}Previous comment:
<br><br>
        {reply_html}
<br><br>
        '{body_html}'") else: html_lines.append(f"{html.escape(prefix)}Previous comment:
<br><br>
        '{body_html}'") i = next_index body_keyword_matched = True # Handle other body keywords (Issue body, Comment body, Review body) if not body_keyword_matched: for keyword in ["Issue body:", "Comment body:", "Review body:"]: if keyword in line: # Case 1: label and body on the same line if "'" in line and line.strip().startswith(keyword): parts = line.split("'", 1) if len(parts) > 1: prefix = parts[0].replace('\t', ' ') body_content = parts[1] if body_content.endswith("'"): body_content = body_content.rstrip("'") body_html = markdown_to_html(body_content, convert_line_breaks=True, repo_url=repo_url) html_lines.append(f"{html.escape(prefix)}{html.escape(keyword)} '{body_html}'") i += 1 body_keyword_matched = True break # Case 2: label on its own line, body in a following quoted block if line.strip().startswith(keyword): prefix = line.split(keyword)[0].replace('\t', ' ') i += 1 # Skip blank lines while i < len(lines) and not lines[i].strip(): i += 1 # Look for the first line that starts the quoted body if i < len(lines) and "'" in lines[i]: first = lines[i] parts = first.split("'", 1) if len(parts) > 1: body_text, next_index = extract_quoted_message(i, parts[1]) body_html = markdown_to_html(body_text, convert_line_breaks=True, repo_url=repo_url) html_lines.append(f"{html.escape(prefix)}{html.escape(keyword)}
<br><br>
        '{body_html}'") i = next_index body_keyword_matched = True break if body_keyword_matched: continue line_no_tabs = line.replace('\t', ' ').strip() if not line_no_tabs: html_lines.append('') i += 1 continue # Check for PR header BEFORE key-value pattern (PR headers match key-value pattern) if line_no_tabs.startswith('===') and line_no_tabs.endswith('==='): pr_match = re.match(r'^=== PR #(\d+): (.+?) ===$', line_no_tabs) if pr_match: pr_number = pr_match.group(1) pr_title = pr_match.group(2) # Convert markdown in PR title (e.g., **bold**, *italic*) pr_title_html = markdown_to_html(pr_title, convert_line_breaks=False, repo_url=repo_url) html_lines.append(f'=== PR #{pr_number}: {pr_title_html} ===') i += 1 continue key_value_pattern = r'^(.+?):\s+(.+)$' match = re.match(key_value_pattern, line_no_tabs) if match: label = match.group(1) value = match.group(2) label_html = f"{html.escape(label)}:" # Remove apostrophes from Description field if label.strip() == "Description" and value.startswith("'") and value.endswith("'"): value = value[1:-1] # Remove first and last character (apostrophes) # Check if this is a date field that should be highlighted date_labels = ['Event date', '- Commit date', 'Created at', 'Closed at', 'Merged at', 'Published at', 'Issue date', 'Comment date', 'Review submitted at'] is_date_label = any(label.strip() == date_label or label.strip().endswith(date_label) for date_label in date_labels) # Check if this is an identifier field that should not get markdown conversion # These fields contain identifiers (repo names, file names, etc.) that should be escaped, not markdown-processed identifier_labels = ['Repo name', 'Asset name', 'Branch name', 'Tag name', 'File name', 'Release tag name'] is_identifier_label = any(label.strip() == identifier_label or label.strip().endswith(identifier_label) for identifier_label in identifier_labels) if value.startswith('http://') or value.startswith('https://'): escaped_url = html.escape(value) html_lines.append(f"{label_html} {escaped_url}") else: # Apply date styling if applicable if is_date_label: # Check if value contains " by " (e.g., "Merged at: ... by username") if ' by ' in value: date_part, by_part = value.split(' by ', 1) value_to_style = html.escape(date_part) suffix = html.escape(' by ' + by_part) else: value_to_style = html.escape(value) suffix = '' # Bold only the time duration part (e.g., "2 hours, 27 minutes"), not "after" value_html = re.sub( r'\(after\s+([^:]+):\s+([^)]+)\)', r'(after \1: \2)', value_to_style ) date_message_style = ( "background-color: #f0f0f0; padding: 2px 4px; border-radius: 3px; font-family: monospace; font-size: 1.00em;" ) value_html = f'{value_html}{suffix}' elif is_identifier_label: value_html = html.escape(value) else: # Non-date, non-identifier values get markdown + URL conversion value_html = markdown_to_html(value, convert_line_breaks=False, repo_url=repo_url) value_html = convert_urls_to_links(value_html) html_lines.append(f"{label_html} {value_html}") else: section_headers = [ 'Changed files list:', 'Removed files:', 'Added files:', 'Modified files:', 'Closed issues:', 'Opened issues:', 'Reopened issues:', 'Closed pull requests:', 'Opened pull requests:', 'Created tags:', 'Deleted tags:', 'Created branches:', 'Deleted branches:', 'Assets:', ] # Check if this is a separator line (dots) if line_no_tabs and all(c == '.' for c in line_no_tabs): html_lines.append('
<hr>') i += 1 # Skip next line if it's empty (to avoid double line break) if i < len(lines) and not lines[i].strip(): i += 1 continue # Check if this line starts with === (commit separator - PR headers already handled above) elif line_no_tabs.startswith('===') and line_no_tabs.endswith('==='): # Commit separator - bold entire line html_lines.append(f'<b>{html.escape(line_no_tabs)}</b>') i += 1 continue else: # Protect quoted text (file names, strings) from markdown conversion quoted_strings = [] temp_line = line_no_tabs import re as re_module quote_pattern = r"'([^']+)'" for match in re_module.finditer(quote_pattern, temp_line): quoted_strings.append(match.group(1)) for idx, quoted_str in enumerate(quoted_strings): temp_line = temp_line.replace(f"'{quoted_str}'", f"__QUOTED_{idx}__", 1) line_html = markdown_to_html(temp_line, convert_line_breaks=False, repo_url=repo_url) line_html = convert_urls_to_links(line_html) for idx, quoted_str in enumerate(quoted_strings): escaped_quoted = html.escape(quoted_str) line_html = line_html.replace(f"__QUOTED_{idx}__", f"'{escaped_quoted}'") for header in section_headers: if header in line_no_tabs: escaped_header = html.escape(header) line_html = line_html.replace(escaped_header, f'<b>{escaped_header}</b>') break html_lines.append(line_html) i += 1
    # Join with <br>, then remove <br> between special elements (commit headers, <hr>)
    result = '<br>'.join(html_lines)

    # Remove <br> between commit header and <hr>
    # (the regex patterns below are best-effort reconstructions; the originals were lost)
    result = re.sub(r'===</b><br>\s*<hr>', r'===</b><hr>', result)

    # Remove <br> between <hr> and commit header (for multiple commits)
    result = re.sub(r'(<hr[^>]*>)<br>===', r'\1===', result)

    # Remove <br> between <hr> and next element (including commit message label)
    result = re.sub(r'(<hr[^>]*>)<br>\s*(<b>|<span>|<strong>)', r'\1\2', result)

    return result


# Returns the current local time as a naive datetime (body reconstructed from its call sites)
def now_local_naive():
    return datetime.now(pytz.timezone(LOCAL_TIMEZONE)).replace(tzinfo=None)


# Returns the current day as a date object (function name reconstructed)
def get_cur_day() -> dt.date:
    return now_local_naive().date()


# Returns the current date/time in human readable format; eg. Sun 21 Apr 2024, 15:08:45
def get_cur_ts(ts_str=""):
    return (f'{ts_str}{calendar.day_abbr[(now_local_naive()).weekday()]} {now_local_naive().strftime("%d %b %Y, %H:%M:%S")}')


# Prints the current date/time in human readable format with separator; eg. Sun 21 Apr 2024, 15:08:45
def print_cur_ts(ts_str=""):
    print(get_cur_ts(str(ts_str)))
    print(f"{'─' * HORIZONTAL_LINE1}")


# Returns the timestamp/datetime object in human readable format (long version); eg. Sun 21 Apr 2024, 15:08:45
def get_date_from_ts(ts):
    tz = pytz.timezone(LOCAL_TIMEZONE)

    if isinstance(ts, str):
        try:
            ts = isoparse(ts)
        except Exception:
            return ""

    if isinstance(ts, datetime):
        if ts.tzinfo is None:
            ts = pytz.utc.localize(ts)
        ts_new = ts.astimezone(tz)
    elif isinstance(ts, int):
        ts_new = datetime.fromtimestamp(ts, tz)
    elif isinstance(ts, float):
        ts_rounded = int(round(ts))
        ts_new = datetime.fromtimestamp(ts_rounded, tz)
    else:
        return ""

    return (f'{calendar.day_abbr[ts_new.weekday()]} {ts_new.strftime("%d %b %Y, %H:%M:%S")}')


# Returns the timestamp/datetime object in human readable format (short version); eg.
# Sun 21 Apr 15:08
# Sun 21 Apr 24, 15:08 (if show_year == True and current year is different)
# Sun 21 Apr 25, 15:08 (if always_show_year == True and current year can be the same)
# Sun 21 Apr (if show_hour == False)
# Sun 21 Apr 15:08:32 (if show_seconds == True)
# 21 Apr 15:08 (if show_weekday == False)
def get_short_date_from_ts(ts, show_year=False, show_hour=True, show_weekday=True, show_seconds=False, always_show_year=False):
    tz = pytz.timezone(LOCAL_TIMEZONE)

    if always_show_year:
        show_year = True

    if isinstance(ts, str):
        try:
            ts = isoparse(ts)
        except Exception:
            return ""

    if isinstance(ts, datetime):
        if ts.tzinfo is None:
            ts = pytz.utc.localize(ts)
        ts_new = ts.astimezone(tz)
    elif isinstance(ts, int):
        ts_new = datetime.fromtimestamp(ts, tz)
    elif isinstance(ts, float):
        ts_rounded = int(round(ts))
        ts_new = datetime.fromtimestamp(ts_rounded, tz)
    elif isinstance(ts, date):
        ts = datetime.combine(ts, datetime.min.time())
        ts = pytz.utc.localize(ts)
        ts_new = ts.astimezone(tz)
    else:
        return ""

    if show_hour:
        hour_strftime = " %H:%M:%S" if show_seconds else " %H:%M"
    else:
        hour_strftime = ""

    weekday_str = f"{calendar.day_abbr[ts_new.weekday()]} " if show_weekday else ""

    if (show_year and ts_new.year != datetime.now(tz).year) or always_show_year:
        hour_prefix = "," if show_hour else ""
        return f'{weekday_str}{ts_new.strftime(f"%d %b %y{hour_prefix}{hour_strftime}")}'
    else:
        return f'{weekday_str}{ts_new.strftime(f"%d %b{hour_strftime}")}'


# Returns the timestamp/datetime object in human readable format (only hour, minutes and optionally seconds): eg. 15:08:12
def get_hour_min_from_ts(ts, show_seconds=False):
    tz = pytz.timezone(LOCAL_TIMEZONE)

    if isinstance(ts, str):
        try:
            ts = isoparse(ts)
        except Exception:
            return ""

    if isinstance(ts, datetime):
        if ts.tzinfo is None:
            ts = pytz.utc.localize(ts)
        ts_new = ts.astimezone(tz)
    elif isinstance(ts, int):
        ts_new = datetime.fromtimestamp(ts, tz)
    elif isinstance(ts, float):
        ts_rounded = int(round(ts))
        ts_new = datetime.fromtimestamp(ts_rounded, tz)
    else:
        return ""

    out_strf = "%H:%M:%S" if show_seconds else "%H:%M"
    return ts_new.strftime(out_strf)


# Returns the range between two timestamps/datetime objects; eg.
Sun 21 Apr 14:09 - 14:15 def get_range_of_dates_from_tss(ts1, ts2, between_sep=" - ", short=False): tz = pytz.timezone(LOCAL_TIMEZONE) if isinstance(ts1, datetime): ts1_new = int(round(ts1.timestamp())) elif isinstance(ts1, int): ts1_new = ts1 elif isinstance(ts1, float): ts1_new = int(round(ts1)) else: return "" if isinstance(ts2, datetime): ts2_new = int(round(ts2.timestamp())) elif isinstance(ts2, int): ts2_new = ts2 elif isinstance(ts2, float): ts2_new = int(round(ts2)) else: return "" ts1_strf = datetime.fromtimestamp(ts1_new, tz).strftime("%Y%m%d") ts2_strf = datetime.fromtimestamp(ts2_new, tz).strftime("%Y%m%d") if ts1_strf == ts2_strf: if short: out_str = f"{get_short_date_from_ts(ts1_new)}{between_sep}{get_hour_min_from_ts(ts2_new)}" else: out_str = f"{get_date_from_ts(ts1_new)}{between_sep}{get_hour_min_from_ts(ts2_new, show_seconds=True)}" else: if short: out_str = f"{get_short_date_from_ts(ts1_new)}{between_sep}{get_short_date_from_ts(ts2_new)}" else: out_str = f"{get_date_from_ts(ts1_new)}{between_sep}{get_date_from_ts(ts2_new)}" return str(out_str) # Checks if the timezone name is correct def is_valid_timezone(tz_name): return tz_name in pytz.all_timezones # Prints and returns the printed text with new line def print_v(text=""): print(text) return text + "\n" # Signal handler for SIGUSR1 allowing to switch email notifications for user's profile changes def toggle_profile_changes_notifications_signal_handler(sig, frame): global PROFILE_NOTIFICATION PROFILE_NOTIFICATION = not PROFILE_NOTIFICATION sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Email notifications:\t\t[profile changes = {PROFILE_NOTIFICATION}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGUSR2 allowing to switch email notifications for user's new events def toggle_new_events_notifications_signal_handler(sig, frame): global EVENT_NOTIFICATION EVENT_NOTIFICATION = not EVENT_NOTIFICATION sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Email notifications:\t\t[new events = {EVENT_NOTIFICATION}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGCONT allowing to switch email notifications for user's repositories changes (except for update date) def toggle_repo_changes_notifications_signal_handler(sig, frame): global REPO_NOTIFICATION REPO_NOTIFICATION = not REPO_NOTIFICATION sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Email notifications:\t\t[repos changes = {REPO_NOTIFICATION}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGPIPE allowing to switch email notifications for user's repositories update date changes def toggle_repo_update_date_changes_notifications_signal_handler(sig, frame): global REPO_UPDATE_DATE_NOTIFICATION REPO_UPDATE_DATE_NOTIFICATION = not REPO_UPDATE_DATE_NOTIFICATION sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Email notifications:\t\t[repos update date = {REPO_UPDATE_DATE_NOTIFICATION}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGURG allowing to switch email notifications for user's daily contributions changes def toggle_contrib_changes_notifications_signal_handler(sig, frame): global CONTRIB_NOTIFICATION CONTRIB_NOTIFICATION = not CONTRIB_NOTIFICATION sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* Email notifications:\t\t[contrib changes = {CONTRIB_NOTIFICATION}]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGTRAP allowing to increase check 
timer by GITHUB_CHECK_SIGNAL_VALUE seconds def increase_check_signal_handler(sig, frame): global GITHUB_CHECK_INTERVAL GITHUB_CHECK_INTERVAL = GITHUB_CHECK_INTERVAL + GITHUB_CHECK_SIGNAL_VALUE sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* GitHub polling interval:\t[ {display_time(GITHUB_CHECK_INTERVAL)} ]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGABRT allowing to decrease check timer by GITHUB_CHECK_SIGNAL_VALUE seconds def decrease_check_signal_handler(sig, frame): global GITHUB_CHECK_INTERVAL if GITHUB_CHECK_INTERVAL - GITHUB_CHECK_SIGNAL_VALUE > 0: GITHUB_CHECK_INTERVAL = GITHUB_CHECK_INTERVAL - GITHUB_CHECK_SIGNAL_VALUE sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") print(f"* GitHub polling interval:\t[ {display_time(GITHUB_CHECK_INTERVAL)} ]") print_cur_ts("Timestamp:\t\t\t") # Signal handler for SIGHUP allowing to reload secrets from .env def reload_secrets_signal_handler(sig, frame): sig_name = signal.Signals(sig).name print(f"* Signal {sig_name} received") # disable autoscan if DOTENV_FILE set to none if DOTENV_FILE and DOTENV_FILE.lower() == 'none': env_path = None else: # reload .env if python-dotenv is installed try: from dotenv import load_dotenv, find_dotenv if DOTENV_FILE: env_path = DOTENV_FILE else: env_path = find_dotenv() if env_path: load_dotenv(env_path, override=True) else: print("* No .env file found, skipping env-var reload") except ImportError: env_path = None print("* python-dotenv not installed, skipping env-var reload") if env_path: for secret in SECRET_KEYS: old_val = globals().get(secret) val = os.getenv(secret) if val is not None and val != old_val: globals()[secret] = val print(f"* Reloaded {secret} from {env_path}") print_cur_ts("Timestamp:\t\t\t") # List subclass used as a safe fallback for paginated responses class EmptyPaginatedList(list): def __init__(self): super().__init__() self.totalCount = 0 # Wraps GitHub API call with retry and linear back-off, returning a specified default on failure def gh_call(fn: Callable[..., Any], retries=NET_MAX_RETRIES, backoff=NET_BASE_BACKOFF_SEC, default: Any = None,) -> Callable[..., Any]: def wrapped(*args: Any, **kwargs: Any) -> Any: for i in range(1, retries + 1): try: return fn(*args, **kwargs) except RateLimitExceededException as e: headers = getattr(e, "headers", None) reset_str = None if headers: val = headers.get("X-RateLimit-Reset") if isinstance(val, str): reset_str = val sleep_for: int if reset_str is not None and reset_str.isdigit(): reset_epoch = int(reset_str) sleep_for = max(0, reset_epoch - int(time.time()) + 1) else: retry_after_str = None if headers: ra = headers.get("Retry-After") if isinstance(ra, str): retry_after_str = ra if retry_after_str is not None and retry_after_str.isdigit(): sleep_for = int(retry_after_str) else: sleep_for = int(backoff * i) print(f"* {fn.__name__} rate limited, sleeping {sleep_for}s (retry {i}/{retries})") time.sleep(sleep_for) continue except NET_ERRORS as e: print(f"* {fn.__name__} error: {e} (retry {i}/{retries})") time.sleep(backoff * i) return default return wrapped # Prints followers and followings for a GitHub user (-f) def github_print_followers_and_followings(user): user_name_str = user user_url = "-" followers_count = 0 followings_count = 0 followers_list = [] followings_list = [] print(f"* Getting followers & followings for user '{user}' ...") try: auth = Auth.Token(GITHUB_TOKEN) g = Github(base_url=GITHUB_API_URL, auth=auth) g_user = g.get_user(user) user_login = g_user.login 
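# Note: login, name, html_url, followers and following used below are plain attributes
# of the PyGithub NamedUser object and map 1:1 to fields of the GitHub Users REST API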
user_name = g_user.name user_url = g_user.html_url followers_count = g_user.followers followings_count = g_user.following followers_list = g_user.get_followers() followings_list = g_user.get_following() user_name_str = user_login if user_name: user_name_str += f" ({user_name})" except Exception as e: raise RuntimeError(f"Cannot fetch user {user} details: {e}") print(f"\nUsername:\t\t{user_name_str}") print(f"User URL:\t\t{user_url}/") print(f"GitHub API URL:\t\t{GITHUB_API_URL}") print(f"Local timezone:\t\t{LOCAL_TIMEZONE}") print(f"\nFollowers:\t\t{followers_count}") try: if followers_list: for follower in followers_list: follower_str = f"\n- {follower.login}" if follower.name: follower_str += f" ({follower.name})" if follower.html_url: follower_str += f"\n[ {follower.html_url}/ ]" print(follower_str) except Exception as e: print(f"* Cannot fetch user's followers list: {e}") print(f"\nFollowings:\t\t{followings_count}") try: if followings_list: for following in followings_list: following_str = f"\n- {following.login}" if following.name: following_str += f" ({following.name})" if following.html_url: following_str += f"\n[ {following.html_url}/ ]" print(following_str) except Exception as e: print(f"* Cannot fetch user's followings list: {e}") g.close() # Displays a progress bar with percentage and current repo name def _display_progress(current, total, repo_name: str = "", bar_length: int = 40, is_final: bool = False) -> None: if total == 0: return # Defensive fallback for environments without a real TTY try: term_width = shutil.get_terminal_size(fallback=(80, 20)).columns except Exception: term_width = 80 # Keep a sane minimum – very tiny terminals may still wrap, but that's acceptable term_width = max(40, term_width) percent = float(current) / total percent_str = f"{percent * 100:.1f}%" counter_str = f"({current}/{total})" # Prepare (possibly truncated) repo name display_name = repo_name or "" max_name_length = 30 if display_name and len(display_name) > max_name_length: display_name = display_name[:max_name_length] + "..." 
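# Illustrative example of the single line this progress helper renders (assuming an
# 80-column terminal and the default bar_length of 40):
#   Repos [████████████████░░░░░░░░░░░░░░░░░░░░░░░░] 40.0% (4/10) - github_monitor
# On narrow terminals the repo name and then the "Repos" prefix are dropped and the
# bar itself shrinks, as computed below.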
prefix = "Repos" name_part = f" - {display_name}" if display_name else "" # First, assume we can show prefix + name; compute max bar length that fits def compute_bar_len(include_prefix: bool, include_name: bool) -> int: base = "" if include_prefix: base += prefix + " " base += "[]" # placeholder for bar base += f" {percent_str} {counter_str}" if include_name and name_part: base += name_part # Leave 1 char margin available = term_width - len(base) - 1 return available max_bar_len = compute_bar_len(include_prefix=True, include_name=True) show_prefix = True show_name = True if max_bar_len < 10: # Try without repo name show_name = False max_bar_len = compute_bar_len(include_prefix=True, include_name=False) if max_bar_len < 5: # Try without prefix as well show_prefix = False max_bar_len = compute_bar_len(include_prefix=False, include_name=False) # Final bar length: at least 3 chars, at most requested bar_length bar_len = max(3, min(bar_length, max_bar_len if max_bar_len > 0 else bar_length)) filled_length = int(bar_len * percent) bar = "█" * filled_length + "░" * (bar_len - filled_length) parts = [] if show_prefix: parts.append(prefix) parts.append(f"[{bar}]") parts.append(percent_str) parts.append(counter_str) if show_name and name_part: parts.append(name_part.lstrip()) progress_str = " ".join(parts) terminal_out = stdout_bck if stdout_bck is not None else sys.stdout if is_final: terminal_out.write("\r\033[K" + progress_str) terminal_out.flush() if stdout_bck is not None and isinstance(sys.stdout, Logger): sys.stdout.logfile.write(progress_str + "\n") sys.stdout.logfile.flush() else: terminal_out.write("\r\033[K" + progress_str) terminal_out.flush() # Processes items from all passed repositories and returns a list of dictionaries def github_process_repos(repos_list, show_progress=True): import logging import warnings # Suppress urllib3 warnings that might interfere with progress bar urllib3.disable_warnings() warnings.filterwarnings('ignore') list_of_repos = [] stargazers_list = [] subscribers_list = [] forked_repos = [] if repos_list: # Convert to list if it's a generator/iterator to get total count repos_list = list(repos_list) total_repos = len(repos_list) for idx, repo in enumerate(repos_list, 1): # Update progress bar at start if show_progress: _display_progress(idx, total_repos, repo.name) try: repo_created_date = repo.created_at repo_updated_date = repo.updated_at github_logger = logging.getLogger('github') original_level = github_logger.level github_logger.setLevel(logging.ERROR) try: stargazers_list = [star.login for star in repo.get_stargazers()] if show_progress: _display_progress(idx, total_repos, repo.name) # Refresh after stargazers subscribers_list = [subscriber.login for subscriber in repo.get_subscribers()] if show_progress: _display_progress(idx, total_repos, repo.name) # Refresh after subscribers forked_repos = [fork.full_name for fork in repo.get_forks()] if show_progress: _display_progress(idx, total_repos, repo.name) # Refresh after forks except GithubException as e: if e.status in [403, 451]: if BLOCKED_REPOS: print(f"\n* Repo '{repo.name}' is blocked, skipping for now: {e}") print_cur_ts("Timestamp:\t\t\t") if show_progress: _display_progress(idx, total_repos, repo.name) continue raise finally: github_logger.setLevel(original_level) issues = list(repo.get_issues(state='open')) if show_progress: _display_progress(idx, total_repos, repo.name) # Refresh after issues pulls = list(repo.get_pulls(state='open')) if show_progress: _display_progress(idx, total_repos, repo.name) 
# Refresh after pulls real_issues = [i for i in issues if not i.pull_request] issue_count = len(real_issues) pr_count = len(pulls) issues_list = [f"#{i.number} {i.title} ({i.user.login}) [ {i.html_url} ]" for i in real_issues] pr_list = [f"#{pr.number} {pr.title} ({pr.user.login}) [ {pr.html_url} ]" for pr in pulls] list_of_repos.append({"name": repo.name, "descr": repo.description, "is_fork": repo.fork, "forks": repo.forks_count, "stars": repo.stargazers_count, "subscribers": repo.subscribers_count, "url": repo.html_url, "language": repo.language, "date": repo_created_date, "update_date": repo_updated_date, "stargazers_list": stargazers_list, "forked_repos": forked_repos, "subscribers_list": subscribers_list, "issues": issue_count, "pulls": pr_count, "issues_list": issues_list, "pulls_list": pr_list}) if show_progress: _display_progress(idx, total_repos, repo.name, is_final=(idx == total_repos)) # Final refresh after successful processing except GithubException as e: # Skip TOS-blocked (403) and legally blocked (451) repositories if e.status in [403, 451]: if BLOCKED_REPOS: print(f"\n* Repo '{repo.name}' is blocked, skipping for now: {e}") print_cur_ts("Timestamp:\t\t\t") if show_progress: _display_progress(idx, total_repos, repo.name, is_final=(idx == total_repos)) continue else: print(f"\n* Cannot process repo '{repo.name}', skipping for now: {e}") print_cur_ts("Timestamp:\t\t\t") if show_progress: _display_progress(idx, total_repos, repo.name, is_final=(idx == total_repos)) continue except Exception as e: print(f"\n* Cannot process repo '{repo.name}', skipping for now: {e}") print_cur_ts("Timestamp:\t\t\t") if show_progress: _display_progress(idx, total_repos, repo.name, is_final=(idx == total_repos)) continue # Clear progress bar and move to next line (only if progress was shown) if show_progress and total_repos > 0: # Write newline to terminal terminal_out = stdout_bck if stdout_bck is not None else sys.stdout terminal_out.write("\n") terminal_out.flush() # Also write to log file if logging is enabled if stdout_bck is not None and isinstance(sys.stdout, Logger): sys.stdout.logfile.write("\n") sys.stdout.logfile.flush() return list_of_repos # Prints a list of public repositories for a GitHub user (-r) def github_print_repos(user): import logging user_name_str = user user_url = "-" repos_count = 0 repos_list = [] print(f"* Getting public repositories for user '{user}' ...") try: auth = Auth.Token(GITHUB_TOKEN) g = Github(base_url=GITHUB_API_URL, auth=auth) g_user = g.get_user(user) user_login = g_user.login user_name = g_user.name user_url = g_user.html_url if GET_ALL_REPOS: repos_list = g_user.get_repos() repos_count = g_user.public_repos else: repos_list = [repo for repo in g_user.get_repos(type='owner') if not repo.fork and repo.owner.login == user_login] repos_count = len(repos_list) user_name_str = user_login if user_name: user_name_str += f" ({user_name})" except Exception as e: raise RuntimeError(f"Cannot fetch user {user} details: {e}") print(f"\nUsername:\t\t{user_name_str}") print(f"User URL:\t\t{user_url}/") print(f"GitHub API URL:\t\t{GITHUB_API_URL}") print(f"Owned repos only:\t{not GET_ALL_REPOS}") print(f"Local timezone:\t\t{LOCAL_TIMEZONE}") print(f"\nRepositories:\t\t{repos_count}\n") try: if repos_list: print("─" * HORIZONTAL_LINE2) for repo in repos_list: print(f"🔸 {repo.name} {'(fork)' if repo.fork else ''} \n") github_logger = logging.getLogger('github') original_level = github_logger.level github_logger.setLevel(logging.ERROR) try: pr_count = 
repo.get_pulls(state='open').totalCount issue_count = repo.open_issues_count - pr_count except Exception: pr_count = "?" issue_count = "?" try: print(f" - 🌐 URL:\t\t{repo.html_url}") print(f" - 💻 Language:\t\t{repo.language}") print(f"\n - ⭐ Stars:\t\t{repo.stargazers_count}") print(f" - 🍴 Forks:\t\t{repo.forks_count}") print(f" - 👓 Watchers:\t\t{repo.subscribers_count}") # print(f" - 🐞 Issues+PRs:\t{repo.open_issues_count}") print(f" - 🐞 Issues:\t\t{issue_count}") print(f" - 📬 PRs:\t\t{pr_count}") print(f"\n - 📝 License:\t\t{repo.license.name if repo.license else 'None'}") print(f" - 🌿 Branch (default):\t{repo.default_branch}") print(f"\n - 📅 Created:\t\t{get_date_from_ts(repo.created_at)} ({calculate_timespan(int(time.time()), repo.created_at, granularity=2)} ago)") print(f" - 🔄 Updated:\t\t{get_date_from_ts(repo.updated_at)} ({calculate_timespan(int(time.time()), repo.updated_at, granularity=2)} ago)") print(f" - 🔃 Last push:\t{get_date_from_ts(repo.pushed_at)} ({calculate_timespan(int(time.time()), repo.pushed_at, granularity=2)} ago)") if repo.description: print(f"\n - 📝 Desc:\t\t{repo.description}") except GithubException as e: # Inform about TOS-blocked (403) and legally blocked (451) repositories if e.status in [403, 451]: print(f"\n* Repo '{repo.name}' is blocked: {e}") print("─" * HORIZONTAL_LINE2) continue finally: github_logger.setLevel(original_level) print("─" * HORIZONTAL_LINE2) except Exception as e: raise RuntimeError(f"Cannot fetch user's repositories list: {e}") g.close() # Prints a list of starred repositories by a GitHub user (-g) def github_print_starred_repos(user): user_name_str = user user_url = "-" starred_count = 0 starred_list = [] print(f"* Getting repositories starred by user '{user}' ...") try: auth = Auth.Token(GITHUB_TOKEN) g = Github(base_url=GITHUB_API_URL, auth=auth) g_user = g.get_user(user) user_login = g_user.login user_name = g_user.name user_url = g_user.html_url starred_list = g_user.get_starred() starred_count = starred_list.totalCount user_name_str = user_login if user_name: user_name_str += f" ({user_name})" except Exception as e: raise RuntimeError(f"Cannot fetch user {user} details: {e}") print(f"\nUsername:\t\t{user_name_str}") print(f"User URL:\t\t{user_url}/") print(f"GitHub API URL:\t\t{GITHUB_API_URL}") print(f"Local timezone:\t\t{LOCAL_TIMEZONE}") print(f"\nRepos starred by user:\t{starred_count}") try: if starred_list: for star in starred_list: star_str = f"\n- {star.full_name}" if star.html_url: star_str += f" [ {star.html_url}/ ]" print(star_str) except Exception as e: raise RuntimeError(f"Cannot fetch user's starred list: {e}") g.close() # Returns size in human readable format def human_readable_size(num): value = float(num) for unit in ["B", "KB", "MB", "GB", "TB", "PB"]: if abs(value) < 1024.0: return f"{value:.1f} {unit}" value /= 1024.0 return f"{value:.1f} PB" # Formats the given string as a quoted, indented block def format_body_block(content, indent=" "): new_content = f"'{content}'" indented = textwrap.indent(new_content.strip(), indent) return f"\n{indented}" # Returns the base web URL for GitHub or GHE (e.g. 
https://github.com or https://ghe.example.com) def github_web_base() -> str: if "api.github.com" in GITHUB_API_URL: return "https://github.com" return GITHUB_API_URL.replace("/api/v3", "").rstrip("/") # Safely truncates text without breaking HTML tags def safe_truncate_text(text, max_length=MAX_EVENT_BODY_LENGTH): if len(text) <= max_length: return text # Find a safe truncation point - look backwards from max_length truncate_at = max_length # Check if we're in the middle of an HTML tag by looking backwards for i in range(max_length - 1, max(0, max_length - 200), -1): if text[i] == '<': # Found start of a tag - check if it completes before max_length tag_end = text.find('>', i) if tag_end != -1 and tag_end < max_length: # Tag completes before truncation point, safe to truncate after it truncate_at = tag_end + 1 break else: # Tag doesn't complete, truncate before it to avoid breaking the tag truncate_at = i break elif text[i] == '>': # Found end of a tag, safe to truncate after it truncate_at = i + 1 break # Truncate at the safe point truncated = text[:truncate_at] # Find and close any open HTML tags open_tags = [] tag_pattern = r'<(/)?([a-zA-Z][a-zA-Z0-9]*)[^>]*>' for match in re.finditer(tag_pattern, truncated): is_closing = match.group(1) == '/' tag_name = match.group(2).lower() if is_closing: # Remove matching opening tag if open_tags and open_tags[-1] == tag_name: open_tags.pop() else: # Self-closing tags don't need closing if tag_name not in ('img', 'br', 'hr', 'input', 'meta', 'link', 'area', 'base', 'col', 'embed', 'source', 'track', 'wbr'): open_tags.append(tag_name) # Close any remaining open tags in reverse order result = truncated for tag in reversed(open_tags): result += f'</{tag}>' result += " ... " return result # Prints details about passed GitHub event def github_print_event(event, g, time_passed=False, ts: datetime | None = None): event_date: datetime | None = None repo_name = "" repo_url = "" st = "" tp = "" repo = None event_date = event.created_at if time_passed and not ts: tp = f" ({calculate_timespan(int(time.time()), event_date, show_seconds=False, granularity=2)} ago)" elif time_passed and ts: # Only show "after" if current event is newer than previous event if event_date > ts: tp = f" (after {calculate_timespan(event_date, ts, show_seconds=False, granularity=2)}: {get_short_date_from_ts(ts)})" else: tp = "" st += print_v(f"Event date:\t\t\t{get_date_from_ts(event_date)}{tp}") st += print_v(f"Event ID:\t\t\t{event.id}") st += print_v(f"Event type:\t\t\t{event.type}") if event.repo.id: try: desc_len = 80 repo = g.get_repo(event.repo.name) # For ForkEvent, prefer the source repo if available if event.type == "ForkEvent" and repo is not None: try: parent = gh_call(lambda: getattr(repo, "parent", None))() if parent: repo = parent except Exception: pass repo_name = getattr(repo, "full_name", event.repo.name) api_prefix = GITHUB_API_URL.rstrip("/") + "/repos/" repo_url = getattr(repo, "html_url", event.repo.url.replace(api_prefix, github_web_base() + "/")) st += print_v(f"\nRepo name:\t\t\t{repo_name}") st += print_v(f"Repo URL:\t\t\t{repo_url}") desc = (repo.description or "") if repo else "" cleaned = desc.replace('\n', ' ') short_desc = cleaned[:desc_len] + '...'
if len(cleaned) > desc_len else cleaned if short_desc: st += print_v(f"Repo description:\t\t{short_desc}") except UnknownObjectException: repo = None st += print_v("\nRepository not found or has been removed") except GithubException as e: repo = None st += print_v(f"\n* Error occurred while getting repo details: {e}") if hasattr(event.actor, 'login'): if event.actor.login: st += print_v(f"\nEvent actor login:\t\t{event.actor.login}") if hasattr(event.actor, 'name'): if event.actor.name: st += print_v(f"Event actor name:\t\t{event.actor.name}") if hasattr(event.actor, 'html_url'): if event.actor.html_url: st += print_v(f"Event actor URL:\t\t{event.actor.html_url}") if event.payload.get("ref"): st += print_v(f"\nObject name:\t\t\t{event.payload.get('ref')}") if event.payload.get("ref_type"): st += print_v(f"Object type:\t\t\t{event.payload.get('ref_type')}") if event.payload.get("description"): st += print_v(f"Description:\t\t\t{event.payload.get('description')}") if event.payload.get("action"): st += print_v(f"\nAction:\t\t\t\t{event.payload.get('action')}") # Prefer commits from payload when present (older API behavior) if event.payload.get("commits"): commits = event.payload["commits"] commits_total = len(commits) st += print_v(f"\nNumber of commits:\t\t{commits_total}") for commit_count, commit in enumerate(commits, start=1): st += print_v(f"\n=== Commit {commit_count}/{commits_total} ===") st += print_v("." * HORIZONTAL_LINE1) commit_message = commit['message'] is_multiline = '\n' in commit_message if is_multiline: first_line = commit_message.split('\n', 1)[0] st += print_v(f" - Commit message:\t\t'{first_line}...'") else: st += print_v(f" - Commit message:\t\t'{commit_message}'") commit_details = None if repo: commit_details = gh_call(lambda: repo.get_commit(commit["sha"]))() if commit_details: commit_date = commit_details.commit.author.date st += print_v(f" - Commit date:\t\t\t{get_date_from_ts(commit_date)}") st += print_v(f" - Commit SHA:\t\t\t{commit['sha']}") st += print_v(f" - Commit author:\t\t{commit['author']['name']}") if commit_details and commit_details.author: st += print_v(f" - Commit author URL:\t\t{commit_details.author.html_url}") if commit_details: st += print_v(f" - Commit URL:\t\t\t{commit_details.html_url}") st += print_v(f" - Commit raw patch URL:\t{commit_details.html_url}.patch") stats = getattr(commit_details, "stats", None) additions = stats.additions if stats else 0 deletions = stats.deletions if stats else 0 stats_total = stats.total if stats else 0 st += print_v(f"\n - Additions/Deletions:\t\t+{additions} / -{deletions} ({stats_total})") if commit_details: try: file_count = sum(1 for _ in commit_details.files) except Exception: file_count = "N/A" st += print_v(f" - Files changed:\t\t{file_count}") if file_count and file_count != "N/A": st += print_v(" - Changed files list:") for f in commit_details.files: st += print_v(f" • '{f.filename}' - {f.status} (+{f.additions} / -{f.deletions})") if is_multiline: st += print_v(f"\n - Commit full message:") st += print_v(f"\n'{commit_message}'") else: pass st += print_v("."
* HORIZONTAL_LINE1) # Fallback for new Events API where PushEvent no longer includes commit summaries elif event.type == "PushEvent" and repo: before_sha = event.payload.get("before") head_sha = event.payload.get("head") or event.payload.get("after") size_hint = event.payload.get("size") # Debug when payload has no commits # st += print_v("\n[debug] PushEvent payload has no 'commits' array; using compare API") # st += print_v(f"[debug] before:\t\t\t{before_sha}") # st += print_v(f"[debug] head/after:\t\t{head_sha}") # if size_hint is not None: # st += print_v(f"[debug] size (hint):\t\t{size_hint}") if before_sha and head_sha and before_sha != head_sha: try: compare = gh_call(lambda: repo.compare(before_sha, head_sha))() except Exception as e: compare = None st += print_v(f"* Error using compare({before_sha[:12]}...{head_sha[:12]}): {e}") if compare: commits = list(compare.commits) commits_total = len(commits) short_repo = getattr(repo, "full_name", repo_name) compare_url = f"{github_web_base()}/{short_repo}/compare/{before_sha[:12]}...{head_sha[:12]}" st += print_v(f"\nNumber of commits:\t\t{commits_total}") st += print_v(f"Compare URL:\t\t\t{compare_url}") for commit_count, c in enumerate(commits, start=1): st += print_v(f"\n=== Commit {commit_count}/{commits_total} ===") st += print_v("." * HORIZONTAL_LINE1) commit_sha = getattr(c, "sha", None) or getattr(c, "id", None) commit_details = gh_call(lambda: repo.get_commit(commit_sha))() if (repo and commit_sha) else None commit_message = commit_details.commit.message if commit_details and commit_details.commit else "" is_multiline = '\n' in commit_message if commit_message else False if commit_message: if is_multiline: first_line = commit_message.split('\n', 1)[0] st += print_v(f" - Commit message:\t\t'{first_line}...'") else: st += print_v(f" - Commit message:\t\t'{commit_message}'") if commit_details: commit_date = commit_details.commit.author.date st += print_v(f" - Commit date:\t\t\t{get_date_from_ts(commit_date)}") if commit_sha: st += print_v(f" - Commit SHA:\t\t\t{commit_sha}") author_name = None if commit_details and commit_details.commit and commit_details.commit.author: author_name = commit_details.commit.author.name st += print_v(f" - Commit author:\t\t{author_name or 'N/A'}") if commit_details and commit_details.author: st += print_v(f" - Commit author URL:\t\t{commit_details.author.html_url}") if commit_details: st += print_v(f" - Commit URL:\t\t\t{commit_details.html_url}") st += print_v(f" - Commit raw patch URL:\t{commit_details.html_url}.patch") stats = getattr(commit_details, "stats", None) additions = stats.additions if stats else 0 deletions = stats.deletions if stats else 0 stats_total = stats.total if stats else 0 st += print_v(f"\n - Additions/Deletions:\t\t+{additions} / -{deletions} ({stats_total})") try: file_count = sum(1 for _ in commit_details.files) except Exception: file_count = "N/A" st += print_v(f" - Files changed:\t\t{file_count}") if file_count and file_count != "N/A": st += print_v(" - Changed files list:") for f in commit_details.files: st += print_v(f" • '{f.filename}' - {f.status} (+{f.additions} / -{f.deletions})") if is_multiline and commit_message: st += print_v(f"\n - Commit full message:") st += print_v(f"\n'{commit_message}'") st += print_v("." 
* HORIZONTAL_LINE1) else: st += print_v("\nNo compare range available (forced push, tag push, or identical before/after)") if event.payload.get("commits") == []: st += print_v("\nNo new commits (forced push, tag push, branch reset or other ref update)") if event.payload.get("release"): st += print_v(f"\nRelease name:\t\t\t{event.payload['release'].get('name')}") st += print_v(f"Release tag name:\t\t{event.payload['release'].get('tag_name')}") st += print_v(f"Release URL:\t\t\t{event.payload['release'].get('html_url')}") st += print_v(f"\nPublished by:\t\t\t{event.payload['release']['author']['login']}") if event.payload['release']['author'].get('html_url'): st += print_v(f"Published by URL:\t\t{event.payload['release']['author']['html_url']}") if event.payload['release'].get('published_at'): pub_ts = event.payload['release']['published_at'] st += print_v(f"Published at:\t\t\t{get_date_from_ts(pub_ts)}") st += print_v(f"Target commitish:\t\t{event.payload['release'].get('target_commitish')}") st += print_v(f"Draft:\t\t\t\t{event.payload['release'].get('draft')}") st += print_v(f"Prerelease:\t\t\t{event.payload['release'].get('prerelease')}") if event.payload["release"].get("assets"): print() st += print_v("\nAssets:\n") assets = event.payload['release'].get('assets', []) for asset in assets: size_bytes = asset.get("size", 0) st += print_v(f" - Asset name:\t\t\t{asset.get('name')}") st += print_v(f" - Asset size:\t\t\t{human_readable_size(size_bytes)}") st += print_v(f" - Download URL:\t\t{asset.get('browser_download_url')}") if asset != assets[-1]: st += print_v() st += print_v(f"\nRelease notes:\n\n'{event.payload['release'].get('body')}'") if repo and event.payload.get("pull_request"): pr_number = event.payload["pull_request"]["number"] pr = repo.get_pull(pr_number) st += print_v(f"\n=== PR #{pr.number}: {pr.title} ===") st += print_v("." * HORIZONTAL_LINE1) st += print_v(f"Author:\t\t\t\t{pr.user.login}") st += print_v(f"Author URL:\t\t\t{pr.user.html_url}") st += print_v(f"State:\t\t\t\t{pr.state}") st += print_v(f"Merged:\t\t\t\t{pr.merged}") st += print_v(f"PR URL:\t\t\t\t{pr.html_url}") if pr.created_at: pr_created_date = get_date_from_ts(pr.created_at) st += print_v(f"Created at:\t\t\t{pr_created_date}") if pr.closed_at: pr_closed_date = get_date_from_ts(pr.closed_at) st += print_v(f"Closed at:\t\t\t{pr_closed_date}") if pr.merged_at: pr_merged_date = get_date_from_ts(pr.merged_at) st += print_v(f"Merged at:\t\t\t{pr_merged_date} by {pr.merged_by.login}") st += print_v(f"Head → Base:\t\t\t{pr.head.ref} → {pr.base.ref}") st += print_v(f"Mergeable state:\t\t{pr.mergeable_state}") if pr.labels: st += print_v(f"Labels:\t\t\t\t{', '.join(label.name for label in pr.labels)}") st += print_v(f"\nCommits:\t\t\t{pr.commits}") st += print_v(f"Comments (issue/review):\t{pr.comments} / {pr.review_comments}") st += print_v(f"Additions/Deletions:\t\t+{pr.additions} / -{pr.deletions}") st += print_v(f"Files changed:\t\t\t{pr.changed_files}") if pr.body: st += print_v(f"\nPR description:\n\n'{pr.body.strip()}'") if pr.requested_reviewers: for reviewer in pr.requested_reviewers: st += print_v(f"\n - Requested reviewer:\t{reviewer.login} ({reviewer.html_url})") if pr.assignees: for assignee in pr.assignees: st += print_v(f"\nAssignee:\t\t\t{assignee.login} ({assignee.html_url})") st += print_v("." 
* HORIZONTAL_LINE1) if event.payload.get("review"): review_date = event.payload["review"].get("submitted_at") st += print_v(f"\nReview submitted at:\t\t{get_date_from_ts(review_date)}") st += print_v(f"Review URL:\t\t\t{event.payload['review'].get('html_url')}") if event.payload["review"].get("author_association"): st += print_v(f"Author association:\t\t{event.payload['review'].get('author_association')}") if event.payload["review"].get("id"): st += print_v(f"Review ID:\t\t\t{event.payload['review'].get('id')}") if event.payload["review"].get("commit_id"): st += print_v(f"Commit SHA reviewed:\t\t{event.payload['review'].get('commit_id')}") if event.payload["review"].get("state"): st += print_v(f"Review state:\t\t\t{event.payload['review'].get('state')}") if event.payload["review"].get("body"): review_body = event.payload['review'].get('body') if len(review_body) > MAX_EVENT_BODY_LENGTH: review_body = safe_truncate_text(review_body) st += print_v(f"Review body:") st += print_v(format_body_block(review_body)) if repo: try: pr_number = event.payload["pull_request"]["number"] pr_obj = repo.get_pull(pr_number) count = sum(1 for _ in pr_obj.get_single_review_comments(event.payload["review"].get("id"))) st += print_v(f"Comments in this review:\t{count}") except Exception: pass if event.payload.get("issue"): st += print_v(f"\nIssue title:\t\t\t{event.payload['issue'].get('title')}") issue_date = event.payload["issue"].get("created_at") st += print_v(f"Issue date:\t\t\t{get_date_from_ts(issue_date)}") issue_author = event.payload["issue"].get("user", {}).get("login") if issue_author: st += print_v(f"Issue author:\t\t\t{issue_author}") issue_author_url = event.payload["issue"].get("user", {}).get("html_url") if issue_author_url: st += print_v(f"Issue author URL:\t\t{issue_author_url}") st += print_v(f"Issue URL:\t\t\t{event.payload['issue'].get('html_url')}") if event.payload["issue"].get("state"): st += print_v(f"Issue state:\t\t\t{event.payload['issue'].get('state')}") st += print_v(f"Issue comments:\t\t\t{event.payload['issue'].get('comments', 0)}") labels = event.payload["issue"].get("labels", []) if labels: label_names = ", ".join(label.get("name") for label in labels if label.get("name")) if label_names: st += print_v(f"Issue labels:\t\t\t{label_names}") if event.payload["issue"].get("assignees"): assignees = event.payload["issue"].get("assignees") for assignee in assignees: st += print_v(f" - Assignee name:\t\t{assignee.get('name')}") if assignee != assignees[-1]: st += print_v() reactions = event.payload["issue"].get("reactions", {}) reaction_map = { "+1": "👍", "-1": "👎", "laugh": "😄", "hooray": "🎉", "confused": "😕", "heart": "❤️", "rocket": "🚀", "eyes": "👀", } reaction_display = [] for key, emoji in reaction_map.items(): count = reactions.get(key, 0) if count > 0: reaction_display.append(f"{emoji} {count}") if reaction_display: st += print_v(f"Issue reactions:\t\t{' / '.join(reaction_display)}") if event.payload["issue"].get("body"): issue_body = event.payload['issue'].get('body') issue_snippet = issue_body if len(issue_body) <= MAX_EVENT_BODY_LENGTH else safe_truncate_text(issue_body) st += print_v(f"\nIssue body:") st += print_v(format_body_block(issue_snippet)) if event.payload.get("comment"): comment = event.payload["comment"] comment_date = comment.get("created_at") st += print_v(f"\nComment date:\t\t\t{get_date_from_ts(comment_date)}") comment_author = comment.get("user", {}).get("login") if comment_author: st += print_v(f"Comment author:\t\t\t{comment_author}") comment_author_url = 
comment.get("user", {}).get("html_url") if comment_author_url: st += print_v(f"Comment author URL:\t\t{comment_author_url}") st += print_v(f"Comment URL:\t\t\t{comment.get('html_url')}") if comment.get("path"): st += print_v(f"Comment path:\t\t\t{comment.get('path')}") comment_body = comment.get("body") if comment_body: if len(comment_body) > MAX_EVENT_BODY_LENGTH: comment_body = safe_truncate_text(comment_body) st += print_v(f"\nComment body:") st += print_v(format_body_block(comment_body)) if event.type == "PullRequestReviewCommentEvent": parent_id = comment.get("in_reply_to_id") if parent_id and repo: try: pr_number = event.payload["pull_request"]["number"] pr = repo.get_pull(pr_number) parent = pr.get_review_comment(parent_id) parent_date = get_date_from_ts(parent.created_at) st += print_v(f"\nPrevious comment:\n\n↳ In reply to {parent.user.login} (@ {parent_date}):") parent_body = parent.body if len(parent_body) > MAX_EVENT_BODY_LENGTH: parent_body = safe_truncate_text(parent_body) st += print_v(format_body_block(parent_body)) st += print_v(f"\nPrevious comment URL:\t\t{parent.html_url}") except Exception as e: st += print_v(f"\n* Could not fetch parent comment (ID {parent_id}): {e}") else: st += print_v("\n(This is the first comment in its thread)") elif event.type in ("IssueCommentEvent", "CommitCommentEvent"): if repo: comment_id = comment["id"] comment_created = datetime.fromisoformat(comment["created_at"].replace("Z", "+00:00")) if event.type == "IssueCommentEvent": issue_number = event.payload["issue"]["number"] issue = repo.get_issue(issue_number) virtual_comment_list = [] if issue.body: virtual_comment_list.append({ "id": f"issue-{issue.id}", # fake ID so it doesn't collide "created_at": issue.created_at, "user": issue.user, "body": issue.body, "html_url": issue.html_url }) for c in issue.get_comments(): virtual_comment_list.append({ "id": c.id, "created_at": c.created_at, "user": c.user, "body": c.body, "html_url": c.html_url }) previous = None for c in virtual_comment_list: if c["id"] == comment_id or (isinstance(c["id"], int) and c["id"] == comment_id): continue if c["created_at"] < comment_created: if not previous or c["created_at"] > previous["created_at"]: previous = c if previous: prev_date = get_date_from_ts(previous["created_at"]) st += print_v(f"\nPrevious comment:\n\n↳ In reply to {previous['user'].login} (@ {prev_date}):") parent_body = previous["body"] if len(parent_body) > MAX_EVENT_BODY_LENGTH: parent_body = safe_truncate_text(parent_body) st += print_v(format_body_block(parent_body)) st += print_v(f"\nPrevious comment URL:\t\t{previous['html_url']}") else: st += print_v("\n(This is the first comment in this thread)") elif event.type == "CommitCommentEvent": commit_sha = comment["commit_id"] comments = list(repo.get_commit(commit_sha).get_comments()) previous = None for c in comments: if c.id == comment_id: continue if c.created_at < comment_created: if not previous or c.created_at > previous.created_at: previous = c if previous: prev_date = get_date_from_ts(previous.created_at) st += print_v(f"\nPrevious comment:\n\n↳ In reply to {previous.user.login} (@ {prev_date}):") parent_body = previous.body if len(parent_body) > MAX_EVENT_BODY_LENGTH: parent_body = safe_truncate_text(parent_body) st += print_v(format_body_block(parent_body)) st += print_v(f"\nPrevious comment URL:\t\t{previous.html_url}") else: st += print_v("\n(This is the first comment in this thread)") if event.payload.get("forkee"): st += print_v(f"\nForked to 
repo:\t\t\t{event.payload['forkee'].get('full_name')}") st += print_v(f"Forked to repo (URL):\t\t{event.payload['forkee'].get('html_url')}") if event.type == "MemberEvent": member_login = event.payload.get("member", {}).get("login") member_role = event.payload.get("membership", {}).get("role") if member_login: st += print_v(f"\nMember added:\t\t\t{member_login}") member_url = event.payload.get("member", {}).get("html_url") if member_url: st += print_v(f"Member added URL:\t\t{member_url}") if member_role: st += print_v(f"Permission level:\t\t{member_role}") if event.type == "PublicEvent": st += print_v("\nRepository is now public") if event.type == "DiscussionEvent": discussion_title = event.payload.get("discussion", {}).get("title") discussion_url = event.payload.get("discussion", {}).get("html_url") discussion_category = event.payload.get("discussion", {}).get("category", {}).get("name") if discussion_title: st += print_v(f"\nDiscussion title:\t\t{discussion_title}") if discussion_url: st += print_v(f"Discussion URL:\t\t\t{discussion_url}") if discussion_category: st += print_v(f"Discussion category:\t\t{discussion_category}") if event.type == "DiscussionCommentEvent": comment_author = event.payload.get("comment", {}).get("user", {}).get("login") comment_body = event.payload.get("comment", {}).get("body") if comment_author: st += print_v(f"\nDiscussion comment by:\t\t{comment_author}") if comment_body: if len(comment_body) > MAX_EVENT_BODY_LENGTH: comment_body = safe_truncate_text(comment_body) st += print_v(f"\nComment body:") st += print_v(format_body_block(comment_body)) return event_date, repo_name, repo_url, st # Lists recent events for the user (-l) and potentially dumps the entries to CSV file (if -b is used) def github_list_events(user, number, csv_file_name): events = [] available_events = 0 try: if csv_file_name: init_csv_file(csv_file_name) except Exception as e: print(f"* Error: {e}") list_operation = "* Listing & saving" if csv_file_name else "* Listing" print(f"{list_operation} {number} recent events for '{user}' ...\n") try: auth = Auth.Token(GITHUB_TOKEN) g = Github(base_url=GITHUB_API_URL, auth=auth) g_user = g.get_user(user) all_events = list(g_user.get_events()) total_available = len(all_events) events = all_events[:number] available_events = len(events) user_login = g_user.login user_name = g_user.name user_url = g_user.html_url user_name_str = user_login if user_name: user_name_str += f" ({user_name})" except Exception as e: print(f"* Cannot fetch user details: {e}") return print(f"Username:\t\t\t{user_name_str}") print(f"User URL:\t\t\t{user_url}/") print(f"GitHub API URL:\t\t\t{GITHUB_API_URL}") if csv_file_name: print(f"CSV export enabled:\t\t{bool(csv_file_name)}" + (f" ({csv_file_name})" if csv_file_name else "")) print(f"Local timezone:\t\t\t{LOCAL_TIMEZONE}") print(f"Available events:\t\t{total_available}") print(f"\n{'─' * HORIZONTAL_LINE1}\n{'─' * HORIZONTAL_LINE1}") if available_events == 0: print("There are no events yet") else: try: event_number_map = {id(event): event_index + 1 for event_index, event in enumerate(events)} for event in reversed(events): if event.type in EVENTS_TO_MONITOR or 'ALL' in EVENTS_TO_MONITOR: event_number = event_number_map[id(event)] print(f"Event number:\t\t\t#{event_number}") try: event_date, repo_name, repo_url, event_text = github_print_event(event, g) except Exception as e: print(f"\n* Warning, cannot fetch all event details, skipping: {e}") print_cur_ts("\nTimestamp:\t\t\t") continue try: if csv_file_name: 
write_csv_entry(csv_file_name, convert_to_local_naive(event_date), str(event.type), str(repo_name), "", "") except Exception as e: print(f"* Error: {e}") print_cur_ts("\nTimestamp:\t\t\t") except Exception as e: print(f"* Cannot fetch events: {e}") # Detects and reports changes in a user's profile-level entities (followers, followings, public repos, starred repos) def handle_profile_change(label, count_old, count_new, list_old, raw_list, user, csv_file_name, field): try: list_new = [getattr(item, field) for item in raw_list] if not list_new and count_new > 0: return list_old, count_old except Exception as e: print(f"* Error while trying to get the list of {label.lower()}: {e}") print_cur_ts("Timestamp:\t\t\t") return list_old, count_old new_count = len(list_new) old_count = len(list_old) if list_new == list_old: return list_old, count_old diff = new_count - old_count diff_str = f"+{diff}" if diff > 0 else f"{diff}" label_context = "by" if label.lower() in ["followings", "starred repos"] else "for" if diff == 0: print(f"* {label} list changed {label_context} user {user}\n") else: print(f"* {label} number changed {label_context} user {user} from {old_count} to {new_count} ({diff_str})\n") try: if csv_file_name: write_csv_entry(csv_file_name, now_local_naive(), f"{label} Count", user, old_count, new_count) except Exception as e: print(f"* Error: {e}") added_list_str = "" removed_list_str = "" added_mbody = "" removed_mbody = "" removed_items = list(set(list_old) - set(list_new)) added_items = list(set(list_new) - set(list_old)) removed_mbody_html = "" removed_list_str_html = "" added_mbody_html = "" added_list_str_html = "" if removed_items: print(f"Removed {label.lower()}:\n") removed_mbody = f"\nRemoved {label.lower()}:\n\n" removed_mbody_html = f"<br>Removed {html.escape(label.lower())}:<br><br>"
web_base = github_web_base() for item in removed_items: item_url = (f"{web_base}/{item}/" if label.lower() in ["followers", "followings", "starred repos"] else f"{web_base}/{user}/{item}/") print(f"- {item} [ {item_url} ]") removed_list_str += f"- {item} [ {item_url} ]\n" removed_list_str_html += f"- <a href=\"{item_url}\">{html.escape(item)}</a><br>"
try: if csv_file_name: write_csv_entry(csv_file_name, now_local_naive(), f"Removed {label[:-1]}", user, item, "") except Exception as e: print(f"* Error: {e}") print() if added_items: print(f"Added {label.lower()}:\n") added_mbody = f"\nAdded {label.lower()}:\n\n" added_mbody_html = f"<br>Added {html.escape(label.lower())}:<br><br>"
web_base = github_web_base() for item in added_items: item_url = (f"{web_base}/{item}/" if label.lower() in ["followers", "followings", "starred repos"] else f"{web_base}/{user}/{item}/") print(f"- {item} [ {item_url} ]") added_list_str += f"- {item} [ {item_url} ]\n" added_list_str_html += f"- <a href=\"{item_url}\">{html.escape(item)}</a><br>"
try: if csv_file_name: write_csv_entry(csv_file_name, now_local_naive(), f"Added {label[:-1]}", user, "", item) except Exception as e: print(f"* Error: {e}") print()
        " f"{removed_mbody_html if removed_items else ''}{removed_list_str_html if removed_items else ''}" f"{added_mbody_html if added_items else ''}{added_list_str_html if added_items else ''}
        " f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('
        Timestamp: ')}" f"" ) else: m_subject = f"GitHub user {user} {label.lower()} number has changed! ({diff_str}, {old_count} -> {new_count})" m_body = (f"{label} number changed {label_context} user {user} from {old_count} to {new_count} ({diff_str})\n" f"{removed_mbody}{removed_list_str}{added_mbody}{added_list_str}\n" f"Check interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}") m_body_html = ( f"" f"{label} number changed {label_context} user {html.escape(user)} from {old_count} to {new_count} ({html.escape(diff_str)})
        " f"{removed_mbody_html if removed_items else ''}{removed_list_str_html if removed_items else ''}" f"{added_mbody_html if added_items else ''}{added_list_str_html if added_items else ''}
        " f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('
        Timestamp: ')}" f"" ) if PROFILE_NOTIFICATION: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})") print_cur_ts("Timestamp:\t\t\t") return list_new, new_count # Detects and reports changes in repository-level entities (like stargazers, watchers, forks, issues, pull requests) def check_repo_list_changes(count_old, count_new, list_old, list_new, label, repo_name, repo_url, user, csv_file_name): if not list_new and count_new > 0: return # Handle None values by converting to empty lists if list_old is None: list_old = [] if list_new is None: list_new = [] old_count = len(list_old) new_count = len(list_new) if list_old == list_new: return diff = new_count - old_count diff_str = f"{'+' if diff > 0 else ''}{diff}" if diff == 0: print(f"* Repo '{repo_name}': {label.lower()} list changed\n* Repo URL: {repo_url}") else: print(f"* Repo '{repo_name}': number of {label.lower()} changed from {old_count} to {new_count} ({diff_str})\n* Repo URL: {repo_url}") try: if csv_file_name: write_csv_entry(csv_file_name, now_local_naive(), f"Repo {label} Count", repo_name, old_count, new_count) except Exception as e: print(f"* Error: {e}") added_list_str = "" removed_list_str = "" added_mbody = "" removed_mbody = "" removed_items = list(set(list_old) - set(list_new)) added_items = list(set(list_new) - set(list_old)) # If lists are different but sets are the same (just reordered or duplicates), no actual change if not removed_items and not added_items: return removal_text = "Closed" if label in ["Issues", "Pull Requests"] else "Removed" if list_old != list_new: print() removed_mbody_html = "" removed_list_str_html = "" added_mbody_html = "" added_list_str_html = "" if removed_items: print(f"{removal_text} {label.lower()}:\n") removed_mbody = f"\n{removal_text} {label.lower()}:\n\n" removed_mbody_html = f"
# Detects and reports changes in repository-level entities (like stargazers, watchers, forks, issues, pull requests) def check_repo_list_changes(count_old, count_new, list_old, list_new, label, repo_name, repo_url, user, csv_file_name): if not list_new and count_new > 0: return # Handle None values by converting to empty lists if list_old is None: list_old = [] if list_new is None: list_new = [] old_count = len(list_old) new_count = len(list_new) if list_old == list_new: return diff = new_count - old_count diff_str = f"{'+' if diff > 0 else ''}{diff}" if diff == 0: print(f"* Repo '{repo_name}': {label.lower()} list changed\n* Repo URL: {repo_url}") else: print(f"* Repo '{repo_name}': number of {label.lower()} changed from {old_count} to {new_count} ({diff_str})\n* Repo URL: {repo_url}") try: if csv_file_name: write_csv_entry(csv_file_name, now_local_naive(), f"Repo {label} Count", repo_name, old_count, new_count) except Exception as e: print(f"* Error: {e}") added_list_str = "" removed_list_str = "" added_mbody = "" removed_mbody = "" removed_items = list(set(list_old) - set(list_new)) added_items = list(set(list_new) - set(list_old)) # If lists are different but sets are the same (just reordered or duplicates), no actual change if not removed_items and not added_items: return removal_text = "Closed" if label in ["Issues", "Pull Requests"] else "Removed" if list_old != list_new: print() removed_mbody_html = "" removed_list_str_html = "" added_mbody_html = "" added_list_str_html = "" if removed_items: print(f"{removal_text} {label.lower()}:\n") removed_mbody = f"\n{removal_text} {label.lower()}:\n\n" removed_mbody_html = f"<br>{html.escape(removal_text)} {html.escape(label.lower())}:<br><br>"
for item in removed_items: item_line = f"- {item} [ {github_web_base()}/{item}/ ]" if label.lower() in ["stargazers", "watchers", "forks"] else f"- {item}" print(item_line) removed_list_str += item_line + "\n" if label.lower() in ["stargazers", "watchers", "forks"]: item_url = f"{github_web_base()}/{item}/" removed_list_str_html += f"- <a href=\"{item_url}\">{html.escape(item)}</a><br>"
elif label in ["Issues", "Pull Requests"]: match = re.match(r'#(\d+)\s+(.+?)\s+\(([^)]+)\)\s+\[\s*([^\]]+)\s*\]', item) if match: num, title, user_item, url = match.groups() removed_list_str_html += f"- <a href=\"{url}\">#{num} {html.escape(title)}</a> ({html.escape(user_item)})<br>"
else: removed_list_str_html += f"- {html.escape(item)}<br>"
else: removed_list_str_html += f"- {html.escape(item)}<br>"
try: if csv_file_name: value = item.rsplit("(", 1)[0].strip() if label in ["Issues", "Pull Requests"] else item write_csv_entry(csv_file_name, now_local_naive(), f"{removal_text} {label[:-1]}", repo_name, value, "") except Exception as e: print(f"* Error: {e}") print() if added_items: print(f"Added {label.lower()}:\n") added_mbody = f"\nAdded {label.lower()}:\n\n" added_mbody_html = f"<br>Added {html.escape(label.lower())}:<br><br>"
for item in added_items: item_line = f"- {item} [ {github_web_base()}/{item}/ ]" if label.lower() in ["stargazers", "watchers", "forks"] else f"- {item}" print(item_line) added_list_str += item_line + "\n" if label.lower() in ["stargazers", "watchers", "forks"]: item_url = f"{github_web_base()}/{item}/" added_list_str_html += f"- <a href=\"{item_url}\">{html.escape(item)}</a><br>"
elif label in ["Issues", "Pull Requests"]: match = re.match(r'#(\d+)\s+(.+?)\s+\(([^)]+)\)\s+\[\s*([^\]]+)\s*\]', item) if match: num, title, user_item, url = match.groups() added_list_str_html += f"- <a href=\"{url}\">#{num} {html.escape(title)}</a> ({html.escape(user_item)})<br>"
else: added_list_str_html += f"- {html.escape(item)}<br>"
else: added_list_str_html += f"- {html.escape(item)}<br>"
try: if csv_file_name: value = item.rsplit("(", 1)[0].strip() if label in ["Issues", "Pull Requests"] else item write_csv_entry(csv_file_name, now_local_naive(), f"Added {label[:-1]}", repo_name, "", value) except Exception as e: print(f"* Error: {e}") print()
        " f"* Repo URL: {html.escape(repo_url)}
        " f"{removed_mbody_html}{removed_list_str_html}" f"{added_mbody_html}{added_list_str_html}
        " f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('
        Timestamp: ')}" f"" ) else: m_subject = f"GitHub user {user} number of {label.lower()} for repo '{repo_name}' has changed! ({diff_str}, {old_count} -> {new_count})" m_body = (f"* Repo '{repo_name}': number of {label.lower()} changed from {old_count} to {new_count} ({diff_str})\n" f"* Repo URL: {repo_url}\n{removed_mbody}{removed_list_str}{added_mbody}{added_list_str}\n" f"Check interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}") m_body_html = ( f"" f"* Repo '{html.escape(repo_name)}': number of {html.escape(label.lower())} changed from {old_count} to {new_count} ({html.escape(diff_str)})
        " f"* Repo URL: {html.escape(repo_url)}
        " f"{removed_mbody_html}{removed_list_str_html}" f"{added_mbody_html}{added_list_str_html}
        " f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('
        Timestamp: ')}" f"" ) if REPO_NOTIFICATION: print(f"Sending email notification to {RECEIVER_EMAIL}") send_email(m_subject, m_body, m_body_html, SMTP_SSL) print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})") print_cur_ts("Timestamp:\t\t\t") # Finds an optional config file def find_config_file(cli_path=None): """ Search for an optional config file in: 1) CLI-provided path (must exist if given) 2) ./{DEFAULT_CONFIG_FILENAME} 3) ~/.{DEFAULT_CONFIG_FILENAME} 4) script-directory/{DEFAULT_CONFIG_FILENAME} """ if cli_path: p = Path(os.path.expanduser(cli_path)) return str(p) if p.is_file() else None candidates = [ Path.cwd() / DEFAULT_CONFIG_FILENAME, Path.home() / f".{DEFAULT_CONFIG_FILENAME}", Path(__file__).parent / DEFAULT_CONFIG_FILENAME, ] for p in candidates: if p.is_file(): return str(p) return None # Resolves an executable path by checking if it's a valid file or searching in $PATH def resolve_executable(path): if os.path.isfile(path) and os.access(path, os.X_OK): return path found = shutil.which(path) if found: return found raise FileNotFoundError(f"Could not find executable '{path}'") # Checks if the authenticated user (token's owner) is blocked by user def is_blocked_by(user): try: headers = { "Authorization": f"Bearer {GITHUB_TOKEN}", "Accept": "application/vnd.github+json", } response = req.get(f"{GITHUB_API_URL}/user", headers=headers, timeout=15) if response.status_code != 200: return False me_login = response.json().get("login", "").lower() if user.lower() == me_login: return False graphql_endpoint = GITHUB_API_URL.rstrip("/") + "/graphql" query = """ query($login: String!) { user(login: $login) { viewerCanFollow } } """ payload = {"query": query, "variables": {"login": user}} response_graphql = req.post(graphql_endpoint, json=payload, headers=headers, timeout=15) if response_graphql.status_code == 404: return False if not response_graphql.ok: return False data = response_graphql.json() can_follow = (data.get("data", {}).get("user", {}).get("viewerCanFollow", True)) return not bool(can_follow) except Exception: return False # Return the total number of repositories the user has starred (faster than via PyGithub) def get_starred_count(user): try: headers = { "Authorization": f"Bearer {GITHUB_TOKEN}", "Accept": "application/vnd.github+json", } graphql_endpoint = f"{GITHUB_API_URL.rstrip('/')}/graphql" query = """ query($login:String!){ user(login:$login){ starredRepositories{ totalCount } } } """ payload = {"query": query, "variables": {"login": user}} response = req.post(graphql_endpoint, json=payload, headers=headers, timeout=15) if not response.ok: return 0 data = response.json() return (data.get("data", {}).get("user", {}).get("starredRepositories", {}).get("totalCount", 0)) except Exception: return 0 # Returns True if the user's GitHub page shows "activity is private" def has_private_banner(user): try: url = f"{GITHUB_HTML_URL.rstrip('/')}/{user}" r = req.get(url, timeout=15) return r.ok and "activity is private" in r.text.lower() except Exception: return False # Returns True if the user's GitHub profile is public def is_profile_public(g: Github, user, new_account_days=30): if has_private_banner(user): return False try: u = g.get_user(user) if any([ u.followers > 0, u.following > 0, get_starred_count(user) > 0, ]): return True try: events_iter = iter(u.get_events()) next(events_iter) return True except (StopIteration, GithubException): pass except 
GithubException: pass return False # Returns a dict mapping 'YYYY-MM-DD' -> int contribution count for the range # Handles long date ranges by splitting into year-long chunks def get_daily_contributions(username: str, start: Optional[dt.date] = None, end: Optional[dt.date] = None, token: Optional[str] = None) -> dict: if token is None: raise ValueError("GitHub token is required") today = dt.date.today() if start is None: start = today if end is None: end = today # GitHub's contribution calendar API has limitations - typically only returns last year # For longer periods, we need to split into year-long chunks out = {} # Split into year-long chunks (max 1 year per request) current_start = start while current_start <= end: # Calculate end date for this chunk (1 year from start, or the requested end date, whichever is earlier) # Use relativedelta to handle leap years correctly (e.g., Feb 29, 2024 -> Feb 28, 2025) next_year_date = current_start + relativedelta.relativedelta(years=1) chunk_end = min( next_year_date - dt.timedelta(days=1), end ) url = GITHUB_API_URL.rstrip("/") + "/graphql" headers = { "Authorization": f"Bearer {token}", "Time-Zone": LOCAL_TIMEZONE, } tz = pytz.timezone(LOCAL_TIMEZONE) start_w = current_start - dt.timedelta(days=1) end_w_exclusive = chunk_end + dt.timedelta(days=2) start_iso = tz.localize(dt.datetime.combine(start_w, dt.time.min)).isoformat() end_iso = tz.localize(dt.datetime.combine(end_w_exclusive, dt.time.min)).isoformat() query = """ query($login: String!, $from: DateTime!, $to: DateTime!) { user(login: $login) { contributionsCollection(from: $from, to: $to) { contributionCalendar { weeks { contributionDays { date contributionCount } } } } } }""" variables = {"login": username, "from": start_iso, "to": end_iso} r = requests.post(url, json={"query": query, "variables": variables}, headers=headers, timeout=30) r.raise_for_status() data = r.json() # Check for errors in the response if "errors" in data: raise RuntimeError(f"GraphQL API errors: {data['errors']}") # Check if user exists if data.get("data", {}).get("user") is None: raise ValueError(f"User '{username}' not found") # Safely access nested data contrib_collection = data.get("data", {}).get("user", {}).get("contributionsCollection") if contrib_collection is None: raise RuntimeError(f"No contributions data returned for user '{username}'") contrib_calendar = contrib_collection.get("contributionCalendar") if contrib_calendar is None: # For very old dates, GitHub may not have data - skip this chunk print(f"Warning: No contribution calendar data available for {current_start} to {chunk_end}, skipping...") current_start = chunk_end + dt.timedelta(days=1) continue weeks = contrib_calendar.get("weeks") if weeks is None: print(f"Warning: No weeks data available for {current_start} to {chunk_end}, skipping...") current_start = chunk_end + dt.timedelta(days=1) continue # Process the weeks data for w in weeks: contribution_days = w.get("contributionDays", []) for d in contribution_days: date_str = d.get("date") if not date_str: continue try: date_obj = dt.date.fromisoformat(date_str) if start <= date_obj <= end: out[date_str] = d.get("contributionCount", 0) except ValueError: continue # Move to next chunk current_start = chunk_end + dt.timedelta(days=1) return out # Return contribution count for a single day def get_daily_contributions_count(username: str, day: dt.date, token: str) -> int: data = get_daily_contributions(username, day, day, token) return next(iter(data.values()), 0) # Checks count for today and 
decides whether to notify based on stored state. def check_daily_contribs(username: str, token: str, state: dict, min_delta: int = 1, fail_threshold: int = 3) -> tuple[bool, int, bool]: day = today_local() try: curr = get_daily_contributions_count(username, day, token=token) state["consecutive_failures"] = 0 state["last_error"] = None except Exception as e: state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1 state["last_error"] = f"{type(e).__name__}: {e}" error_notify = state["consecutive_failures"] >= fail_threshold return False, state.get("count", 0), error_notify prev_day = state.get("day") prev_cnt = state.get("count") # New day -> reset baseline silently if prev_day != day: state["day"] = day state["count"] = curr state["prev_count"] = curr return False, curr, False # no notify on rollover # Same day -> notify if change >= threshold if prev_cnt is not None and abs(curr - prev_cnt) >= min_delta: state["prev_count"] = prev_cnt state["count"] = curr return True, curr, False # No change state["count"] = curr return False, curr, False # Monitors activity of the specified GitHub user def github_monitor_user(user, csv_file_name): try: if csv_file_name: init_csv_file(csv_file_name) except Exception as e: print(f"* Error: {e}") followers_count = 0 followings_count = 0 repos_count = 0 starred_count = 0 available_events = 0 events = [] repos_list = [] event_date: datetime | None = None blocked = None public = False contrib_state = {} contrib_curr = 0 print("Sneaking into GitHub like a ninja ...") try: auth = Auth.Token(GITHUB_TOKEN) g = Github(base_url=GITHUB_API_URL, auth=auth) g_user_myself = g.get_user() user_myself_login = g_user_myself.login user_myself_name = g_user_myself.name user_myself_url = g_user_myself.html_url g_user = g.get_user(user) user_login = g_user.login user_name = g_user.name user_url = g_user.html_url location = g_user.location bio = g_user.bio company = g_user.company email = g_user.email blog = g_user.blog account_created_date = g_user.created_at account_updated_date = g_user.updated_at followers_count = g_user.followers followings_count = g_user.following followers_list = g_user.get_followers() followings_list = g_user.get_following() if GET_ALL_REPOS: repos_list = g_user.get_repos() repos_count = g_user.public_repos else: repos_list = [repo for repo in g_user.get_repos(type='owner') if not repo.fork and repo.owner.login == user_login] repos_count = len(repos_list) starred_list = g_user.get_starred() starred_count = starred_list.totalCount public = is_profile_public(g, user) blocked = is_blocked_by(user) if public else None if TRACK_CONTRIB_CHANGES: contrib_curr = get_daily_contributions_count(user, today_local(), token=GITHUB_TOKEN) contrib_state = { "day": today_local(), "count": contrib_curr, "prev_count": contrib_curr } if not DO_NOT_MONITOR_GITHUB_EVENTS: events = list(islice(g_user.get_events(), EVENTS_NUMBER)) available_events = len(events) except Exception as e: print(f"\n* Error: {e}") sys.exit(1) last_event_id = 0 last_event_ts: datetime | None = None events_list_of_ids = set() if not DO_NOT_MONITOR_GITHUB_EVENTS: if available_events: try: for event in reversed(events): events_list_of_ids.add(event.id) newest = events[0] last_event_id = newest.id if last_event_id: last_event_ts = newest.created_at except Exception as e: print(f"\n* Cannot get event IDs / timestamps: {e}\n") pass followers_old_count = followers_count followings_old_count = followings_count repos_old_count = repos_count starred_old_count = starred_count user_name_old = 
# Monitors activity of the specified GitHub user
def github_monitor_user(user, csv_file_name):

    try:
        if csv_file_name:
            init_csv_file(csv_file_name)
    except Exception as e:
        print(f"* Error: {e}")

    followers_count = 0
    followings_count = 0
    repos_count = 0
    starred_count = 0
    available_events = 0
    events = []
    repos_list = []
    event_date: datetime | None = None
    blocked = None
    public = False
    contrib_state = {}
    contrib_curr = 0

    print("Sneaking into GitHub like a ninja ...")
    try:
        auth = Auth.Token(GITHUB_TOKEN)
        g = Github(base_url=GITHUB_API_URL, auth=auth)

        g_user_myself = g.get_user()
        user_myself_login = g_user_myself.login
        user_myself_name = g_user_myself.name
        user_myself_url = g_user_myself.html_url

        g_user = g.get_user(user)
        user_login = g_user.login
        user_name = g_user.name
        user_url = g_user.html_url
        location = g_user.location
        bio = g_user.bio
        company = g_user.company
        email = g_user.email
        blog = g_user.blog
        account_created_date = g_user.created_at
        account_updated_date = g_user.updated_at

        followers_count = g_user.followers
        followings_count = g_user.following
        followers_list = g_user.get_followers()
        followings_list = g_user.get_following()

        if GET_ALL_REPOS:
            repos_list = g_user.get_repos()
            repos_count = g_user.public_repos
        else:
            repos_list = [repo for repo in g_user.get_repos(type='owner') if not repo.fork and repo.owner.login == user_login]
            repos_count = len(repos_list)

        starred_list = g_user.get_starred()
        starred_count = starred_list.totalCount

        public = is_profile_public(g, user)
        blocked = is_blocked_by(user) if public else None

        if TRACK_CONTRIB_CHANGES:
            contrib_curr = get_daily_contributions_count(user, today_local(), token=GITHUB_TOKEN)
            contrib_state = {"day": today_local(), "count": contrib_curr, "prev_count": contrib_curr}

        if not DO_NOT_MONITOR_GITHUB_EVENTS:
            events = list(islice(g_user.get_events(), EVENTS_NUMBER))
            available_events = len(events)
    except Exception as e:
        print(f"\n* Error: {e}")
        sys.exit(1)

    last_event_id = 0
    last_event_ts: datetime | None = None
    events_list_of_ids = set()

    if not DO_NOT_MONITOR_GITHUB_EVENTS and available_events:
        try:
            for event in reversed(events):
                events_list_of_ids.add(event.id)
            newest = events[0]
            last_event_id = newest.id
            if last_event_id:
                last_event_ts = newest.created_at
        except Exception as e:
            print(f"\n* Cannot get event IDs / timestamps: {e}\n")

    followers_old_count = followers_count
    followings_old_count = followings_count
    repos_old_count = repos_count
    starred_old_count = starred_count
    user_name_old = user_name
    location_old = location
    bio_old = bio
    company_old = company
    email_old = email
    blog_old = blog
    blocked_old = blocked
    public_old = public
    last_event_id_old = last_event_id
    last_event_ts_old = last_event_ts
    events_list_of_ids_old = events_list_of_ids.copy()

    user_myself_name_str = user_myself_login
    if user_myself_name:
        user_myself_name_str += f" ({user_myself_name})"
    # Parenthesize the conditional: without it, an empty URL made the whole print evaluate to ""
    print(f"\nToken belongs to:\t\t{user_myself_name_str}" + (f"\n\t\t\t\t[ {user_myself_url} ]" if user_myself_url else ""))

    user_name_str = user_login
    if user_name:
        user_name_str += f" ({user_name})"
    print(f"\nUsername:\t\t\t{user_name_str}")
    print(f"User URL:\t\t\t{user_url}/")
    if location:
        print(f"Location:\t\t\t{location}")
    if company:
        print(f"Company:\t\t\t{company}")
    if email:
        print(f"Email:\t\t\t\t{email}")
    if blog:
        print(f"Blog URL:\t\t\t{blog}")

    print(f"\nPublic profile:\t\t\t{'Yes' if public else 'No'}")
    print(f"Blocked by the user:\t\t{'Unknown' if blocked is None else ('Yes' if blocked else 'No')}")

    print(f"\nAccount creation date:\t\t{get_date_from_ts(account_created_date)} ({calculate_timespan(int(time.time()), account_created_date, show_seconds=False)} ago)")
    print(f"Account updated date:\t\t{get_date_from_ts(account_updated_date)} ({calculate_timespan(int(time.time()), account_updated_date, show_seconds=False)} ago)")
    account_updated_date_old = account_updated_date

    print(f"\nFollowers:\t\t\t{followers_count}")
    print(f"Followings:\t\t\t{followings_count}")
    print(f"Repositories:\t\t\t{repos_count}")
    print(f"Starred repos:\t\t\t{starred_count}")
    if TRACK_CONTRIB_CHANGES:
        print(f"Today's contributions:\t\t{contrib_curr}")
    if not DO_NOT_MONITOR_GITHUB_EVENTS:
        print(f"Available events:\t\t{available_events}{'+' if available_events == EVENTS_NUMBER else ''}")

    if bio:
        print(f"\nBio:\n\n'{bio}'")

    print_cur_ts("\nTimestamp:\t\t\t")

    list_of_repos = []
    if repos_list and TRACK_REPOS_CHANGES:
        # Filter repos for detailed monitoring only (keep full repos_list for profile change detection)
        repos_list_filtered = repos_list
        if 'ALL' not in REPOS_TO_MONITOR:
            repos_list_filtered = []
            for repo in repos_list:
                # Check if the repo matches any entry in REPOS_TO_MONITOR
                should_monitor = False
                for monitor_entry in REPOS_TO_MONITOR:
                    if '/' in monitor_entry:
                        # Format: 'user/repo_name' - both the user and the repo name must match
                        monitor_user, monitor_repo = monitor_entry.split('/', 1)
                        if monitor_user == user_login and monitor_repo == repo.name:
                            should_monitor = True
                            break
                    else:
                        # Format: bare 'repo_name' (from CLI) - the repo name must match for the current user
                        if monitor_entry == repo.name and repo.owner.login == user_login:
                            should_monitor = True
                            break
                if should_monitor:
                    repos_list_filtered.append(repo)
        try:
            list_of_repos = github_process_repos(repos_list_filtered)
        except Exception as e:
            print(f"* Cannot process list of public repositories: {e}")
        print_cur_ts("\nTimestamp:\t\t\t")

    list_of_repos_old = list_of_repos

    if not DO_NOT_MONITOR_GITHUB_EVENTS:
        print("Latest event:\n")
        if available_events == 0:
            print("There are no events yet")
        else:
            try:
                github_print_event(events[0], g, True)
            except Exception as e:
                print(f"\n* Warning: cannot fetch last event details: {e}")
        print_cur_ts("\nTimestamp:\t\t\t")

    followers_old = []
    followings_old = []
    repos_old = []
    starred_old = []
    try:
        followers_old = [follower.login for follower in followers_list]
        followings_old = [following.login for following in followings_list]
        repos_old = [repo.name for repo in repos_list]
        starred_old = [star.full_name for star in starred_list]
    except Exception as e:
        print(f"* Error: {e}")
        sys.exit(1)
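
    # The main loop below wraps PyGithub accessors in gh_call() (defined earlier
    # in this file) and treats a None result as "fetch failed - keep the previous
    # state", so a transient API hiccup never wipes the stored baselines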
    time.sleep(GITHUB_CHECK_INTERVAL)

    alive_counter = 0
    email_sent = False

    # Primary loop
    while True:
        try:
            g_user = g.get_user(user)
            email_sent = False
        except Exception as e:  # Exception already covers GithubException
            print(f"* Error, retrying in {display_time(GITHUB_CHECK_INTERVAL)}: {e}")

            should_notify = False
            reason_msg = None
            if isinstance(e, BadCredentialsException):
                reason_msg = "GitHub token might not be valid anymore (bad credentials error)!"
            else:
                matched = next((msg for msg in ["Forbidden", "Bad Request"] if msg in str(e)), None)
                if matched:
                    reason_msg = f"Session might not be valid ('{matched}' error)"
            if reason_msg:
                print(f"* {reason_msg}")
                should_notify = True

            if should_notify and ERROR_NOTIFICATION and not email_sent:
                m_subject = f"github_monitor: session error! (user: {user})"
                m_body = f"{reason_msg}\n{e}{get_cur_ts(nl_ch + nl_ch + 'Timestamp: ')}"
                m_body_html = (
                    f"<html><head></head><body>"
                    f"{html.escape(reason_msg)}<br>"
                    f"{html.escape(str(e))}{get_cur_ts('<br><br>Timestamp: ')}"
                    f"</body></html>"
                )
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)
                email_sent = True

            print_cur_ts("Timestamp:\t\t\t")
            time.sleep(GITHUB_CHECK_INTERVAL)
            continue

        # Changed followings
        try:
            followings_raw = list(gh_call(g_user.get_following)())
            followings_count = gh_call(lambda: g_user.following)()
        except NET_ERRORS as e:
            print(f"* Error while fetching followings: {e}")
            print_cur_ts("Timestamp:\t\t\t")
            followings_raw = None
            followings_count = None

        if followings_raw is not None and followings_count is not None:
            followings_old, followings_old_count = handle_profile_change("Followings", followings_old_count, followings_count, followings_old, followings_raw, user, csv_file_name, field="login")

        # Changed followers
        try:
            followers_raw = list(gh_call(g_user.get_followers)())
            followers_count = gh_call(lambda: g_user.followers)()
        except NET_ERRORS as e:
            print(f"* Error while fetching followers: {e}")
            print_cur_ts("Timestamp:\t\t\t")
            followers_raw = None
            followers_count = None

        if followers_raw is not None and followers_count is not None:
            followers_old, followers_old_count = handle_profile_change("Followers", followers_old_count, followers_count, followers_old, followers_raw, user, csv_file_name, field="login")

        # Changed public repositories
        try:
            if GET_ALL_REPOS:
                repos_raw = list(gh_call(g_user.get_repos)())
                repos_count = gh_call(lambda: g_user.public_repos)()
            else:
                repos_raw = list(gh_call(lambda: [repo for repo in g_user.get_repos(type='owner') if not repo.fork and repo.owner.login == user_login])())
                repos_count = len(repos_raw)
        except NET_ERRORS as e:
            print(f"* Error while fetching repositories: {e}")
            print_cur_ts("Timestamp:\t\t\t")
            repos_raw = None
            repos_count = None

        if repos_raw is not None and repos_count is not None:
            repos_old, repos_old_count = handle_profile_change("Repos", repos_old_count, repos_count, repos_old, repos_raw, user, csv_file_name, field="name")

        # Changed starred repositories
        try:
            starred_raw = gh_call(g_user.get_starred)()
            if starred_raw is not None:
                starred_list = list(starred_raw)
                starred_count = starred_raw.totalCount
            else:
                starred_list = None
                starred_count = None
        except NET_ERRORS as e:
            print(f"* Error while fetching starred repositories: {e}")
            print_cur_ts("Timestamp:\t\t\t")
            starred_list = None
            starred_count = None

        if starred_list is not None and starred_count is not None:
            starred_old, starred_old_count = handle_profile_change("Starred Repos", starred_old_count, starred_count, starred_old, starred_list, user, csv_file_name, field="full_name")

        # Changed contributions in a day
        if TRACK_CONTRIB_CHANGES:
            contrib_notify, contrib_curr, contrib_error_notify = check_daily_contribs(user, GITHUB_TOKEN, contrib_state, min_delta=1, fail_threshold=3)

            if contrib_error_notify and ERROR_NOTIFICATION:
                failures = contrib_state.get("consecutive_failures", 0)
                last_err = contrib_state.get("last_error", "Unknown error")
                err_msg = f"Error: GitHub daily contributions check failed {failures} times. Last error: {last_err}\n"
                print(err_msg)
                err_msg_html = (
                    f"<html><head></head><body>"
                    f"Error: GitHub daily contributions check failed {failures} times. Last error: {html.escape(str(last_err))}<br>"
                    f"{get_cur_ts('<br>Timestamp: ')}"
                    f"</body></html>"
                )
                send_email(f"GitHub monitor errors for {user}", err_msg + get_cur_ts(nl_ch + "Timestamp: "), err_msg_html, SMTP_SSL)
            if contrib_notify:
                contrib_old = contrib_state.get("prev_count")
                print(f"* Daily contributions changed for user {user} on {get_short_date_from_ts(contrib_state['day'], show_hour=False)} from {contrib_old} to {contrib_curr}!\n")

                try:
                    if csv_file_name:
                        write_csv_entry(csv_file_name, now_local_naive(), "Daily Contribs", user, contrib_old, contrib_curr)
                except Exception as e:
                    print(f"* Error: {e}")

                m_subject = f"GitHub user {user} daily contributions changed from {contrib_old} to {contrib_curr}!"
                m_body = f"GitHub user {user} daily contributions changed on {get_short_date_from_ts(contrib_state['day'], show_hour=False)} from {contrib_old} to {contrib_curr}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
                m_body_html = (
                    f"<html><head></head><body>"
                    f"GitHub user {html.escape(user)} daily contributions changed on {html.escape(get_short_date_from_ts(contrib_state['day'], show_hour=False))} from {contrib_old} to {contrib_curr}<br><br>"
                    f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                    f"</body></html>"
                )

                if CONTRIB_NOTIFICATION:
                    print(f"Sending email notification to {RECEIVER_EMAIL}")
                    send_email(m_subject, m_body, m_body_html, SMTP_SSL)

                print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
                print_cur_ts("Timestamp:\t\t\t")
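
        # The profile-field checks below (bio, location, name, company, email,
        # blog) all follow the same pattern: fetch the current value via
        # gh_call(), compare against the stored *_old baseline, print / CSV-log
        # / optionally email the diff, then advance the baseline. A hypothetical
        # refactor could fold them into a single helper, e.g.:
        #
        #   for label, new, old in (("Bio", bio, bio_old), ("Location", location, location_old)):
        #       if new is not None and new != old:
        #           notify_field_change(label, old, new)   # hypothetical helper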
        # Changed bio
        bio = gh_call(lambda: g_user.bio)()
        if bio is not None and bio != bio_old:
            print(f"* Bio has changed for user {user} !\n")
            print(f"Old bio:\n\n{bio_old}\n")
            print(f"New bio:\n\n{bio}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Bio", user, bio_old, bio)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} bio has changed!"
            m_body = f"GitHub user {user} bio has changed\n\nOld bio:\n\n{bio_old}\n\nNew bio:\n\n{bio}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
            bio_old_html = markdown_to_html(bio_old, convert_line_breaks=True) if bio_old else ""
            bio_html = markdown_to_html(bio, convert_line_breaks=True) if bio else ""
            m_body_html = (
                f"<html><head></head><body>"
                f"GitHub user {html.escape(user)} bio has changed<br><br>"
                f"Old bio:<br><br>{bio_old_html}<br><br>"
                f"New bio:<br><br>{bio_html}<br><br>"
                f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                f"</body></html>"
            )

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)

            bio_old = bio
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")
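
        # Note: markdown_to_html() (defined earlier in this file) renders the
        # user-supplied Markdown for the HTML e-mail body; e.g. a bio such as
        # "**OSINT** fan" would come back roughly as "<strong>OSINT</strong> fan"
        # (illustrative output, the exact rendering depends on that helper)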
        # Changed location
        location = gh_call(lambda: g_user.location)()
        if location is not None and location != location_old:
            print(f"* Location has changed for user {user} !\n")
            print(f"Old location:\t\t\t{location_old}\n")
            print(f"New location:\t\t\t{location}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Location", user, location_old, location)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} location has changed!"
            m_body = f"GitHub user {user} location has changed\n\nOld location: {location_old}\n\nNew location: {location}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
            m_body_html = (
                f"<html><head></head><body>"
                f"GitHub user {html.escape(user)} location has changed<br><br>"
                f"Old location: {html.escape(location_old)}<br><br>"
                f"New location: {html.escape(location)}<br><br>"
                f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                f"</body></html>"
            )

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)

            location_old = location
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Changed user name
        user_name = gh_call(lambda: g_user.name)()
        if user_name is not None and user_name != user_name_old:
            print(f"* User name has changed for user {user} !\n")
            print(f"Old user name:\t\t\t{user_name_old}\n")
            print(f"New user name:\t\t\t{user_name}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "User Name", user, user_name_old, user_name)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} name has changed!"
            m_body = f"GitHub user {user} name has changed\n\nOld user name: {user_name_old}\n\nNew user name: {user_name}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
            m_body_html = (
                f"<html><head></head><body>"
                f"GitHub user {html.escape(user)} name has changed<br><br>"
                f"Old user name: {html.escape(user_name_old)}<br><br>"
                f"New user name: {html.escape(user_name)}<br><br>"
                f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                f"</body></html>"
            )

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)

            user_name_old = user_name
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Changed company
        company = gh_call(lambda: g_user.company)()
        if company is not None and company != company_old:
            print(f"* User company has changed for user {user} !\n")
            print(f"Old company:\t\t\t{company_old}\n")
            print(f"New company:\t\t\t{company}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Company", user, company_old, company)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} company has changed!"
            m_body = f"GitHub user {user} company has changed\n\nOld company: {company_old}\n\nNew company: {company}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
            m_body_html = (
                f"<html><head></head><body>"
                f"GitHub user {html.escape(user)} company has changed<br><br>"
                f"Old company: {html.escape(company_old)}<br><br>"
                f"New company: {html.escape(company)}<br><br>"
                f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                f"</body></html>"
            )

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)

            company_old = company
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Changed email
        email = gh_call(lambda: g_user.email)()
        if email is not None and email != email_old:
            print(f"* User email has changed for user {user} !\n")
            print(f"Old email:\t\t\t{email_old}\n")
            print(f"New email:\t\t\t{email}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Email", user, email_old, email)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} email has changed!"
            m_body = f"GitHub user {user} email has changed\n\nOld email: {email_old}\n\nNew email: {email}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
            m_body_html = (
                f"<html><head></head><body>"
                f"GitHub user {html.escape(user)} email has changed<br><br>"
                f"Old email: {html.escape(email_old)}<br><br>"
                f"New email: {html.escape(email)}<br><br>"
                f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                f"</body></html>"
            )

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, m_body_html, SMTP_SSL)

            email_old = email
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Changed blog URL
        blog = gh_call(lambda: g_user.blog)()
        if blog is not None and blog != blog_old:
            print(f"* User blog URL has changed for user {user} !\n")
            print(f"Old blog URL:\t\t\t{blog_old}\n")
            print(f"New blog URL:\t\t\t{blog}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Blog URL", user, blog_old, blog)
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} blog URL has changed!"
            m_body = f"GitHub user {user} blog URL has changed\n\nOld blog URL: {blog_old}\n\nNew blog URL: {blog}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, "", SMTP_SSL)

            blog_old = blog
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Changed account update date
        account_updated_date = gh_call(lambda: g_user.updated_at)()
        if account_updated_date is not None and account_updated_date != account_updated_date_old:
            print(f"* User account has been updated for user {user} ! (after {calculate_timespan(account_updated_date, account_updated_date_old, show_seconds=False, granularity=2)})\n")
            print(f"Old account update date:\t{get_date_from_ts(account_updated_date_old)}\n")
            print(f"New account update date:\t{get_date_from_ts(account_updated_date)}\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, convert_to_local_naive(account_updated_date), "Account Update Date", user, convert_to_local_naive(account_updated_date_old), convert_to_local_naive(account_updated_date))
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} account has been updated! (after {calculate_timespan(account_updated_date, account_updated_date_old, show_seconds=False, granularity=2)})"
            m_body = f"GitHub user {user} account has been updated (after {calculate_timespan(account_updated_date, account_updated_date_old, show_seconds=False, granularity=2)})\n\nOld account update date: {get_date_from_ts(account_updated_date_old)}\n\nNew account update date: {get_date_from_ts(account_updated_date)}\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, "", SMTP_SSL)

            account_updated_date_old = account_updated_date
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Profile visibility changed
        public = is_profile_public(g, user)
        if public != public_old:
            def _get_profile_status(public):
                return "public" if public else "private"
            print(f"* User {user} has changed profile visibility to '{_get_profile_status(public)}' !\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Profile Visibility", user, _get_profile_status(public_old), _get_profile_status(public))
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} has changed profile visibility to '{_get_profile_status(public)}' !"
            m_body = f"GitHub user {user} has changed profile visibility to '{_get_profile_status(public)}' !\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, "", SMTP_SSL)

            public_old = public
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        # Blocked status changed
        blocked = is_blocked_by(user) if public else None
        if blocked is not None and blocked_old is None:
            blocked_old = blocked
        elif None not in (blocked_old, blocked) and blocked != blocked_old:
            def _get_blocked_status(blocked, public):
                return 'Unknown' if blocked is None else ('Yes' if blocked else 'No')
            print(f"* User {user} has {'blocked' if blocked else 'unblocked'} you!\n")

            try:
                if csv_file_name:
                    write_csv_entry(csv_file_name, now_local_naive(), "Block Status", user, _get_blocked_status(blocked_old, public), _get_blocked_status(blocked, public))
            except Exception as e:
                print(f"* Error: {e}")

            m_subject = f"GitHub user {user} has {'blocked' if blocked else 'unblocked'} you!"
            m_body = f"GitHub user {user} has {'blocked' if blocked else 'unblocked'} you!\n\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"

            if PROFILE_NOTIFICATION:
                print(f"Sending email notification to {RECEIVER_EMAIL}")
                send_email(m_subject, m_body, "", SMTP_SSL)

            blocked_old = blocked
            print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
            print_cur_ts("Timestamp:\t\t\t")

        list_of_repos = []

        # Changed repos details
        if TRACK_REPOS_CHANGES:
            if GET_ALL_REPOS:
                repos_list = gh_call(g_user.get_repos)()
            else:
                repos_list = gh_call(lambda: [repo for repo in g_user.get_repos(type='owner') if not repo.fork and repo.owner.login == user_login])()

            # Filter repos for detailed monitoring only (keep full repos_list for profile change detection)
            repos_list_filtered = repos_list
            if repos_list is not None and 'ALL' not in REPOS_TO_MONITOR:
                repos_list_filtered = []
                for repo in repos_list:
                    # Check if the repo matches any entry in REPOS_TO_MONITOR
                    should_monitor = False
                    for monitor_entry in REPOS_TO_MONITOR:
                        if '/' in monitor_entry:
                            # Format: 'user/repo_name' - both the user and the repo name must match
                            monitor_user, monitor_repo = monitor_entry.split('/', 1)
                            if monitor_user == user_login and monitor_repo == repo.name:
                                should_monitor = True
                                break
                        else:
                            # Format: bare 'repo_name' (from CLI) - the repo name must match for the current user
                            if monitor_entry == repo.name and repo.owner.login == user_login:
                                should_monitor = True
                                break
                    if should_monitor:
                        repos_list_filtered.append(repo)

            if repos_list_filtered is not None:
                try:
                    list_of_repos = github_process_repos(repos_list_filtered, show_progress=False)
                    list_of_repos_ok = True
                except Exception as e:
                    list_of_repos = list_of_repos_old
                    print(f"* Cannot process list of public repositories, keeping old list: {e}")
                    list_of_repos_ok = False

                if list_of_repos_ok:
                    for repo in list_of_repos:
                        r_name = repo.get("name")
                        r_descr = repo.get("descr", "")
                        r_forks = repo.get("forks", 0)
                        r_stars = repo.get("stars", 0)
                        r_subscribers = repo.get("subscribers", 0)
                        r_url = repo.get("url", "")
                        r_update = repo.get("update_date")
                        r_stargazers_list = repo.get("stargazers_list")
                        r_subscribers_list = repo.get("subscribers_list")
                        r_forked_repos = repo.get("forked_repos")
                        r_issues = repo.get("issues")
                        r_pulls = repo.get("pulls")
                        r_issues_list = repo.get("issues_list")
                        r_pulls_list = repo.get("pulls_list")

                        for repo_old in list_of_repos_old:
                            r_name_old = repo_old.get("name")
                            if r_name_old != r_name:
                                continue
                            r_descr_old = repo_old.get("descr", "")
                            r_forks_old = repo_old.get("forks", 0)
                            r_stars_old = repo_old.get("stars", 0)
                            r_subscribers_old = repo_old.get("subscribers", 0)
                            r_url_old = repo_old.get("url", "")
                            r_update_old = repo_old.get("update_date")
                            r_stargazers_list_old = repo_old.get("stargazers_list")
                            r_subscribers_list_old = repo_old.get("subscribers_list")
                            r_forked_repos_old = repo_old.get("forked_repos")
                            r_issues_old = repo_old.get("issues")
                            r_pulls_old = repo_old.get("pulls")
                            r_issues_list_old = repo_old.get("issues_list")
                            r_pulls_list_old = repo_old.get("pulls_list")

                            # Update date for repo changed
                            if r_update != r_update_old:
                                r_message = f"* Repo '{r_name}' update date changed (after {calculate_timespan(r_update, r_update_old, show_seconds=False, granularity=2)})\n* Repo URL: {r_url}\n\nOld repo update date:\t{get_date_from_ts(r_update_old)}\n\nNew repo update date:\t{get_date_from_ts(r_update)}\n"
                                print(r_message)

                                try:
                                    if csv_file_name:
                                        write_csv_entry(csv_file_name, now_local_naive(), "Repo Update Date", r_name, convert_to_local_naive(r_update_old), convert_to_local_naive(r_update))
                                except Exception as e:
                                    print(f"* Error: {e}")

                                m_subject = f"GitHub user {user} repo '{r_name}' update date has changed ! (after {calculate_timespan(r_update, r_update_old, show_seconds=False, granularity=2)})"
                                m_body = f"{r_message}\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
                                timespan_str = calculate_timespan(r_update, r_update_old, show_seconds=False, granularity=2)
                                m_body_html = (
                                    f"<html><head></head><body>"
                                    f"* Repo '{html.escape(r_name)}' update date changed (after {html.escape(timespan_str)})<br>"
                                    f"* Repo URL: {html.escape(r_url)}<br><br>"
                                    f"Old repo update date: {html.escape(get_date_from_ts(r_update_old))}<br><br>"
                                    f"New repo update date: {html.escape(get_date_from_ts(r_update))}<br><br>"
                                    f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                                    f"</body></html>"
                                )

                                if REPO_UPDATE_DATE_NOTIFICATION:
                                    print(f"Sending email notification to {RECEIVER_EMAIL}")
                                    send_email(m_subject, m_body, m_body_html, SMTP_SSL)

                                print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
                                print_cur_ts("Timestamp:\t\t\t")
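
                            # check_repo_list_changes() (defined earlier in this file) compares the
                            # previous and current count plus the corresponding entity lists for one
                            # repo metric and emits the configured console/CSV/email notifications;
                            # e.g. (illustrative): stars going 5 -> 6 with a new login appearing in
                            # the stargazers list yields a "Stargazers" change report for that repo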
                            # Number of stars for repo changed
                            check_repo_list_changes(r_stars_old, r_stars, r_stargazers_list_old, r_stargazers_list, "Stargazers", r_name, r_url, user, csv_file_name)

                            # Number of watchers/subscribers for repo changed
                            check_repo_list_changes(r_subscribers_old, r_subscribers, r_subscribers_list_old, r_subscribers_list, "Watchers", r_name, r_url, user, csv_file_name)

                            # Number of forks for repo changed
                            check_repo_list_changes(r_forks_old, r_forks, r_forked_repos_old, r_forked_repos, "Forks", r_name, r_url, user, csv_file_name)

                            # Number of issues for repo changed
                            check_repo_list_changes(r_issues_old, r_issues, r_issues_list_old, r_issues_list, "Issues", r_name, r_url, user, csv_file_name)

                            # Number of PRs for repo changed
                            check_repo_list_changes(r_pulls_old, r_pulls, r_pulls_list_old, r_pulls_list, "Pull Requests", r_name, r_url, user, csv_file_name)

                            # Repo description changed
                            if r_descr != r_descr_old:
                                r_message = f"* Repo '{r_name}' description changed from:\n\n'{r_descr_old}'\n\nto:\n\n'{r_descr}'\n\n* Repo URL: {r_url}\n"
                                print(r_message)

                                try:
                                    if csv_file_name:
                                        write_csv_entry(csv_file_name, now_local_naive(), "Repo Description", r_name, r_descr_old, r_descr)
                                except Exception as e:
                                    print(f"* Error: {e}")

                                m_subject = f"GitHub user {user} repo '{r_name}' description has changed !"
                                m_body = f"{r_message}\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"
                                r_descr_old_html = markdown_to_html(r_descr_old, convert_line_breaks=True) if r_descr_old else ""
                                r_descr_html = markdown_to_html(r_descr, convert_line_breaks=True) if r_descr else ""
                                m_body_html = (
                                    f"<html><head></head><body>"
                                    f"* Repo '{html.escape(r_name)}' description changed from:<br><br>"
                                    f"'{r_descr_old_html}'<br><br>"
                                    f"to:<br><br>"
                                    f"'{r_descr_html}'<br><br>"
                                    f"* Repo URL: {html.escape(r_url)}<br><br>"
                                    f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                                    f"</body></html>"
                                )

                                if REPO_NOTIFICATION:
                                    print(f"Sending email notification to {RECEIVER_EMAIL}")
                                    send_email(m_subject, m_body, m_body_html, SMTP_SSL)

                                print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
                                print_cur_ts("Timestamp:\t\t\t")

        list_of_repos_old = list_of_repos
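
        # Event IDs seen in the previous pass are kept in events_list_of_ids_old,
        # so once the newest event ID changes, only genuinely unseen events
        # (at most EVENTS_NUMBER of them) are printed and e-mailed below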
        # New GitHub events
        if not DO_NOT_MONITOR_GITHUB_EVENTS:
            # gh_call() may yield None on a handled error, so do not wrap the
            # result in list() here - otherwise the None check below can never fire
            events = gh_call(lambda: list(islice(g_user.get_events(), EVENTS_NUMBER)))()

            if events is not None:
                available_events = len(events)

                if available_events == 0:
                    last_event_id = 0
                    last_event_ts = None
                else:
                    try:
                        newest = events[0]
                        last_event_id = newest.id
                        if last_event_id:
                            last_event_ts = newest.created_at
                    except Exception as e:
                        last_event_id = 0
                        last_event_ts = None
                        print(f"* Cannot get last event ID / timestamp: {e}")
                        print_cur_ts("Timestamp:\t\t\t")

                events_list_of_ids = set()
                first_new = True

                # New events showed up
                if last_event_id and last_event_id != last_event_id_old:
                    for event in reversed(events):
                        events_list_of_ids.add(event.id)
                        if event.id in events_list_of_ids_old:
                            continue
                        if event.type in EVENTS_TO_MONITOR or 'ALL' in EVENTS_TO_MONITOR:
                            event_date = None
                            repo_name = ""
                            repo_url = ""
                            event_text = ""
                            try:
                                event_date, repo_name, repo_url, event_text = github_print_event(event, g, first_new, last_event_ts_old)
                            except Exception as e:
                                print(f"\n* Warning, cannot fetch all event details: {e}")
                            first_new = False

                            if event_date and repo_name and event_text:
                                try:
                                    if csv_file_name:
                                        write_csv_entry(csv_file_name, convert_to_local_naive(event_date), str(event.type), str(repo_name), "", "")
                                except Exception as e:
                                    print(f"* Error: {e}")

                                m_subject = f"GitHub user {user} has new {event.type} (repo: {repo_name})"
                                m_body = f"GitHub user {user} has new {event.type} event\n\n{event_text}\nCheck interval: {display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)}){get_cur_ts(nl_ch + 'Timestamp: ')}"

                                event_payload = None
                                try:
                                    if hasattr(event, 'payload'):
                                        event_payload = event.payload
                                except Exception:
                                    pass
                                event_text_html = event_text_to_html(event_text, event.type, event_payload)

                                m_body_html = (
                                    f"<html><head></head><body>"
                                    f"GitHub user {html.escape(user)} has new {html.escape(event.type)} event<br><br>"
                                    f"{event_text_html}<br>"
                                    f"Check interval: {html.escape(display_time(GITHUB_CHECK_INTERVAL))} ({html.escape(get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True))}){get_cur_ts('<br>Timestamp: ')}"
                                    f"</body></html>"
                                )

                                if EVENT_NOTIFICATION:
                                    print(f"\nSending email notification to {RECEIVER_EMAIL}")
                                    send_email(m_subject, m_body, m_body_html, SMTP_SSL)

                                print(f"Check interval:\t\t\t{display_time(GITHUB_CHECK_INTERVAL)} ({get_range_of_dates_from_tss(int(time.time()) - GITHUB_CHECK_INTERVAL, int(time.time()), short=True)})")
                                print_cur_ts("Timestamp:\t\t\t")

                    last_event_id_old = last_event_id
                    last_event_ts_old = last_event_ts
                    events_list_of_ids_old = events_list_of_ids.copy()

        alive_counter += 1

        if LIVENESS_CHECK_COUNTER and alive_counter >= LIVENESS_CHECK_COUNTER:
            print_cur_ts("Liveness check, timestamp:\t")
            alive_counter = 0

        time.sleep(GITHUB_CHECK_INTERVAL)
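
# Example invocations (illustrative - flags match the argparse definitions below):
#
#   github_monitor.py <username> -t <github_token>           # monitor with defaults
#   github_monitor.py <username> -j --repos "repo1,repo2"    # track only selected repos
#   github_monitor.py <username> -l -n 10                    # list 10 recent events and exit
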
def main():
    global CLI_CONFIG_PATH, DOTENV_FILE, LOCAL_TIMEZONE, LIVENESS_CHECK_COUNTER, GITHUB_TOKEN, GITHUB_API_URL, CSV_FILE, DISABLE_LOGGING, GITHUB_LOGFILE, PROFILE_NOTIFICATION, EVENT_NOTIFICATION, REPO_NOTIFICATION, REPO_UPDATE_DATE_NOTIFICATION, ERROR_NOTIFICATION, GITHUB_CHECK_INTERVAL, SMTP_PASSWORD, stdout_bck, DO_NOT_MONITOR_GITHUB_EVENTS, TRACK_REPOS_CHANGES, REPOS_TO_MONITOR, GET_ALL_REPOS, CONTRIB_NOTIFICATION, TRACK_CONTRIB_CHANGES

    if "--generate-config" in sys.argv:
        print(CONFIG_BLOCK.strip("\n"))
        sys.exit(0)

    if "--version" in sys.argv:
        print(f"{os.path.basename(sys.argv[0])} v{VERSION}")
        sys.exit(0)

    stdout_bck = sys.stdout

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    clear_screen(CLEAR_SCREEN)

    print(f"GitHub Monitoring Tool v{VERSION}\n")

    parser = argparse.ArgumentParser(
        prog="github_monitor",
        description=("Monitor a GitHub user's profile and activity with customizable email alerts [ https://github.com/misiektoja/github_monitor/ ]"),
        formatter_class=argparse.RawTextHelpFormatter
    )

    # Positional
    parser.add_argument(
        "username",
        nargs="?",
        metavar="GITHUB_USERNAME",
        help="GitHub username",
        type=str
    )

    # Version - listed here for the help output only, it is handled earlier
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s v{VERSION}"
    )

    # Configuration & dotenv files
    conf = parser.add_argument_group("Configuration & dotenv files")
    conf.add_argument(
        "--config-file",
        dest="config_file",
        metavar="PATH",
        help="Location of the optional config file",
    )
    conf.add_argument(
        "--generate-config",
        action="store_true",
        help="Print default config template and exit",
    )
    conf.add_argument(
        "--env-file",
        dest="env_file",
        metavar="PATH",
        help="Path to optional dotenv file (auto-search if not set, disable with 'none')",
    )

    # API settings
    creds = parser.add_argument_group("API settings")
    creds.add_argument(
        "-t", "--github-token",
        dest="github_token",
        metavar="GITHUB_TOKEN",
        type=str,
        help="GitHub personal access token (classic)"
    )
    creds.add_argument(
        "-x", "--github-url",
        dest="github_url",
        metavar="GITHUB_URL",
        type=str,
        help="GitHub API URL"
    )

    # Notifications
    notify = parser.add_argument_group("Notifications")
    notify.add_argument(
        "-p", "--notify-profile",
        dest="notify_profile",
        action="store_true",
        default=None,
        help="Email when user's profile changes"
    )
    notify.add_argument(
        "-s", "--notify-events",
        dest="notify_events",
        action="store_true",
        default=None,
        help="Email when new GitHub events appear"
    )
    notify.add_argument(
        "-q", "--notify-repo-changes",
        dest="notify_repo_changes",
        action="store_true",
        default=None,
        help="Email when user's repositories change (stargazers, watchers, forks, issues, PRs, description etc., except for update date)"
    )
    notify.add_argument(
        "-u", "--notify-repo-update-date",
        dest="notify_repo_update_date",
        action="store_true",
        default=None,
        help="Email when user's repositories update date changes"
    )
    notify.add_argument(
        "-y", "--notify-daily-contribs",
        dest="notify_daily_contribs",
        action="store_true",
        default=None,
        help="Email when user's daily contributions count changes"
    )
    notify.add_argument(
        "-e", "--no-error-notify",
        dest="notify_errors",
        action="store_false",
        default=None,
        help="Disable email on errors"
    )
    notify.add_argument(
        "--send-test-email",
        dest="send_test_email",
        action="store_true",
        help="Send test email to verify SMTP settings"
    )

    # Intervals & timers
    times = parser.add_argument_group("Intervals & timers")
    times.add_argument(
        "-c", "--check-interval",
        dest="check_interval",
        metavar="SECONDS",
        type=int,
        help="Time between monitoring checks, in seconds"
    )

    # Listing
    listing = parser.add_argument_group("Listing")
    listing.add_argument(
        "-r", "--list-repos",
        dest="list_repos",
        action="store_true",
        default=None,
        help="List user's repositories with stats"
    )
    listing.add_argument(
        "-g", "--list-starred-repos",
        dest="list_starred_repos",
        action="store_true",
        default=None,
        help="List user's starred repositories"
    )
    listing.add_argument(
        "-f", "--list-followers-followings",
        dest="list_followers_and_followings",
        action="store_true",
        default=None,
        help="List user's followers & followings"
    )
    listing.add_argument(
        "-l", "--list-recent-events",
        dest="list_recent_events",
        action="store_true",
        default=None,
        help="List user's recent GitHub events"
    )
    listing.add_argument(
        "-n", "--recent-events-count",
        dest="recent_events_count",
        metavar="N",
        type=int,
        help="Number of events to list (use with -l)"
    )

    # Features & output
    opts = parser.add_argument_group("Features & output")
    opts.add_argument(
        "-j", "--track-repos-changes",
        dest="track_repos_changes",
        action="store_true",
        default=None,
        help="Track user's repository changes (changed stargazers, watchers, forks, description, update date etc.)"
    )
    opts.add_argument(
        "-k", "--no-monitor-events",
        dest="no_monitor_events",
        action="store_true",
        default=None,
        help="Disable event monitoring"
    )
    opts.add_argument(
        "-a", "--get-all-repos",
        dest="get_all_repos",
        action="store_true",
        default=None,
        help="Fetch all user repos (owned, forks, collaborations)"
    )
    opts.add_argument(
        "-b", "--csv-file",
        dest="csv_file",
        metavar="CSV_FILE",
        type=str,
        help="Write new events & profile changes to CSV"
    )
    opts.add_argument(
        "-d", "--disable-logging",
        dest="disable_logging",
        action="store_true",
        default=None,
        help="Disable logging to github_monitor_<username>.log"
    )
    opts.add_argument(
        "-m", "--track-contribs-changes",
        dest="track_contribs_changes",
        action="store_true",
        default=None,
        help="Track user's daily contributions count and log changes"
    )
    opts.add_argument(
        "--repos",
        dest="repos",
        metavar="REPO_LIST",
        type=str,
        help="Comma-separated list of repository names to monitor (only when -j/--track-repos-changes is enabled). Overrides REPOS_TO_MONITOR config. Example: --repos \"repo1,repo2,repo3\""
    )

    args = parser.parse_args()

    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)

    if args.config_file:
        CLI_CONFIG_PATH = os.path.expanduser(args.config_file)

    cfg_path = find_config_file(CLI_CONFIG_PATH)

    if not cfg_path and CLI_CONFIG_PATH:
        print(f"* Error: Config file '{CLI_CONFIG_PATH}' does not exist")
        sys.exit(1)

    if cfg_path:
        try:
            with open(cfg_path, "r") as cf:
                exec(cf.read(), globals())
        except Exception as e:
            print(f"* Error loading config file '{cfg_path}': {e}")
            sys.exit(1)

    if args.env_file:
        DOTENV_FILE = os.path.expanduser(args.env_file)
    else:
        if DOTENV_FILE:
            DOTENV_FILE = os.path.expanduser(DOTENV_FILE)

    if DOTENV_FILE and DOTENV_FILE.lower() == 'none':
        env_path = None
    else:
        try:
            from dotenv import load_dotenv, find_dotenv

            if DOTENV_FILE:
                env_path = DOTENV_FILE
                if not os.path.isfile(env_path):
                    print(f"* Warning: dotenv file '{env_path}' does not exist\n")
                else:
                    load_dotenv(env_path, override=True)
            else:
                env_path = find_dotenv() or None
                if env_path:
                    load_dotenv(env_path, override=True)
        except ImportError:
            env_path = DOTENV_FILE if DOTENV_FILE else None
            if env_path:
                print(f"* Warning: Cannot load dotenv file '{env_path}' because 'python-dotenv' is not installed\n\nTo install it, run:\n    pip3 install python-dotenv\n\nOnce installed, re-run this tool\n")

    if env_path:
        for secret in SECRET_KEYS:
            val = os.getenv(secret)
            if val is not None:
                globals()[secret] = val

    local_tz = None
    if LOCAL_TIMEZONE == "Auto":
        if get_localzone is not None:
            try:
                local_tz = get_localzone()
            except Exception:
                pass
        if local_tz:
            LOCAL_TIMEZONE = str(local_tz)
        else:
            print("* Error: Cannot detect local timezone, consider setting LOCAL_TIMEZONE to your local timezone manually !")
            sys.exit(1)
    elif not is_valid_timezone(LOCAL_TIMEZONE):
        print(f"* Error: Configured LOCAL_TIMEZONE '{LOCAL_TIMEZONE}' is not valid. Please use a valid pytz timezone name.")
        sys.exit(1)

    if not check_internet():
        sys.exit(1)

    if args.send_test_email:
        print("* Sending test email notification ...\n")
        if send_email("github_monitor: test email", "This is a test email - your SMTP settings seem to be correct!", "", SMTP_SSL, smtp_timeout=5) == 0:
            print("* Email sent successfully !")
        else:
            sys.exit(1)
        sys.exit(0)

    if args.github_token:
        GITHUB_TOKEN = args.github_token

    if not GITHUB_TOKEN or GITHUB_TOKEN == "your_github_classic_personal_access_token":
        print("* Error: GITHUB_TOKEN (-t / --github-token) value is empty or incorrect")
        sys.exit(1)

    if not args.username:
        print("* Error: GITHUB_USERNAME argument is required !")
        sys.exit(1)

    if args.github_url:
        GITHUB_API_URL = args.github_url

    if not GITHUB_API_URL:
        print("* Error: GITHUB_API_URL (-x / --github-url) value is empty")
        sys.exit(1)

    if args.get_all_repos is True:
        GET_ALL_REPOS = True

    if args.list_followers_and_followings:
        try:
            github_print_followers_and_followings(args.username)
        except Exception as e:
            print(f"* Error: {e}")
            sys.exit(1)
        sys.exit(0)

    if args.list_repos:
        try:
            github_print_repos(args.username)
        except Exception as e:
            print(f"* Error: {e}")
            sys.exit(1)
        sys.exit(0)

    if args.list_starred_repos:
        try:
            github_print_starred_repos(args.username)
        except Exception as e:
            print(f"* Error: {e}")
            sys.exit(1)
        sys.exit(0)

    if args.check_interval:
        GITHUB_CHECK_INTERVAL = args.check_interval
        LIVENESS_CHECK_COUNTER = LIVENESS_CHECK_INTERVAL / GITHUB_CHECK_INTERVAL

    if args.csv_file:
        CSV_FILE = os.path.expanduser(args.csv_file)
    else:
        if CSV_FILE:
            CSV_FILE = os.path.expanduser(CSV_FILE)

    if CSV_FILE:
        try:
            with open(CSV_FILE, 'a', newline='', buffering=1, encoding="utf-8") as _:
                pass
        except Exception as e:
            print(f"* Error: CSV file cannot be opened for writing: {e}")
            sys.exit(1)

    if args.list_recent_events:
        if args.recent_events_count and args.recent_events_count > 0:
            events_n = args.recent_events_count
        else:
            events_n = 5
        try:
            github_list_events(args.username, events_n, CSV_FILE)
        except Exception as e:
            print(f"* Error: {e}")
            sys.exit(1)
        sys.exit(0)

    if args.disable_logging is True:
        DISABLE_LOGGING = True

    if not DISABLE_LOGGING:
        log_path = Path(os.path.expanduser(GITHUB_LOGFILE))
        if log_path.parent != Path('.'):
            if log_path.suffix == "":
                log_path = log_path.parent / f"{log_path.name}_{args.username}.log"
        else:
            if log_path.suffix == "":
                log_path = Path(f"{log_path.name}_{args.username}.log")
        log_path.parent.mkdir(parents=True, exist_ok=True)
        FINAL_LOG_PATH = str(log_path)
        sys.stdout = Logger(FINAL_LOG_PATH)
    else:
        FINAL_LOG_PATH = None

    if args.notify_profile is True:
        PROFILE_NOTIFICATION = True

    if args.notify_events is True:
        EVENT_NOTIFICATION = True

    if args.notify_repo_changes is True:
        REPO_NOTIFICATION = True

    if args.notify_repo_update_date is True:
        REPO_UPDATE_DATE_NOTIFICATION = True

    if args.notify_daily_contribs is True:
        CONTRIB_NOTIFICATION = True

    if args.notify_errors is False:
        ERROR_NOTIFICATION = False

    if args.track_repos_changes is True:
        TRACK_REPOS_CHANGES = True

    if args.repos is not None:
        if not TRACK_REPOS_CHANGES:
            print("* Error: --repos requires -j/--track-repos-changes to be enabled")
            sys.exit(1)
        # Split comma-separated repo names and strip whitespace
        REPOS_TO_MONITOR = [repo.strip() for repo in args.repos.split(',') if repo.strip()]
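
    # Example (illustrative): --repos "repo1, repo2,repo3" yields
    # REPOS_TO_MONITOR == ['repo1', 'repo2', 'repo3'] - bare names that are
    # matched only against repositories owned by the monitored user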
    if args.track_contribs_changes is True:
        TRACK_CONTRIB_CHANGES = True

    if args.no_monitor_events is True:
        DO_NOT_MONITOR_GITHUB_EVENTS = True

    if not TRACK_REPOS_CHANGES:
        REPO_NOTIFICATION = False
        REPO_UPDATE_DATE_NOTIFICATION = False

    if not TRACK_CONTRIB_CHANGES:
        CONTRIB_NOTIFICATION = False

    if DO_NOT_MONITOR_GITHUB_EVENTS:
        EVENT_NOTIFICATION = False

    if SMTP_HOST.startswith("your_smtp_server_"):
        EVENT_NOTIFICATION = False
        PROFILE_NOTIFICATION = False
        REPO_NOTIFICATION = False
        REPO_UPDATE_DATE_NOTIFICATION = False
        CONTRIB_NOTIFICATION = False
        ERROR_NOTIFICATION = False

    print(f"* GitHub polling interval:\t[ {display_time(GITHUB_CHECK_INTERVAL)} ]")
    print(f"* Email notifications:\t\t[profile changes = {PROFILE_NOTIFICATION}] [new events = {EVENT_NOTIFICATION}]\n*\t\t\t\t[repos changes = {REPO_NOTIFICATION}] [repos update date = {REPO_UPDATE_DATE_NOTIFICATION}]\n*\t\t\t\t[contrib changes = {CONTRIB_NOTIFICATION}] [errors = {ERROR_NOTIFICATION}]")
    print(f"* GitHub API URL:\t\t{GITHUB_API_URL}")
    print(f"* Track repos changes:\t\t{TRACK_REPOS_CHANGES}")
    print(f"* Track contrib changes:\t{TRACK_CONTRIB_CHANGES}")
    print(f"* Monitor GitHub events:\t{not DO_NOT_MONITOR_GITHUB_EVENTS}")
    print(f"* Get owned repos only:\t\t{not GET_ALL_REPOS}")
    print(f"* Liveness check:\t\t{bool(LIVENESS_CHECK_INTERVAL)}" + (f" ({display_time(LIVENESS_CHECK_INTERVAL)})" if LIVENESS_CHECK_INTERVAL else ""))
    print(f"* CSV logging enabled:\t\t{bool(CSV_FILE)}" + (f" ({CSV_FILE})" if CSV_FILE else ""))
    print(f"* Output logging enabled:\t{not DISABLE_LOGGING}" + (f" ({FINAL_LOG_PATH})" if not DISABLE_LOGGING else ""))
    print(f"* Configuration file:\t\t{cfg_path}")
    print(f"* Dotenv file:\t\t\t{env_path or 'None'}")
    print(f"* Local timezone:\t\t{LOCAL_TIMEZONE}")

    out = f"\nMonitoring GitHub user {args.username}"
    print(out)
    print("─" * HORIZONTAL_LINE1)

    # Register signal handlers only on Linux, Unix & macOS, since Windows supports a limited set of signals
    if platform.system() != 'Windows':
        signal.signal(signal.SIGUSR1, toggle_profile_changes_notifications_signal_handler)
        signal.signal(signal.SIGUSR2, toggle_new_events_notifications_signal_handler)
        signal.signal(signal.SIGCONT, toggle_repo_changes_notifications_signal_handler)
        signal.signal(signal.SIGPIPE, toggle_repo_update_date_changes_notifications_signal_handler)
        signal.signal(signal.SIGURG, toggle_contrib_changes_notifications_signal_handler)
        signal.signal(signal.SIGTRAP, increase_check_signal_handler)
        signal.signal(signal.SIGABRT, decrease_check_signal_handler)
        signal.signal(signal.SIGHUP, reload_secrets_signal_handler)

    github_monitor_user(args.username, CSV_FILE)

    sys.stdout = stdout_bck
    sys.exit(0)


if __name__ == "__main__":
    main()