#!/usr/bin/env python3

"""
Gather recovery information for Macs.

Copyright (c) 2019, vit9696
"""

import argparse
import binascii
import hashlib
import json
import linecache
import os
import random
import struct
import sys

try:
    from urllib.request import Request, HTTPError, urlopen
    from urllib.parse import urlparse
except ImportError:
    from urllib2 import Request, HTTPError, urlopen
    from urlparse import urlparse

SELF_DIR = os.path.dirname(os.path.realpath(__file__))

# Board identifier and logic board serials (MLB) used as defaults and by the
# self-check / verification actions.
RECENT_MAC = 'Mac-7BA5B2D9E42DDD94'
MLB_ZERO = '00000000000000000'
MLB_VALID = 'C02749200YGJ803AX'
MLB_PRODUCT = '00000000000J80300'

# Number of random hex digits generated for the 'cid', 'k' and 'fg' fields.
TYPE_SID = 16
TYPE_K = 64
TYPE_FG = 64

# Keys expected in the server's "key: value" reply.
INFO_PRODUCT = 'AP'
INFO_IMAGE_LINK = 'AU'
INFO_IMAGE_HASH = 'AH'
INFO_IMAGE_SESS = 'AT'
INFO_SIGN_LINK = 'CU'
INFO_SIGN_HASH = 'CH'
INFO_SIGN_SESS = 'CT'
INFO_REQURED = [INFO_PRODUCT, INFO_IMAGE_LINK, INFO_IMAGE_HASH, INFO_IMAGE_SESS,
                INFO_SIGN_LINK, INFO_SIGN_HASH, INFO_SIGN_SESS]


def run_query(url, headers, post=None, raw=False):
    """
    Perform an HTTP request and return its result.

    url     -- address to request.
    headers -- dict of request headers.
    post    -- optional dict sent as a newline-separated 'key=value' body;
               when None no body is sent.
    raw     -- when true, return the open response object so the caller can
               stream it (the caller then owns the response); otherwise
               return a (headers dict, body bytes) tuple.

    On HTTPError an error message is printed and the process exits with
    status 1.
    """
    data = None
    if post is not None:
        # The f-string also accepts non-string values, which the plain
        # concatenation used previously rejected with TypeError.
        data = '\n'.join(f'{key}={value}' for key, value in post.items()).encode('utf-8')

    req = Request(url=url, headers=headers, data=data)
    try:
        response = urlopen(req)
        if raw:
            return response
        try:
            return dict(response.info()), response.read()
        finally:
            # The body has been fully read (or reading failed), so release
            # the connection instead of leaving it to the garbage collector.
            response.close()
    except HTTPError as e:
        print(f'ERROR: "{e}" when connecting to {url}')
        sys.exit(1)


def generate_id(id_type, id_value=None):
    """
    Return id_value when it is truthy, otherwise a random identifier made of
    id_type uppercase hexadecimal digits.
    """
    if id_value:
        return id_value
    hex_digits = '0123456789ABCDEF'
    return ''.join(random.choice(hex_digits) for _ in range(id_type))


def product_mlb(mlb):
    """
    Build a product-only MLB: characters 11..14 of mlb surrounded by zeros.

    Raises IndexError when mlb has fewer than 15 characters.
    """
    if len(mlb) < 15:
        # Slicing would silently yield a malformed serial; keep the failure
        # that per-character indexing produced.
        raise IndexError('MLB is too short to contain a product code')
    return '00000000000' + mlb[11:15] + '00'


def mlb_from_eeee(eeee):
    """
    Build a product-only MLB from a 4-character EEEE code.

    Prints an error and exits with status 1 when the code is not exactly
    4 characters long.
    """
    if len(eeee) != 4:
        print('ERROR: Invalid EEEE code length!')
        sys.exit(1)

    return f'00000000000{eeee}00'


def int_from_unsigned_bytes(byte_list, byteorder):
    """
    Interpret byte_list as an unsigned integer.

    byteorder -- 'little' for least-significant byte first; any other value
                 is treated as big-endian.

    An empty byte_list yields 0.
    """
    order = 'little' if byteorder == 'little' else 'big'
    return int.from_bytes(byte_list, order)

# zhangyoufu
# https://gist.github.com/MCJack123/943eaca762730ca4b7ae460b731b68e7#gistcomment-3061078 2021-10-08
Apple_EFI_ROM_public_key_1 = 0xC3E748CAD9CD384329E10E25A91E43E1A762FF529ADE578C935BDDF9B13F2179D4855E6FC89E9E29CA12517D17DFA1EDCE0BEBF0EA7B461FFE61D94E2BDF72C196F89ACD3536B644064014DAE25A15DB6BB0852ECBD120916318D1CCDEA3C84C92ED743FC176D0BACA920D3FCF3158AFF731F88CE0623182A8ED67E650515F75745909F07D415F55FC15A35654D118C55A462D37A3ACDA08612F3F3F6571761EFCCBCC299AEE99B3A4FD6212CCFFF5EF37A2C334E871191F7E1C31960E010A54E86FA3F62E6D6905E1CD57732410A3EB0C6B4DEFDABE9F59BF1618758C751CD56CEF851D1C0EAA1C558E37AC108DA9089863D20E2E7E4BF475EC66FE6B3EFDCF

ChunkListHeader = struct.Struct('<4sIBBBxQQQ')
assert ChunkListHeader.size == 0x24

# NOTE(review): the text between "struct.Struct('" and "> 0" was lost in the
# copy of the file under review. The Chunk format and the first part of
# verify_chunklist() below are reconstructed from ChunkListHeader's format and
# from the names used later in the function - confirm against upstream.
Chunk = struct.Struct('<I32s')
assert Chunk.size == 0x24


def verify_chunklist(cnkpath):
    """
    Parse and validate a chunklist file, yielding its entries.

    Generator: yields one (chunk_size, chunk_sha256) tuple per chunk. After
    the last chunk the trailing signature is checked against the SHA-256 of
    the header and all chunk records.

    Raises AssertionError when a structural or signature check fails,
    RuntimeError when the chunklist carries only a plain digest instead of a
    digital signature, and NotImplementedError for unknown signature methods.

    The checks are deliberately written as one-line bare asserts:
    action_download() reports the source line of the failing assert to the
    user.
    """
    with open(cnkpath, 'rb') as f:
        hash_ctx = hashlib.sha256()
        data = f.read(ChunkListHeader.size)
        hash_ctx.update(data)
        magic, header_size, file_version, chunk_method, signature_method, chunk_count, chunk_offset, signature_offset = ChunkListHeader.unpack(data)
        assert magic == b'CNKL'
        assert header_size == ChunkListHeader.size
        assert file_version == 1
        assert chunk_method == 1
        assert signature_method in [1, 2]
        assert chunk_count > 0
        assert chunk_offset == 0x24
        assert signature_offset == chunk_offset + Chunk.size * chunk_count
        for _ in range(chunk_count):
            data = f.read(Chunk.size)
            hash_ctx.update(data)
            chunk_size, chunk_sha256 = Chunk.unpack(data)
            yield chunk_size, chunk_sha256
        digest = hash_ctx.digest()
        if signature_method == 1:
            data = f.read(256)
            assert len(data) == 256
            signature = int_from_unsigned_bytes(data, 'little')
            # Expected RSA plaintext, i.e. the EMSA-PKCS1-v1_5 encoding of the
            # SHA-256 digest for a 256-byte modulus:
            #   00 01 | FF * 202 | 00 | DigestInfo(SHA-256) | digest
            # NOTE(review): built programmatically in place of the original
            # hard-coded 256-byte hex literal; confirm both are equal.
            digest_info = bytes.fromhex('3031300d060960864801650304020105000420')
            padded = b'\x00\x01' + b'\xff' * 202 + b'\x00' + digest_info + digest
            plaintext = int_from_unsigned_bytes(padded, 'big')
            assert pow(signature, 0x10001, Apple_EFI_ROM_public_key_1) == plaintext
        elif signature_method == 2:
            data = f.read(32)
            assert data == digest
            # A bare digest proves integrity only, not authenticity.
            raise RuntimeError('Chunklist missing digital signature')
        else:
            raise NotImplementedError
        assert f.read(1) == b''


def get_session(args):
    """
    Request a session from the recovery server.

    args -- parsed command line arguments; only args.verbose is read.

    Returns the 'session=...' cookie string taken from the Set-Cookie
    response header. Raises RuntimeError when no such cookie is present.
    """
    headers = {
        'Host': 'osrecovery.apple.com',
        'Connection': 'close',
        'User-Agent': 'InternetRecovery/1.0',
    }

    headers, _ = run_query('http://osrecovery.apple.com/', headers)

    if args.verbose:
        print('Session headers:')
        for header in headers:
            print(f'{header}: {headers[header]}')

    for header in headers:
        if header.lower() != 'set-cookie':
            continue
        # Scan every cookie attribute; previously the very first entry was
        # returned unconditionally (as Ellipsis when it was not the session).
        for cookie in headers[header].split('; '):
            if cookie.startswith('session='):
                return cookie

    raise RuntimeError('No session in headers ' + str(headers))


def get_image_info(session, bid, mlb=MLB_ZERO, diag=False, os_type='default', cid=None):
    """
    Query the recovery server for image information.

    session -- session cookie as returned by get_session().
    bid     -- board identifier.
    mlb     -- logic board serial.
    diag    -- request the diagnostics image instead of the recovery image.
    os_type -- value of the 'os' field; sent for recovery images only.
    cid     -- client id; a random one is generated when omitted.

    Returns a dict built from the 'key: value' lines of the reply.
    Raises RuntimeError when any key listed in INFO_REQURED is missing.
    """
    headers = {
        'Host': 'osrecovery.apple.com',
        'Connection': 'close',
        'User-Agent': 'InternetRecovery/1.0',
        'Cookie': session,
        'Content-Type': 'text/plain',
    }

    post = {
        'cid': generate_id(TYPE_SID, cid),
        'sn': mlb,
        'bid': bid,
        'k': generate_id(TYPE_K),
        'fg': generate_id(TYPE_FG)
    }

    if diag:
        url = 'http://osrecovery.apple.com/InstallationPayload/Diagnostics'
    else:
        url = 'http://osrecovery.apple.com/InstallationPayload/RecoveryImage'
        post['os'] = os_type

    headers, output = run_query(url, headers, post)

    output = output.decode('utf-8')
    info = {}
    for line in output.split('\n'):
        parts = line.split(': ')
        # Only well-formed "key: value" lines are recorded; everything else
        # is ignored.
        if len(parts) != 2:
            continue
        info[parts[0]] = parts[1]

    for k in INFO_REQURED:
        if k not in info:
            raise RuntimeError(f'Missing key {k}')

    return info


def save_image(url, sess, filename='', directory=''):
    """
    Download url into directory/filename, printing progress.

    url       -- address of the file to download.
    sess      -- asset token sent as the 'AssetToken' cookie.
    filename  -- target file name; defaults to the last path component of url.
    directory -- target directory, created when missing; '' means the
                 current directory.

    Returns the path of the saved file. Raises RuntimeError when the
    resulting file name is empty or contains a '/'.
    """
    purl = urlparse(url)
    headers = {
        'Host': purl.hostname,
        'Connection': 'close',
        'User-Agent': 'InternetRecovery/1.0',
        'Cookie': '='.join(['AssetToken', sess])
    }

    # An empty directory means "current directory" - nothing to create.
    # makedirs also copes with nested output paths.
    if directory and not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)

    if filename == '':
        filename = os.path.basename(purl.path)
    if filename.find('/') >= 0 or filename == '':
        raise RuntimeError('Invalid save path ' + filename)

    print(f'Saving {url} to {directory}/{filename}...')

    # Open the connection first so that a failed request does not leave an
    # empty file behind, and make sure the response is always closed.
    response = run_query(url, headers, raw=True)
    try:
        with open(os.path.join(directory, filename), 'wb') as fh:
            size = 0
            while True:
                chunk = response.read(2**20)
                if not chunk:
                    break
                fh.write(chunk)
                size += len(chunk)
                print(f'\r{size / (2**20)} MBs downloaded...', end='')
                sys.stdout.flush()
            print('\rDownload complete!\t\t\t\t\t')
    finally:
        response.close()

    return os.path.join(directory, os.path.basename(filename))


def verify_image(dmgpath, cnkpath):
    """
    Verify the image at dmgpath against the chunklist at cnkpath.

    Raises RuntimeError when a chunk is short, a chunk hash does not match,
    or the image is longer than the chunklist describes. Exceptions raised
    by verify_chunklist() propagate unchanged.
    """
    print('Verifying image with chunklist...')

    with open(dmgpath, 'rb') as dmgf:
        cnkcount = 0
        for cnksize, cnkhash in verify_chunklist(cnkpath):
            cnkcount += 1
            print(f'\rChunk {cnkcount} ({cnksize} bytes)', end='')
            sys.stdout.flush()
            cnk = dmgf.read(cnksize)
            if len(cnk) != cnksize:
                raise RuntimeError(f'Invalid chunk {cnkcount} size: expected {cnksize}, read {len(cnk)}')
            if hashlib.sha256(cnk).digest() != cnkhash:
                raise RuntimeError(f'Invalid chunk {cnkcount}: hash mismatch')
        if dmgf.read(1) != b'':
            raise RuntimeError('Invalid image: larger than chunklist')
        print('\rImage verification complete!\t\t\t\t\t')


def action_download(args):
    """
    Download the recovery (or diagnostics) image and verify it.

    Returns 0 when the image verifies against its chunklist, 1 otherwise.

    Reference information for queries:

    Recovery latest:
    cid=3076CE439155BA14
    sn=...
    bid=Mac-E43C1C25D4880AD6
    k=4BE523BB136EB12B1758C70DB43BDD485EBCB6A457854245F9E9FF0587FB790C
    os=latest
    fg=B2E6AA07DB9088BE5BDB38DB2EA824FDDFB6C3AC5272203B32D89F9D8E3528DC

    Recovery default:
    cid=4A35CB95FF396EE7
    sn=...
    bid=Mac-E43C1C25D4880AD6
    k=0A385E6FFC3DDD990A8A1F4EC8B98C92CA5E19C9FF1DD26508C54936D8523121
    os=default
    fg=B2E6AA07DB9088BE5BDB38DB2EA824FDDFB6C3AC5272203B32D89F9D8E3528DC

    Diagnostics:
    cid=050C59B51497CEC8
    sn=...
    bid=Mac-E43C1C25D4880AD6
    k=37D42A8282FE04A12A7D946304F403E56A2155B9622B385F3EB959A2FBAB8C93
    fg=B2E6AA07DB9088BE5BDB38DB2EA824FDDFB6C3AC5272203B32D89F9D8E3528DC
    """
    session = get_session(args)
    info = get_image_info(session, bid=args.board_id, mlb=args.mlb, diag=args.diagnostics, os_type=args.os_type)
    if args.verbose:
        print(info)
    print(f'Downloading {info[INFO_PRODUCT]}...')
    cnkname = '' if args.basename == '' else args.basename + '.chunklist'
    cnkpath = save_image(info[INFO_SIGN_LINK], info[INFO_SIGN_SESS], cnkname, args.outdir)
    dmgname = '' if args.basename == '' else args.basename + '.dmg'
    dmgpath = save_image(info[INFO_IMAGE_LINK], info[INFO_IMAGE_SESS], dmgname, args.outdir)
    try:
        verify_image(dmgpath, cnkpath)
        return 0
    except Exception as err:
        if isinstance(err, AssertionError) and str(err) == '':
            # A bare assert carries no message: report the source line of the
            # innermost failing frame instead.
            try:
                tb = sys.exc_info()[2]
                while tb.tb_next:
                    tb = tb.tb_next
                err = linecache.getline(tb.tb_frame.f_code.co_filename, tb.tb_lineno, tb.tb_frame.f_globals).strip()
            except Exception:
                err = "Invalid chunklist"
        print(f'\rImage verification failed. ({err})')
        return 1


def action_selfcheck(args):
    """
    Check that the server still behaves as the MLB validation logic expects.

    Returns 0 when no discrepancy is found, 1 otherwise.

    Sanity check server logic for recovery:

    if not valid(bid):
        return error()
    ppp = get_ppp(sn)
    if not valid(ppp):
        return latest_recovery(bid = bid)               # Returns newest for bid.
    if valid(sn):
        if os == 'default':
            return default_recovery(sn = sn, ppp = ppp) # Returns oldest for sn.
        else:
            return latest_recovery(sn = sn, ppp = ppp)  # Returns newest for sn.
    return default_recovery(ppp = ppp)                  # Returns oldest.
    """
    session = get_session(args)
    valid_default = get_image_info(session, bid=RECENT_MAC, mlb=MLB_VALID, diag=False, os_type='default')
    valid_latest = get_image_info(session, bid=RECENT_MAC, mlb=MLB_VALID, diag=False, os_type='latest')
    product_default = get_image_info(session, bid=RECENT_MAC, mlb=MLB_PRODUCT, diag=False, os_type='default')
    product_latest = get_image_info(session, bid=RECENT_MAC, mlb=MLB_PRODUCT, diag=False, os_type='latest')
    generic_default = get_image_info(session, bid=RECENT_MAC, mlb=MLB_ZERO, diag=False, os_type='default')
    generic_latest = get_image_info(session, bid=RECENT_MAC, mlb=MLB_ZERO, diag=False, os_type='latest')

    if args.verbose:
        print(valid_default)
        print(valid_latest)
        print(product_default)
        print(product_latest)
        print(generic_default)
        print(generic_latest)

    if valid_default[INFO_PRODUCT] == valid_latest[INFO_PRODUCT]:
        # Valid MLB must give different default and latest if this is not a too new product.
        print(f'ERROR: Cannot determine any previous product, got {valid_default[INFO_PRODUCT]}')
        return 1

    if product_default[INFO_PRODUCT] != product_latest[INFO_PRODUCT]:
        # Product-only MLB must give the same value for default and latest.
        print(f'ERROR: Latest and default do not match for product MLB, got {product_default[INFO_PRODUCT]} and {product_latest[INFO_PRODUCT]}')
        return 1

    if generic_default[INFO_PRODUCT] != generic_latest[INFO_PRODUCT]:
        # Zero MLB always give the same value for default and latest.
        print(f'ERROR: Generic MLB gives different product, got {generic_default[INFO_PRODUCT]} and {generic_latest[INFO_PRODUCT]}')
        return 1

    if valid_latest[INFO_PRODUCT] != generic_latest[INFO_PRODUCT]:
        # Valid MLB must always equal generic MLB.
        print(f'ERROR: Cannot determine unified latest product, got {valid_latest[INFO_PRODUCT]} and {generic_latest[INFO_PRODUCT]}')
        return 1

    if product_default[INFO_PRODUCT] != valid_default[INFO_PRODUCT]:
        # Product-only MLB can give the same value with valid default MLB.
        # This is not an error for all models, but for our chosen code it is.
        print(f'ERROR: Valid and product MLB give mismatch, got {product_default[INFO_PRODUCT]} and {valid_default[INFO_PRODUCT]}')
        return 1

    print('SUCCESS: Found no discrepancies with MLB validation algorithm!')
    return 0


def action_verify(args):
    """
    Try to verify MLB serial number.

    Prints a SUCCESS or UNKNOWN verdict for args.mlb and always returns 0.
    """
    session = get_session(args)
    generic_latest = get_image_info(session, bid=RECENT_MAC, mlb=MLB_ZERO, diag=False, os_type='latest')
    uvalid_default = get_image_info(session, bid=args.board_id, mlb=args.mlb, diag=False, os_type='default')
    uvalid_latest = get_image_info(session, bid=args.board_id, mlb=args.mlb, diag=False, os_type='latest')
    uproduct_default = get_image_info(session, bid=args.board_id, mlb=product_mlb(args.mlb), diag=False, os_type='default')

    if args.verbose:
        print(generic_latest)
        print(uvalid_default)
        print(uvalid_latest)
        print(uproduct_default)

    # Verify our MLB number.
    if uvalid_default[INFO_PRODUCT] != uvalid_latest[INFO_PRODUCT]:
        if uvalid_latest[INFO_PRODUCT] == generic_latest[INFO_PRODUCT]:
            print(f'SUCCESS: {args.mlb} MLB looks valid and supported!')
        else:
            print(f'SUCCESS: {args.mlb} MLB looks valid, but probably unsupported!')
        return 0

    print('UNKNOWN: Run selfcheck, check your board-id, or try again later!')

    # Here we have matching default and latest products. This can only be true for very
    # new models. These models get either latest or special builds.
    if uvalid_default[INFO_PRODUCT] == generic_latest[INFO_PRODUCT]:
        print(f'UNKNOWN: {args.mlb} MLB can be valid if very new!')
        return 0
    if uproduct_default[INFO_PRODUCT] != uvalid_default[INFO_PRODUCT]:
        print(f'UNKNOWN: {args.mlb} MLB looks invalid, other models use product {uproduct_default[INFO_PRODUCT]} instead of {uvalid_default[INFO_PRODUCT]}!')
        return 0
    print(f'UNKNOWN: {args.mlb} MLB can be valid if very new and using special builds!')
    return 0


def action_guess(args):
    """
    Attempt to guess which model does this MLB belong.

    Queries the server for every board listed in the JSON file at
    args.board_db and prints the boards for which args.mlb looks supported.
    Always returns 0.
    """
    mlb = args.mlb
    anon = mlb.startswith('000')

    with open(args.board_db, 'r', encoding='utf-8') as fh:
        db = json.load(fh)

    supported = {}

    session = get_session(args)

    generic_latest = get_image_info(session, bid=RECENT_MAC, mlb=MLB_ZERO, diag=False, os_type='latest')

    for model in db:
        # Best effort: a failure for one board must not abort the whole scan.
        try:
            if anon:
                # For anonymous lookup check when given model does not match latest.
                model_latest = get_image_info(session, bid=model, mlb=MLB_ZERO, diag=False, os_type='latest')

                if model_latest[INFO_PRODUCT] != generic_latest[INFO_PRODUCT]:
                    # NOTE(review): nesting restored from flattened source -
                    # the model is skipped either way, the warning is printed
                    # only for boards marked 'current'. Confirm.
                    if db[model] == 'current':
                        print(f'WARN: Skipped {model} due to using latest product {model_latest[INFO_PRODUCT]} instead of {generic_latest[INFO_PRODUCT]}')
                    continue

                user_default = get_image_info(session, bid=model, mlb=mlb, diag=False, os_type='default')

                if user_default[INFO_PRODUCT] != generic_latest[INFO_PRODUCT]:
                    supported[model] = [db[model], user_default[INFO_PRODUCT], generic_latest[INFO_PRODUCT]]
            else:
                # For normal lookup check when given model has mismatching normal and latest.
                user_latest = get_image_info(session, bid=model, mlb=mlb, diag=False, os_type='latest')

                user_default = get_image_info(session, bid=model, mlb=mlb, diag=False, os_type='default')

                if user_latest[INFO_PRODUCT] != user_default[INFO_PRODUCT]:
                    supported[model] = [db[model], user_default[INFO_PRODUCT], user_latest[INFO_PRODUCT]]

        except Exception as e:
            print(f'WARN: Failed to check {model}, exception: {e}')

    if supported:
        print(f'SUCCESS: MLB {mlb} looks supported for:')
        # Unpack key and value together; indexing the dict with the
        # (key, value) tuple from items() raised KeyError.
        for model, (last_os, default_product, latest_product) in supported.items():
            print(f'- {model}, up to {last_os}, default: {default_product}, latest: {latest_product}')
        return 0

    print(f'UNKNOWN: Failed to determine supported models for MLB {mlb}!')
    # Explicit exit status; the implicit None previously returned here also
    # mapped to exit status 0.
    return 0


def main():
    """
    Parse the command line and run the requested action.

    Returns the exit status of the action. Exits with status 1 when the
    MLB (given directly or derived from --code) is not 17 characters long.
    """
    parser = argparse.ArgumentParser(description='Gather recovery information for Macs')
    parser.add_argument('action', choices=['download', 'selfcheck', 'verify', 'guess'],
                        help='Action to perform: "download" - performs recovery downloading,'
                        ' "selfcheck" checks whether MLB serial validation is possible, "verify" performs'
                        ' MLB serial verification, "guess" tries to find suitable mac model for MLB.')
    parser.add_argument('-o', '--outdir', type=str, default='com.apple.recovery.boot',
                        help='customise output directory for downloading, defaults to com.apple.recovery.boot')
    parser.add_argument('-n', '--basename', type=str, default='',
                        help='customise base name for downloading, defaults to remote name')
    parser.add_argument('-b', '--board-id', type=str, default=RECENT_MAC,
                        help=f'use specified board identifier for downloading, defaults to {RECENT_MAC}')
    parser.add_argument('-m', '--mlb', type=str, default=MLB_ZERO,
                        help=f'use specified logic board serial for downloading, defaults to {MLB_ZERO}')
    parser.add_argument('-e', '--code', type=str, default='',
                        help='generate product logic board serial with specified product EEEE code')
    # The help text used to end with a stray MLB_ZERO value copied from the
    # --mlb option.
    parser.add_argument('-os', '--os-type', type=str, default='default', choices=['default', 'latest'],
                        help='use specified os type, defaults to default')
    parser.add_argument('-diag', '--diagnostics', action='store_true', help='download diagnostics image')
    parser.add_argument('-v', '--verbose', action='store_true', help='print debug information')
    parser.add_argument('-db', '--board-db', type=str, default=os.path.join(SELF_DIR, 'boards.json'),
                        help='use custom board list for checking, defaults to boards.json')

    args = parser.parse_args()

    if args.code != '':
        args.mlb = mlb_from_eeee(args.code)

    if len(args.mlb) != 17:
        print('ERROR: Cannot use MLBs in non 17 character format!')
        sys.exit(1)

    # argparse restricts args.action to exactly these keys.
    actions = {
        'download': action_download,
        'selfcheck': action_selfcheck,
        'verify': action_verify,
        'guess': action_guess,
    }
    return actions[args.action](args)


if __name__ == '__main__':
    sys.exit(main())