#!/usr/bin/env python3
"""
twitter-archive-parser - Python code to parse a Twitter archive and output in various ways
Copyright (C) 2022 Tim Hutton - https://github.com/timhutton/twitter-archive-parser
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from collections import defaultdict
from typing import Optional
from urllib.parse import urlparse
import datetime
import glob
import importlib
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
# hot-loaded if needed, see import_module():
# imagesize
# requests
# Print a compile-time error in Python < 3.6. This line does nothing in Python 3.6+ but is reported to the user
# as an error (because it is the first line that fails to compile) in older versions.
f' Error: This script requires Python 3.6 or later. Use `python --version` to check your version.'
class UserData:
def __init__(self, user_id: str, handle: str):
if user_id is None:
raise ValueError('ID "None" is not allowed in UserData.')
self.user_id = user_id
if handle is None:
raise ValueError('handle "None" is not allowed in UserData.')
self.handle = handle
class PathConfig:
"""
Helper class containing constants for various directories and files.
The script will only add / change / delete content in its own directories, which start with `parser-`.
Files within `parser-output` are the end result that the user is probably interested in.
Files within `parser-cache` are temporary working files, which improve the efficiency if you run
this script multiple times. They can safely be removed without harming the consistency of the
files within `parser-output`.
"""
def __init__(self, dir_archive):
self.dir_archive = dir_archive
self.dir_input_data = os.path.join(dir_archive, 'data')
self.file_account_js = os.path.join(self.dir_input_data, 'account.js')
# check if user is in correct folder
if not os.path.isfile(self.file_account_js):
            print(f'Error: Failed to load {self.file_account_js}. Please run this script from the root folder of your unzipped Twitter archive.')
exit()
self.dir_input_media = find_dir_input_media(self.dir_input_data)
self.dir_output = os.path.join(self.dir_archive, 'parser-output')
self.dir_output_media = os.path.join(self.dir_output, 'media')
self.dir_output_cache = os.path.join(self.dir_archive, 'parser-cache')
self.file_output_following = os.path.join(self.dir_output, 'following.txt')
self.file_output_followers = os.path.join(self.dir_output, 'followers.txt')
self.file_download_log = os.path.join(self.dir_output_media, 'download_log.txt')
self.file_tweet_icon = os.path.join(self.dir_output_media, 'tweet.ico')
self.files_input_tweets = find_files_input_tweets(self.dir_input_data)
# structured like an actual tweet output file, can be used to compute relative urls to a media file
self.example_file_output_tweets = self.create_path_for_file_output_tweets(year=2020, month=12)
def create_path_for_file_output_tweets(self, year, month, format="html", kind="tweets") -> str:
"""Builds the path for a tweet-archive file based on some properties."""
# Previously the filename was f'{dt.year}-{dt.month:02}-01-Tweet-Archive-{dt.year}-{dt.month:02}'
return os.path.join(self.dir_output, f"{kind}-{format}", f"{year:04}", f"{year:04}-{month:02}-01-{kind}.{format}")
    def create_path_for_file_output_dms(self, name: str, index: Optional[int] = None, format: str = "html", kind: str = "DMs") -> str:
"""Builds the path for a dm-archive file based on some properties."""
index_suffix = ""
        if index:
index_suffix = f"-part{index:03}"
return os.path.join(self.dir_output, kind, f"{kind}-{name}{index_suffix}.{format}")
    def create_path_for_file_output_single(self, format: str, kind: str) -> str:
        """Builds the path for a single output file, i.e. one that is not part of a larger group or sequence."""
return os.path.join(self.dir_output, f"{kind}.{format}")
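# Illustration of the paths these helpers produce, assuming a hypothetical archive
# directory './my-archive' (shown with POSIX separators):
#   paths = PathConfig(dir_archive='./my-archive')
#   paths.create_path_for_file_output_tweets(year=2020, month=12)
#       -> './my-archive/parser-output/tweets-html/2020/2020-12-01-tweets.html'
#   paths.create_path_for_file_output_dms(name='somehandle', index=2)
#       -> './my-archive/parser-output/DMs/DMs-somehandle-part002.html'
#   paths.create_path_for_file_output_single(format='txt', kind='followers')
#       -> './my-archive/parser-output/followers.txt'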
def get_consent(prompt: str, default_to_yes: bool = False):
"""Asks the user for consent, using the given prompt. Accepts various versions of yes/no, or
an empty answer to accept the default. The default is 'no' unless default_to_yes is passed as
True. The default will be indicated automatically. For unacceptable answers, the user will
be asked again."""
if default_to_yes:
suffix = " [Y/n]"
default_answer = "yes"
else:
suffix = " [y/N]"
default_answer = "no"
while True:
user_input = input(prompt + suffix)
if user_input == "":
            print(f"Your empty response was assumed to mean '{default_answer}' (the default for this question).")
return default_to_yes
if user_input.lower() in ('y', 'yes'):
return True
if user_input.lower() in ('n', 'no'):
return False
        print(f"Sorry, I did not understand that. Please answer with y, n, yes, no, or press Enter to accept "
              f"the default (which is '{default_answer}' in this case, as indicated by the uppercase "
              f"'{default_answer.upper()[0]}').")
def import_module(module):
"""Imports a module specified by a string. Example: requests = import_module('requests')"""
try:
return importlib.import_module(module)
except ImportError:
print(f'\nError: This script uses the "{module}" module which is not installed.\n')
if not get_consent('OK to install using pip?'):
exit()
subprocess.run([sys.executable, '-m', 'pip', 'install', module], check=True)
return importlib.import_module(module)
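# Example (mirrors the docstring above; 'requests' and 'imagesize' are the optional
# dependencies this script may load this way):
#   requests = import_module('requests')
#   imagesize = import_module('imagesize')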
def open_and_mkdirs(path_file):
"""Opens a file for writing. If the parent directory does not exist yet, it is created first."""
mkdirs_for_file(path_file)
return open(path_file, 'w', encoding='utf-8')
def mkdirs_for_file(path_file):
"""Creates the parent directory of the given file, if it does not exist yet."""
path_dir = os.path.split(path_file)[0]
os.makedirs(path_dir, exist_ok=True)
def rel_url(media_path, document_path):
"""Computes the relative URL needed to link from `document_path` to `media_path`.
Assumes that `document_path` points to a file (e.g. `.md` or `.html`), not a directory."""
return os.path.relpath(media_path, os.path.split(document_path)[0]).replace("\\", "/")
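# Worked example (file names are hypothetical):
#   rel_url('parser-output/media/photo.jpg',
#           'parser-output/tweets-html/2020/2020-12-01-tweets.html')
#   -> '../../media/photo.jpg'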
def get_twitter_api_guest_token(session, bearer_token):
"""Returns a Twitter API guest token for the current session."""
guest_token_response = session.post("https://api.twitter.com/1.1/guest/activate.json",
headers={'authorization': f'Bearer {bearer_token}'},
timeout=2,
)
guest_token = json.loads(guest_token_response.content)['guest_token']
if not guest_token:
        raise Exception("Failed to retrieve guest token")
return guest_token
def get_twitter_users(session, bearer_token, guest_token, user_ids):
"""Asks Twitter for all metadata associated with user_ids."""
users = {}
while user_ids:
max_batch = 100
user_id_batch = user_ids[:max_batch]
user_ids = user_ids[max_batch:]
user_id_list = ",".join(user_id_batch)
query_url = f"https://api.twitter.com/1.1/users/lookup.json?user_id={user_id_list}"
response = session.get(query_url,
headers={'authorization': f'Bearer {bearer_token}', 'x-guest-token': guest_token},
timeout=2,
)
        if response.status_code != 200:
raise Exception(f'Failed to get user handle: {response}')
response_json = json.loads(response.content)
for user in response_json:
users[user["id_str"]] = user
return users
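# Usage sketch for the two helpers above (a minimal example; the bearer token is the
# hard-coded one used in lookup_users() below, and the ids are placeholders).
# Lookups are sent in batches of 100 ids per request:
#   with requests.Session() as session:
#       guest_token = get_twitter_api_guest_token(session, bearer_token)
#       users_by_id = get_twitter_users(session, bearer_token, guest_token, ['12', '783214'])
#       # users_by_id maps id_str -> the full user JSON returned by the API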
def lookup_users(user_ids, users):
"""Fill the users dictionary with data from Twitter"""
# Filter out any users already known
    filtered_user_ids = [user_id for user_id in user_ids if user_id not in users]
if not filtered_user_ids:
# Don't bother opening a session if there's nothing to get
return
# Account metadata observed at ~2.1KB on average.
estimated_size = int(2.1 * len(filtered_user_ids))
print(f'{len(filtered_user_ids)} users are unknown.')
if not get_consent(f'Download user data from Twitter (approx {estimated_size:,} KB)?'):
return
requests = import_module('requests')
try:
with requests.Session() as session:
bearer_token = 'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
guest_token = get_twitter_api_guest_token(session, bearer_token)
retrieved_users = get_twitter_users(session, bearer_token, guest_token, filtered_user_ids)
for user_id, user in retrieved_users.items():
if user["screen_name"] is not None:
users[user_id] = UserData(user_id=user_id, handle=user["screen_name"])
print() # empty line for better readability of output
except Exception as err:
print(f'Failed to download user data: {err}')
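# Example (ids are placeholders): resolve handles for unknown ids into a users dict:
#   users = {}
#   lookup_users(['12', '783214'], users)
#   # users now maps each successfully resolved id to a UserData(user_id, handle) entry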
def read_json_from_js_file(filename):
"""Reads the contents of a Twitter-produced .js file into a dictionary."""
print(f'Parsing {filename}...')
with open(filename, 'r', encoding='utf8') as f:
data = f.readlines()
# if the JSON has no real content, it can happen that the file is only one line long.
# in this case, return an empty dict to avoid errors while trying to read non-existing lines.
if len(data) <= 1:
return {}
# convert js file to JSON: replace first line with just '[', squash lines into a single string
prefix = '['
if '{' in data[0]:
prefix += ' {'
data = prefix + ''.join(data[1:])
# parse the resulting JSON and return as a dict
return json.loads(data)
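# The archive's .js files wrap JSON in a JavaScript assignment; the first line of
# tweets.js, for example, looks roughly like:
#   window.YTD.tweets.part0 = [
# read_json_from_js_file() replaces that first line with '[' (plus '{' if the first
# object starts on the same line) and parses the remainder as ordinary JSON.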
def extract_username(paths: PathConfig):
"""Returns the user's Twitter username from account.js."""
account = read_json_from_js_file(paths.file_account_js)
return account[0]['account']['username']
def convert_tweet(tweet, username, media_sources, users: dict, paths: PathConfig):
    """Converts one tweet from the archive's JSON. Returns a (tweet_text, tweet_id_str,
    in_reply_to_status_id) tuple, or None for replies whose text starts with an @mention."""
    if 'tweet' in tweet:
tweet = tweet['tweet']
tweet_text = tweet['full_text']
tweet_id_str = tweet['id_str']
if 'in_reply_to_status_id' in tweet:
replying_to = re.match(r'^(@[0-9A-Za-z_]* )*', tweet_text)[0]
if replying_to:
return None
in_reply_to_status_id = tweet['in_reply_to_status_id']
else:
in_reply_to_status_id = None
return tweet_text, tweet_id_str, in_reply_to_status_id
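# Illustrative input/output (field values are made up):
#   convert_tweet({'tweet': {'full_text': 'second tweet in a thread',
#                            'id_str': '111',
#                            'in_reply_to_status_id': '110'}},
#                 username, media_sources, users, paths)
#   -> ('second tweet in a thread', '111', '110')
# A reply whose text starts with an @mention (a reply to someone else) returns None
# and is therefore skipped by parse_tweets().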
def find_files_input_tweets(dir_path_input_data):
    """Identify the tweet archive's data files - their names
    vary slightly depending on the size and age of the archive."""
input_tweets_file_templates = ['tweet.js', 'tweets.js', 'tweets-part*.js', 'deleted-tweets.js']
files_paths_input_tweets = []
for input_tweets_file_template in input_tweets_file_templates:
files_paths_input_tweets += glob.glob(os.path.join(dir_path_input_data, input_tweets_file_template))
    if len(files_paths_input_tweets) == 0:
print(f'Error: no files matching {input_tweets_file_templates} in {dir_path_input_data}')
exit()
return files_paths_input_tweets
def find_dir_input_media(dir_path_input_data):
    """Locate the archive's single tweet-media folder (its name varies between archive versions)."""
    input_media_dir_templates = ['tweet_media', 'tweets_media']
input_media_dirs = []
for input_media_dir_template in input_media_dir_templates:
input_media_dirs += glob.glob(os.path.join(dir_path_input_data, input_media_dir_template))
if len(input_media_dirs) == 0:
print(f'Error: no folders matching {input_media_dir_templates} in {dir_path_input_data}')
exit()
if len(input_media_dirs) > 1:
print(f'Error: multiple folders matching {input_media_dir_templates} in {dir_path_input_data}')
exit()
return input_media_dirs[0]
# def download_file_if_larger(url, filename, index, count, sleep_time):
# """Attempts to download from the specified URL. Overwrites file if larger.
# Returns whether the file is now known to be the largest available, and the number of bytes downloaded.
# """
# requests = import_module('requests')
# imagesize = import_module('imagesize')
# pref = f'{index:3d}/{count:3d} {filename}: '
# # Sleep briefly, in an attempt to minimize the possibility of trigging some auto-cutoff mechanism
# if index > 1:
# print(f'{pref}Sleeping...', end='\r')
# time.sleep(sleep_time)
# # Request the URL (in stream mode so that we can conditionally abort depending on the headers)
# print(f'{pref}Requesting headers for {url}...', end='\r')
# byte_size_before = os.path.getsize(filename)
# try:
# with requests.get(url, stream=True, timeout=2) as res:
# if not res.status_code == 200:
# # Try to get content of response as `res.text`.
# # For twitter.com, this will be empty in most (all?) cases.
# # It is successfully tested with error responses from other domains.
# raise Exception(f'Download failed with status "{res.status_code} {res.reason}". '
# f'Response content: "{res.text}"')
# byte_size_after = int(res.headers['content-length'])
# if byte_size_after != byte_size_before:
# # Proceed with the full download
# tmp_filename = filename+'.tmp'
# print(f'{pref}Downloading {url}... ', end='\r')
# with open(tmp_filename,'wb') as f:
# shutil.copyfileobj(res.raw, f)
# post = f'{byte_size_after/2**20:.1f}MB downloaded'
# width_before, height_before = imagesize.get(filename)
# width_after, height_after = imagesize.get(tmp_filename)
# pixels_before, pixels_after = width_before * height_before, width_after * height_after
# pixels_percentage_increase = 100.0 * (pixels_after - pixels_before) / pixels_before
# if width_before == -1 and height_before == -1 and width_after == -1 and height_after == -1:
# # could not check size of both versions, probably a video or unsupported image format
# os.replace(tmp_filename, filename)
# bytes_percentage_increase = 100.0 * (byte_size_after - byte_size_before) / byte_size_before
# logging.info(f'{pref}SUCCESS. New version is {bytes_percentage_increase:3.0f}% '
# f'larger in bytes (pixel comparison not possible). {post}')
# return True, byte_size_after
# elif width_before == -1 or height_before == -1 or width_after == -1 or height_after == -1:
# # could not check size of one version, this should not happen (corrupted download?)
# logging.info(f'{pref}SKIPPED. Pixel size comparison inconclusive: '
# f'{width_before}*{height_before}px vs. {width_after}*{height_after}px. {post}')
# return False, byte_size_after
# elif pixels_after >= pixels_before:
# os.replace(tmp_filename, filename)
# bytes_percentage_increase = 100.0 * (byte_size_after - byte_size_before) / byte_size_before
# if bytes_percentage_increase >= 0:
# logging.info(f'{pref}SUCCESS. New version is {bytes_percentage_increase:3.0f}% larger in bytes '
# f'and {pixels_percentage_increase:3.0f}% larger in pixels. {post}')
# else:
# logging.info(f'{pref}SUCCESS. New version is actually {-bytes_percentage_increase:3.0f}% '
# f'smaller in bytes but {pixels_percentage_increase:3.0f}% '
# f'larger in pixels. {post}')
# return True, byte_size_after
# else:
# logging.info(f'{pref}SKIPPED. Online version has {-pixels_percentage_increase:3.0f}% '
# f'smaller pixel size. {post}')
# return True, byte_size_after
# else:
# logging.info(f'{pref}SKIPPED. Online version is same byte size, assuming same content. Not downloaded.')
# return True, 0
# except Exception as err:
# logging.error(f"{pref}FAIL. Media couldn't be retrieved from {url} because of exception: {err}")
# return False, 0
# def download_larger_media(media_sources, paths: PathConfig):
# """Uses (filename, URL) tuples in media_sources to download files from remote storage.
# Aborts downloads if the remote file is the same size or smaller than the existing local version.
# Retries the failed downloads several times, with increasing pauses between each to avoid being blocked.
# """
# # Log to file as well as the console
# logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
# mkdirs_for_file(paths.file_download_log)
# logfile_handler = logging.FileHandler(filename=paths.file_download_log, mode='w')
# logfile_handler.setLevel(logging.INFO)
# logging.getLogger().addHandler(logfile_handler)
# # Download new versions
# start_time = time.time()
# total_bytes_downloaded = 0
# sleep_time = 0.25
# remaining_tries = 5
# while remaining_tries > 0:
# number_of_files = len(media_sources)
# success_count = 0
# retries = []
# for index, (local_media_path, media_url) in enumerate(media_sources):
# success, bytes_downloaded = download_file_if_larger(
# media_url, local_media_path, index + 1, number_of_files, sleep_time
# )
# if success:
# success_count += 1
# else:
# retries.append((local_media_path, media_url))
# total_bytes_downloaded += bytes_downloaded
# # show % done and estimated remaining time:
# time_elapsed: float = time.time() - start_time
# estimated_time_per_file: float = time_elapsed / (index + 1)
# estimated_time_remaining: datetime.datetime = \
# datetime.datetime.fromtimestamp(
# (number_of_files - (index + 1)) * estimated_time_per_file,
# tz=datetime.timezone.utc
# )
# if estimated_time_remaining.hour >= 1:
# time_remaining_string: str = \
# f"{estimated_time_remaining.hour} hour{'' if estimated_time_remaining.hour == 1 else 's'} " \
# f"{estimated_time_remaining.minute} minute{'' if estimated_time_remaining.minute == 1 else 's'}"
# elif estimated_time_remaining.minute >= 1:
# time_remaining_string: str = \
# f"{estimated_time_remaining.minute} minute{'' if estimated_time_remaining.minute == 1 else 's'} " \
# f"{estimated_time_remaining.second} second{'' if estimated_time_remaining.second == 1 else 's'}"
# else:
# time_remaining_string: str = \
# f"{estimated_time_remaining.second} second{'' if estimated_time_remaining.second == 1 else 's'}"
# if index + 1 == number_of_files:
# print(' 100 % done.')
# else:
# print(f' {(100*(index+1)/number_of_files):.1f} % done, about {time_remaining_string} remaining...')
# media_sources = retries
# remaining_tries -= 1
# sleep_time += 2
# logging.info(f'\n{success_count} of {number_of_files} tested media files '
# f'are known to be the best-quality available.\n')
# if len(retries) == 0:
# break
# if remaining_tries > 0:
# print(f'----------------------\n\nRetrying the ones that failed, with a longer sleep. '
# f'{remaining_tries} tries remaining.\n')
# end_time = time.time()
# logging.info(f'Total downloaded: {total_bytes_downloaded/2**20:.1f}MB = {total_bytes_downloaded/2**30:.2f}GB')
# logging.info(f'Time taken: {end_time-start_time:.0f}s')
# print(f'Wrote log to {paths.file_download_log}')
def parse_tweets(username, users, html_template, paths: PathConfig):
    """Read tweets from paths.files_input_tweets.
    Returns a dict mapping tweet id_str to a (tweet_text, in_reply_to_status_id) tuple,
    so that reply threads can be reconstructed later.
    (The username, users and html_template parameters are currently unused here.)
    """
tweets = {}
media_sources = []
    for tweets_js_filename in paths.files_input_tweets:
        # avoid shadowing the imported `json` module
        tweets_json = read_json_from_js_file(tweets_js_filename)
        for tweet in tweets_json:
result = convert_tweet(tweet, username, media_sources, users, paths)
if result is not None:
tweet_text, tweet_id_str, in_reply_to_status_id = result
tweets[tweet_id_str] = (tweet_text, in_reply_to_status_id)
return tweets
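# The returned dict maps tweet id_str -> (tweet_text, in_reply_to_status_id), which lets
# callers walk a thread backwards from its last tweet (the id below is a placeholder):
#   text, parent_id = tweets['1234567890']
#   while parent_id in tweets:
#       text = tweets[parent_id][0] + '\n' + text
#       parent_id = tweets[parent_id][1]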
# def collect_user_ids_from_followings(paths) -> list:
# """
# Collect all user ids that appear in the followings archive data.
# (For use in bulk online lookup from Twitter.)
# """
# # read JSON file from archive
# following_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'following.js'))
# # collect all user ids in a list
# following_ids = []
# for follow in following_json:
# if 'following' in follow and 'accountId' in follow['following']:
# following_ids.append(follow['following']['accountId'])
# return following_ids
# def parse_followings(users, user_id_url_template, paths: PathConfig):
# """Parse paths.dir_input_data/following.js, write to paths.file_output_following.
# """
# following = []
# following_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'following.js'))
# following_ids = []
# for follow in following_json:
# if 'following' in follow and 'accountId' in follow['following']:
# following_ids.append(follow['following']['accountId'])
# for following_id in following_ids:
# handle = users[following_id].handle if following_id in users else '~unknown~handle~'
# following.append(handle + ' ' + user_id_url_template.format(following_id))
# following.sort()
# following_output_path = paths.create_path_for_file_output_single(format="txt", kind="following")
# with open_and_mkdirs(following_output_path) as f:
# f.write('\n'.join(following))
# print(f"Wrote {len(following)} accounts to {following_output_path}")
# def collect_user_ids_from_followers(paths) -> list:
# """
# Collect all user ids that appear in the followers archive data.
# (For use in bulk online lookup from Twitter.)
# """
# # read JSON file from archive
# follower_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'follower.js'))
# # collect all user ids in a list
# follower_ids = []
# for follower in follower_json:
# if 'follower' in follower and 'accountId' in follower['follower']:
# follower_ids.append(follower['follower']['accountId'])
# return follower_ids
# def parse_followers(users, user_id_url_template, paths: PathConfig):
# """Parse paths.dir_input_data/followers.js, write to paths.file_output_followers.
# """
# followers = []
# follower_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'follower.js'))
# follower_ids = []
# for follower in follower_json:
# if 'follower' in follower and 'accountId' in follower['follower']:
# follower_ids.append(follower['follower']['accountId'])
# for follower_id in follower_ids:
# handle = users[follower_id].handle if follower_id in users else '~unknown~handle~'
# followers.append(handle + ' ' + user_id_url_template.format(follower_id))
# followers.sort()
# followers_output_path = paths.create_path_for_file_output_single(format="txt", kind="followers")
# with open_and_mkdirs(followers_output_path) as f:
# f.write('\n'.join(followers))
# print(f"Wrote {len(followers)} accounts to {followers_output_path}")
# def chunks(lst: list, n: int):
# """Yield successive n-sized chunks from lst."""
# for i in range(0, len(lst), n):
# yield lst[i:i + n]
# def collect_user_ids_from_direct_messages(paths) -> list:
# """
# Collect all user ids that appear in the direct messages archive data.
# (For use in bulk online lookup from Twitter.)
# """
# # read JSON file from archive
# dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages.js'))
# # collect all user ids in a set
# dms_user_ids = set()
# for conversation in dms_json:
# if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
# dm_conversation = conversation['dmConversation']
# conversation_id = dm_conversation['conversationId']
# user1_id, user2_id = conversation_id.split('-')
# dms_user_ids.add(user1_id)
# dms_user_ids.add(user2_id)
# return list(dms_user_ids)
# def parse_direct_messages(username, users, user_id_url_template, paths: PathConfig):
# """Parse paths.dir_input_data/direct-messages.js, write to one markdown file per conversation.
# """
# # read JSON file
# dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages.js'))
# # Parse the DMs and store the messages in a dict
# conversations_messages = defaultdict(list)
# for conversation in dms_json:
# if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
# dm_conversation = conversation['dmConversation']
# conversation_id = dm_conversation['conversationId']
# user1_id, user2_id = conversation_id.split('-')
# messages = []
# if 'messages' in dm_conversation:
# for message in dm_conversation['messages']:
# if 'messageCreate' in message:
# message_create = message['messageCreate']
# if all(tag in message_create for tag in ['senderId', 'recipientId', 'text', 'createdAt']):
# from_id = message_create['senderId']
# to_id = message_create['recipientId']
# body = message_create['text']
# # replace t.co URLs with their original versions
# if 'urls' in message_create and len(message_create['urls']) > 0:
# for url in message_create['urls']:
# if 'url' in url and 'expanded' in url:
# expanded_url = url['expanded']
# body = body.replace(url['url'], expanded_url)
# # escape message body for markdown rendering:
# body_markdown = escape_markdown(body)
# # replace image URLs with image links to local files
# if 'mediaUrls' in message_create \
# and len(message_create['mediaUrls']) == 1 \
# and 'urls' in message_create:
# original_expanded_url = message_create['urls'][0]['expanded']
# message_id = message_create['id']
# media_hash_and_type = message_create['mediaUrls'][0].split('/')[-1]
# media_id = message_create['mediaUrls'][0].split('/')[-2]
# archive_media_filename = f'{message_id}-{media_hash_and_type}'
# new_url = os.path.join(paths.dir_output_media, archive_media_filename)
# archive_media_path = \
# os.path.join(paths.dir_input_data, 'direct_messages_media', archive_media_filename)
# if os.path.isfile(archive_media_path):
# # found a matching image, use this one
# if not os.path.isfile(new_url):
# shutil.copy(archive_media_path, new_url)
# image_markdown = f'\n\n'
# body_markdown = body_markdown.replace(
# escape_markdown(original_expanded_url), image_markdown
# )
# # Save the online location of the best-quality version of this file,
# # for later upgrading if wanted
# best_quality_url = \
# f'https://ton.twitter.com/i//ton/data/dm/' \
# f'{message_id}/{media_id}/{media_hash_and_type}'
# # there is no ':orig' here, the url without any suffix has the original size
# # TODO: a cookie (and a 'Referer: https://twitter.com' header)
# # is needed to retrieve it, so the url might be useless anyway...
# # WARNING: Do not uncomment the statement below until the cookie problem is solved!
# # media_sources.append(
# # (
# # os.path.join(output_media_folder_name, archive_media_filename),
# # best_quality_url
# # )
# # )
# else:
# archive_media_paths = glob.glob(
# os.path.join(paths.dir_input_data, 'direct_messages_media', message_id + '*'))
# if len(archive_media_paths) > 0:
# for archive_media_path in archive_media_paths:
# archive_media_filename = os.path.split(archive_media_path)[-1]
# media_url = os.path.join(paths.dir_output_media, archive_media_filename)
# if not os.path.isfile(media_url):
# shutil.copy(archive_media_path, media_url)
# video_markdown = f'\n\n'
# body_markdown = body_markdown.replace(
# escape_markdown(original_expanded_url), video_markdown
# )
# # TODO: maybe also save the online location of the best-quality version for videos?
# # (see above)
# else:
# print(f'Warning: missing local file: {archive_media_path}. '
# f'Using original link instead: {original_expanded_url})')
# created_at = message_create['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = \
# int(round(datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()))
# from_handle = escape_markdown(users[from_id].handle) if from_id in users \
# else user_id_url_template.format(from_id)
# to_handle = escape_markdown(users[to_id].handle) if to_id in users \
# else user_id_url_template.format(to_id)
# # make the body a quote
# body_markdown = '> ' + '\n> '.join(body_markdown.splitlines())
# message_markdown = f'{from_handle} -> {to_handle}: ({created_at}) \n\n' \
# f'{body_markdown}'
# messages.append((timestamp, message_markdown))
# # find identifier for the conversation
# other_user_id = user2_id if (user1_id in users and users[user1_id].handle == username) else user1_id
# # collect messages per identifying user in conversations_messages dict
# conversations_messages[other_user_id].extend(messages)
# # output as one file per conversation (or part of long conversation)
# num_written_messages = 0
# num_written_files = 0
# for other_user_id, messages in conversations_messages.items():
# # sort messages by timestamp
# messages.sort(key=lambda tup: tup[0])
# other_user_name = escape_markdown(users[other_user_id].handle) if other_user_id in users \
# else user_id_url_template.format(other_user_id)
# other_user_short_name: str = users[other_user_id].handle if other_user_id in users else other_user_id
# escaped_username = escape_markdown(username)
# # if there are more than 1000 messages, the conversation was split up in the twitter archive.
# # following this standard, also split up longer conversations in the output files:
# if len(messages) > 1000:
# for chunk_index, chunk in enumerate(chunks(messages, 1000)):
# markdown = ''
# markdown += f'### Conversation between {escaped_username} and {other_user_name}, ' \
# f'part {chunk_index+1}: ###\n\n----\n\n'
# markdown += '\n\n----\n\n'.join(md for _, md in chunk)
# conversation_output_path = paths.create_path_for_file_output_dms(name=other_user_short_name, index=(chunk_index + 1), format="md")
# # write part to a markdown file
# with open_and_mkdirs(conversation_output_path) as f:
# f.write(markdown)
# print(f'Wrote {len(chunk)} messages to {conversation_output_path}')
# num_written_files += 1
# else:
# markdown = ''
# markdown += f'### Conversation between {escaped_username} and {other_user_name}: ###\n\n----\n\n'
# markdown += '\n\n----\n\n'.join(md for _, md in messages)
# conversation_output_path = paths.create_path_for_file_output_dms(name=other_user_short_name, format="md")
# with open_and_mkdirs(conversation_output_path) as f:
# f.write(markdown)
# print(f'Wrote {len(messages)} messages to {conversation_output_path}')
# num_written_files += 1
# num_written_messages += len(messages)
# print(f"\nWrote {len(conversations_messages)} direct message conversations "
# f"({num_written_messages} total messages) to {num_written_files} markdown files\n")
# def make_conversation_name_safe_for_filename(conversation_name: str) -> str:
# """
# Remove/replace characters that could be unsafe in filenames
# """
# forbidden_chars = \
# ['"', "'", '*', '/', '\\', ':', '<', '>', '?', '|', '!', '@', ';', ',', '=', '.', '\n', '\r', '\t']
# new_conversation_name = ''
# for char in conversation_name:
# if char in forbidden_chars:
# new_conversation_name = new_conversation_name + '_'
# elif char.isspace():
# # replace spaces with underscores
# new_conversation_name = new_conversation_name + '_'
# elif char == 0x7F or (0x1F >= ord(char) >= 0x00):
# # 0x00 - 0x1F and 0x7F are also forbidden, just discard them
# continue
# else:
# new_conversation_name = new_conversation_name + char
# return new_conversation_name
# def find_group_dm_conversation_participant_ids(conversation: dict) -> set:
# """
# Find IDs of all participating Users in a group direct message conversation
# """
# group_user_ids = set()
# if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
# dm_conversation = conversation['dmConversation']
# if 'messages' in dm_conversation:
# for message in dm_conversation['messages']:
# if 'messageCreate' in message:
# group_user_ids.add(message['messageCreate']['senderId'])
# elif 'joinConversation' in message:
# group_user_ids.add(message['joinConversation']['initiatingUserId'])
# for participant_id in message['joinConversation']['participantsSnapshot']:
# group_user_ids.add(participant_id)
# elif "participantsJoin" in message:
# group_user_ids.add(message['participantsJoin']['initiatingUserId'])
# for participant_id in message['participantsJoin']['userIds']:
# group_user_ids.add(participant_id)
# return group_user_ids
# def collect_user_ids_from_group_direct_messages(paths) -> list:
# """
# Collect all user ids that appear in the group direct messages archive data.
# (For use in bulk online lookup from Twitter.)
# """
# # read JSON file from archive
# group_dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages-group.js'))
# # collect all user ids in a set
# group_dms_user_ids = set()
# for conversation in group_dms_json:
# participants = find_group_dm_conversation_participant_ids(conversation)
# for participant_id in participants:
# group_dms_user_ids.add(participant_id)
# return list(group_dms_user_ids)
# def parse_group_direct_messages(username, users, user_id_url_template, paths):
# """Parse data_folder/direct-messages-group.js, write to one markdown file per conversation.
# """
# # read JSON file from archive
# group_dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages-group.js'))
# # Parse the group DMs, store messages and metadata in a dict
# group_conversations_messages = defaultdict(list)
# group_conversations_metadata = defaultdict(dict)
# for conversation in group_dms_json:
# if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
# dm_conversation = conversation['dmConversation']
# conversation_id = dm_conversation['conversationId']
# participants = find_group_dm_conversation_participant_ids(conversation)
# participant_names = []
# for participant_id in participants:
# if participant_id in users:
# participant_names.append(users[participant_id].handle)
# else:
# participant_names.append(user_id_url_template.format(participant_id))
# # save names in metadata
# group_conversations_metadata[conversation_id]['participants'] = participants
# group_conversations_metadata[conversation_id]['participant_names'] = participant_names
# group_conversations_metadata[conversation_id]['conversation_names'] = [(0, conversation_id)]
# group_conversations_metadata[conversation_id]['participant_message_count'] = defaultdict(int)
# for participant_id in participants:
# # init every participant's message count with 0, so that users with no activity are not ignored
# group_conversations_metadata[conversation_id]['participant_message_count'][participant_id] = 0
# messages = []
# if 'messages' in dm_conversation:
# for message in dm_conversation['messages']:
# if 'messageCreate' in message:
# message_create = message['messageCreate']
# if all(tag in message_create for tag in ['senderId', 'text', 'createdAt']):
# from_id = message_create['senderId']
# # count how many messages this user has sent to the group
# group_conversations_metadata[conversation_id]['participant_message_count'][from_id] += 1
# body = message_create['text']
# # replace t.co URLs with their original versions
# if 'urls' in message_create:
# for url in message_create['urls']:
# if 'url' in url and 'expanded' in url:
# expanded_url = url['expanded']
# body = body.replace(url['url'], expanded_url)
# # escape message body for markdown rendering:
# body_markdown = escape_markdown(body)
# # replace image URLs with image links to local files
# if 'mediaUrls' in message_create \
# and len(message_create['mediaUrls']) == 1 \
# and 'urls' in message_create:
# original_expanded_url = message_create['urls'][0]['expanded']
# message_id = message_create['id']
# media_hash_and_type = message_create['mediaUrls'][0].split('/')[-1]
# media_id = message_create['mediaUrls'][0].split('/')[-2]
# archive_media_filename = f'{message_id}-{media_hash_and_type}'
# new_url = os.path.join(paths.dir_output_media, archive_media_filename)
# archive_media_path = \
# os.path.join(paths.dir_input_data, 'direct_messages_group_media',
# archive_media_filename)
# if os.path.isfile(archive_media_path):
# # found a matching image, use this one
# if not os.path.isfile(new_url):
# shutil.copy(archive_media_path, new_url)
# image_markdown = f'\n\n'
# body_markdown = body_markdown.replace(
# escape_markdown(original_expanded_url), image_markdown
# )
# # Save the online location of the best-quality version of this file,
# # for later upgrading if wanted
# best_quality_url = \
# f'https://ton.twitter.com/i//ton/data/dm/' \
# f'{message_id}/{media_id}/{media_hash_and_type}'
# # there is no ':orig' here, the url without any suffix has the original size
# # TODO: a cookie (and a 'Referer: https://twitter.com' header)
# # is needed to retrieve it, so the url might be useless anyway...
# # WARNING: Do not uncomment the statement below until the cookie problem is solved!
# # media_sources.append(
# # (
# # os.path.join(output_media_folder_name, archive_media_filename),
# # best_quality_url
# # )
# # )
# else:
# archive_media_paths = glob.glob(
# os.path.join(paths.dir_input_data, 'direct_messages_group_media',
# message_id + '*'))
# if len(archive_media_paths) > 0:
# for archive_media_path in archive_media_paths:
# archive_media_filename = os.path.split(archive_media_path)[-1]
# media_url = os.path.join(paths.dir_output_media,
# archive_media_filename)
# if not os.path.isfile(media_url):
# shutil.copy(archive_media_path, media_url)
# video_markdown = f'\n\n'
# body_markdown = body_markdown.replace(
# escape_markdown(original_expanded_url), video_markdown
# )
# # TODO: maybe also save the online location of the best-quality version for videos?
# # (see above)
# else:
# print(f'Warning: missing local file: {archive_media_path}. '
# f'Using original link instead: {original_expanded_url})')
# created_at = message_create['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = int(round(
# datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()
# ))
# from_handle = escape_markdown(users[from_id].handle) if from_id in users \
# else user_id_url_template.format(from_id)
# # make the body a quote
# body_markdown = '> ' + '\n> '.join(body_markdown.splitlines())
# message_markdown = f'{from_handle}: ({created_at})\n\n' \
# f'{body_markdown}'
# messages.append((timestamp, message_markdown))
# elif "conversationNameUpdate" in message:
# conversation_name_update = message['conversationNameUpdate']
# if all(tag in conversation_name_update for tag in ['initiatingUserId', 'name', 'createdAt']):
# from_id = conversation_name_update['initiatingUserId']
# body_markdown = f"_changed group name to: {escape_markdown(conversation_name_update['name'])}_"
# created_at = conversation_name_update['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = int(round(
# datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()
# ))
# from_handle = escape_markdown(users[from_id].handle) if from_id in users \
# else user_id_url_template.format(from_id)
# message_markdown = f'{from_handle}: ({created_at})\n\n{body_markdown}'
# messages.append((timestamp, message_markdown))
# # save metadata about name change:
# group_conversations_metadata[conversation_id]['conversation_names'].append(
# (timestamp, conversation_name_update['name'])
# )
# elif "joinConversation" in message:
# join_conversation = message['joinConversation']
# if all(tag in join_conversation for tag in ['initiatingUserId', 'createdAt']):
# from_id = join_conversation['initiatingUserId']
# created_at = join_conversation['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = int(round(
# datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()
# ))
# from_handle = escape_markdown(users[from_id].handle) if from_id in users \
# else user_id_url_template.format(from_id)
# escaped_username = escape_markdown(username)
# body_markdown = f'_{from_handle} added {escaped_username} to the group_'
# message_markdown = f'{from_handle}: ({created_at})\n\n{body_markdown}'
# messages.append((timestamp, message_markdown))
# elif "participantsJoin" in message:
# participants_join = message['participantsJoin']
# if all(tag in participants_join for tag in ['initiatingUserId', 'userIds', 'createdAt']):
# from_id = participants_join['initiatingUserId']
# created_at = participants_join['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = int(round(
# datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()
# ))
# from_handle = escape_markdown(users[from_id].handle) if from_id in users \
# else user_id_url_template.format(from_id)
# joined_ids = participants_join['userIds']
# joined_handles = [escape_markdown(users[joined_id].handle) if joined_id in users
# else user_id_url_template.format(joined_id) for joined_id in joined_ids]
# name_list = ', '.join(joined_handles[:-1]) + \
# (f' and {joined_handles[-1]}' if len(joined_handles) > 1 else
# joined_handles[0])
# body_markdown = f'_{from_handle} added {name_list} to the group_'
# message_markdown = f'{from_handle}: ({created_at})\n\n{body_markdown}'
# messages.append((timestamp, message_markdown))
# elif "participantsLeave" in message:
# participants_leave = message['participantsLeave']
# if all(tag in participants_leave for tag in ['userIds', 'createdAt']):
# created_at = participants_leave['createdAt'] # example: 2022-01-27T15:58:52.744Z
# timestamp = int(round(
# datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()
# ))
# left_ids = participants_leave['userIds']
# left_handles = [escape_markdown(users[left_id].handle) if left_id in users
# else user_id_url_template.format(left_id) for left_id in left_ids]
# name_list = ', '.join(left_handles[:-1]) + \
# (f' and {left_handles[-1]}' if len(left_handles) > 1 else
# left_handles[0])
# body_markdown = f'_{name_list} left the group_'
# message_markdown = f'{name_list}: ({created_at})\n\n{body_markdown}'
# messages.append((timestamp, message_markdown))
# # collect messages per conversation in group_conversations_messages dict
# group_conversations_messages[conversation_id].extend(messages)
# # output as one file per conversation (or part of long conversation)
# num_written_messages = 0
# num_written_files = 0
# for conversation_id, messages in group_conversations_messages.items():
# # sort messages by timestamp
# messages.sort(key=lambda tup: tup[0])
# # create conversation name for use in filename:
# # first, try to find an official name in the parsed conversation data
# # Not-so-fun fact:
# # If the name was set before the archive's owner joined the group, the name is not included
# # in the archive data and can't be found anywhere (except by looking it up from twitter,
# # and that would probably need a cookie). So there are many groups that do actually have a name,
# # but it can't be used here because we don't know it.
# group_conversations_metadata[conversation_id]['conversation_names'].sort(key=lambda tup: tup[0], reverse=True)
# official_name = group_conversations_metadata[conversation_id]['conversation_names'][0][1]
# safe_group_name = make_conversation_name_safe_for_filename(official_name)
# if len(safe_group_name) < 2:
# # discard name if it's too short (because of collision risk)
# group_name = conversation_id
# else:
# group_name = safe_group_name
# if group_name == conversation_id:
# # try to make a nice list of participant handles for the conversation name
# handles = []
# for participant_id, message_count in \
# group_conversations_metadata[conversation_id]['participant_message_count'].items():
# if participant_id in users:
# participant_handle = users[participant_id].handle
# if participant_handle != username:
# handles.append((participant_handle, message_count))
# # sort alphabetically by handle first, for a more deterministic order
# handles.sort(key=lambda tup: tup[0])
# # sort so that the most active users are at the start of the list
# handles.sort(key=lambda tup: tup[1], reverse=True)
# if len(handles) == 1:
# group_name = \
# f'{handles[0][0]}_and_{len(group_conversations_metadata[conversation_id]["participants"]) - 1}_more'
# elif len(handles) == 2 and len(group_conversations_metadata[conversation_id]["participants"]) == 3:
# group_name = f'{handles[0][0]}_and_{handles[1][0]}_and_{username}'
# elif len(handles) >= 2:
# group_name = \
# f'{handles[0][0]}_and_{handles[1][0]}_and' \
# f'_{len(group_conversations_metadata[conversation_id]["participants"]) - 2}_more'
# else:
# # just use the conversation id
# group_name = conversation_id
# # create a list of names of the form '@name1, @name2 and @name3'
# # to use as a headline in the output file
# escaped_participant_names = [
# escape_markdown(participant_name)
# for participant_name in group_conversations_metadata[conversation_id]['participant_names']
# ]
# name_list = ', '.join(escaped_participant_names[:-1]) + \
# (f' and {escaped_participant_names[-1]}'
# if len(escaped_participant_names) > 1
# else escaped_participant_names[0])
# if len(messages) > 1000:
# for chunk_index, chunk in enumerate(chunks(messages, 1000)):
# markdown = ''
# markdown += f'## {official_name} ##\n\n'
# markdown += f'### Group conversation between {name_list}, part {chunk_index + 1}: ###\n\n----\n\n'
# markdown += '\n\n----\n\n'.join(md for _, md in chunk)
# conversation_output_filename = paths.create_path_for_file_output_dms(
# name=group_name, format="md", kind="DMs-Group", index=chunk_index + 1
# )
# # write part to a markdown file
# with open_and_mkdirs(conversation_output_filename) as f:
# f.write(markdown)
# print(f'Wrote {len(chunk)} messages to {conversation_output_filename}')
# num_written_files += 1
# else:
# markdown = ''
# markdown += f'## {official_name} ##\n\n'
# markdown += f'### Group conversation between {name_list}: ###\n\n----\n\n'
# markdown += '\n\n----\n\n'.join(md for _, md in messages)
# conversation_output_filename = \
# paths.create_path_for_file_output_dms(name=group_name, format="md", kind="DMs-Group")
# with open_and_mkdirs(conversation_output_filename) as f:
# f.write(markdown)
# print(f'Wrote {len(messages)} messages to {conversation_output_filename}')
# num_written_files += 1
# num_written_messages += len(messages)
# print(f"\nWrote {len(group_conversations_messages)} direct message group conversations "
# f"({num_written_messages} total messages) to {num_written_files} markdown files")
def is_archive(path):
"""Return true if there is a Twitter archive at the given path"""
return os.path.isfile(os.path.join(path, 'data', 'account.js'))
def find_archive():
"""
Search for the archive
1. First try the working directory.
2. Then try the script directory.
3. Finally prompt the user.
"""
if is_archive('.'):
return '.'
script_dir = os.path.dirname(__file__)
if script_dir != os.getcwd():
if is_archive(script_dir):
return script_dir
print('Archive not found in working directory or script directory.\n'
'Please enter the path of your Twitter archive, or just press Enter to exit.\n'
'On most operating systems, you can also try to drag and drop your archive folder '
'into the terminal window, and it will paste its path automatically.\n')
# Give the user as many attempts as they need.
while True:
input_path = input('Archive path: ')
if not input_path:
exit()
if is_archive(input_path):
return input_path
print(f'Archive not found at {input_path}')
def main():
archive_path = find_archive()
paths = PathConfig(dir_archive=archive_path)
# Extract the archive owner's username from data/account.js
username = extract_username(paths)
user_id_url_template = 'https://twitter.com/i/user/{}'
    html_template = """\
<!doctype html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>Your Twitter archive!</title>
</head>
<body>
    <h1>Your twitter archive</h1>
    {}
</body>
</html>"""
users = {}
# Make a folder to copy the images and videos into.
os.makedirs(paths.dir_output_media, exist_ok=True)
if not os.path.isfile(paths.file_tweet_icon):
shutil.copy('assets/images/favicon.ico', paths.file_tweet_icon)
tweets = parse_tweets(username, users, html_template, paths)
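    # Interactive thread extraction: the user pastes the URL (or id) of the *last* tweet
    # in a thread; the loop below follows in_reply_to_status_id links backwards through
    # the parsed tweets and writes the collected text to '<tweet id>.txt'.
    # An empty input ends the loop.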
while True:
        tweet_id = input("Enter the URL of the last tweet in the thread (or press Enter to quit): ").split('/')[-1]
if tweet_id == "":
break
output = ""
filename = tweet_id + ".txt"
        print("Extracting thread to: " + filename)
while tweet_id is not None:
            if tweet_id not in tweets:
                print("!!!!! A deleted tweet was found in this thread. Please back up again, starting from the tweet just above the deleted one. !!!!!")
break
print(tweets[tweet_id][0])
output = tweets[tweet_id][0] + "\n" + output
tweet_id = tweets[tweet_id][1]
        print("Saving: " + filename)
        with open(filename, mode="w", encoding="utf-8") as f:
f.write(output)
if __name__ == "__main__":
main()