# OpenID Connect Flows With An Example Keycloak Setup

This is a Python3 notebook that illustrates different OpenID Connect flows, using a local Keycloak instance as OpenID provider and some basic libraries to handle the HTTP interactions.

You can skip the set up part and go straight to the flows:

- [Client Credentials Flow](#Client-Credentials-Flow)
- [Resource Owner Password Flow](#Resource-Owner-Password-Flow)
- [Authorization Code Flow](#Authorization-Code-Flow)



In [1]:
import base64
import html
import json
import logging
import re
import urllib.parse
import uuid

import requests

In [2]:
logging.basicConfig(level=logging.INFO)

## Setup of Keycloak instance

We need a test/development Keycloak instance.
For example, spin up a local Keycloak instance with Docker as follows:

    docker run --rm -it -p 9090:8080  \
        -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin  \
        jboss/keycloak:7.0.0

In [3]:
keycloak_base_url = "http://localhost:9090/auth"
admin_username = "admin"
admin_password = "admin"

### Keycloak Admin API

To be able to create clients and users through the Keycloak admin API, we first have to obtain an admin access token through OpenID, which we have to use a bearer token for other admin API requests. Let's wrap this stuff in a class.

In [4]:
class KeycloakAdmin:
    def __init__(self, base_url: str, username: str, password: str):
        r = requests.post(
            base_url + '/realms/master/protocol/openid-connect/token', 
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "admin-cli"
            })
        r.raise_for_status()
        
        self.session = requests.Session()
        self.session.headers["Authorization"]= "Bearer " + r.json()["access_token"]
        self.admin_base_url = base_url + '/admin/realms/master'
        self.log = logging.getLogger("keycloak-admin")
    
    def create_client(self, options: dict = None, prefix: str = "myclient-") -> str:
        client_id = prefix + uuid.uuid4().hex[:8]
        data = {"id": client_id}
        data.update(options)
        self.log.info("Creating client with settings {s!r}".format(s=data))
        r = self.session.post(self.admin_base_url + '/clients', json=data)
        r.raise_for_status()
        return client_id

    def get_client_secret(self, client_id) -> str:
        r = self.session.get(self.admin_base_url + '/clients/{c}/client-secret'.format(c=client_id))
        r.raise_for_status()
        self.log.info("Client secret response: {r!r}".format(r=r.text))
        client_secret = r.json()["value"]
        return client_secret
    
    def create_user(self, prefix: str = "John-", password: str = "j0hn"):
        username = prefix + uuid.uuid4().hex[:8]

        r = self.session.post(
            self.admin_base_url + '/users', 
            json={
                "username": username,
                "credentials": [
                    {"type": "password", "value": password, "temporary": False},
                ],
                "enabled": True,
            }
        )
        r.raise_for_status()
        return username, password


# And while we're at it,
def jwt_decode(token: str):
    """Poor man's JWT decoding"""

    def _decode(data: str) -> dict:
        decoded = base64.b64decode(data + '=' * (4 - len(data) % 4)).decode('ascii')
        return json.loads(decoded)

    header, payload, signature = token.split('.')
    return _decode(header), _decode(payload)

In [5]:
keycloak_admin = KeycloakAdmin(keycloak_base_url, admin_username, admin_password)

## General Set Up

To better see what is going on the HTTP level when doing OpenID Connect request, we'll add a `requests` hook that prints a bit of request and response info.

In [6]:
from IPython.display import HTML, display

def _show_request_info(r, *args, **kwargs):
    req = r.request
    default_headers = requests.utils.default_headers()
    headers = {k:v for k,v in req.headers.items() if k not in default_headers}
    display(HTML('''<div style="padding: 1ex; border: 1px solid #ddf; background-color: #eef;">
        Did request: <code>{m} {u}</code> with <ul>
        <li>body <code>{b!r}</code></li>
        <li>headers <code>{h!r}</code></li>
        <li>&rArr; response {r}</li>
        </ul></div>'''.format(
        m=req.method, u=req.url, 
        b=req.body, h=headers,
        r=r.status_code
    )))

session = requests.Session()
session.hooks['response'].append(_show_request_info)

And while we're at it, define some additional small utilities.

In [7]:
def jwt_decode(token: str):
    """Poor man's JWT decoding"""

    def _decode(data: str) -> dict:
        decoded = base64.b64decode(data + '=' * (4 - len(data) % 4)).decode('ascii')
        return json.loads(decoded)

    header, payload, signature = token.split('.')
    return _decode(header), _decode(payload)

## OpenID provider info

Get OpenID provider info from the configuration document.

In [8]:
provider_info = session.get(
    keycloak_base_url + '/realms/master/.well-known/openid-configuration'
).json()
provider_info.keys()

dict_keys(['issuer', 'authorization_endpoint', 'token_endpoint', 'token_introspection_endpoint', 'userinfo_endpoint', 'end_session_endpoint', 'jwks_uri', 'check_session_iframe', 'grant_types_supported', 'response_types_supported', 'subject_types_supported', 'id_token_signing_alg_values_supported', 'id_token_encryption_alg_values_supported', 'id_token_encryption_enc_values_supported', 'userinfo_signing_alg_values_supported', 'request_object_signing_alg_values_supported', 'response_modes_supported', 'registration_endpoint', 'token_endpoint_auth_methods_supported', 'token_endpoint_auth_signing_alg_values_supported', 'claims_supported', 'claim_types_supported', 'claims_parameter_supported', 'scopes_supported', 'request_parameter_supported', 'request_uri_parameter_supported', 'code_challenge_methods_supported', 'tls_client_certificate_bound_access_tokens', 'introspection_endpoint'])

Get the token endpoint URL.

In [9]:
token_endpoint = provider_info["token_endpoint"]
token_endpoint

'http://localhost:9090/auth/realms/master/protocol/openid-connect/token'

# Client Credentials Flow

Create a client in Keycloak with settings that allow enable Client Credentials Grant. We'll also need the client's secret.

In [10]:
cc_client = keycloak_admin.create_client(options={
    "serviceAccountsEnabled": True,
})
cc_client_secret = keycloak_admin.get_client_secret(cc_client)

cc_client, cc_client_secret

INFO:keycloak-admin:Creating client with settings {'id': 'myclient-5b8dfca4', 'serviceAccountsEnabled': True}
INFO:keycloak-admin:Client secret response: '{"type":"secret","value":"7f39376a-0d3e-4a87-b2ef-a9718bee4a76"}'


('myclient-5b8dfca4', '7f39376a-0d3e-4a87-b2ef-a9718bee4a76')

Do `client_credentials` token request.

In [11]:
r = session.post(
    token_endpoint,
    data={
        "grant_type": "client_credentials",
        "client_id": cc_client,
        "client_secret": cc_client_secret,
    }
)
r.raise_for_status()
r.json()

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJXVEdTMXJyN1pTY2RWU05DV1d5M0tiVUJXNFVaUDc2ZFZ1V1l4RTdYSnYwIn0.eyJqdGkiOiI0ODdiOTYzYy0yYzI0LTQwMmYtODE3OS02YjE1OTAxYjkxODEiLCJleHAiOjE1NzE3MzYwNTQsIm5iZiI6MCwiaWF0IjoxNTcxNzM1OTk0LCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAvYXV0aC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6IjZiMTRkYTE2LWM5MjgtNDg4Ni05MmE2LTQyYmE3MjU1YzU0NSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im15Y2xpZW50LTViOGRmY2E0IiwiYXV0aF90aW1lIjowLCJzZXNzaW9uX3N0YXRlIjoiMDNlODcxNmItZWRkZS00YWY4LTk5ZWEtNGI4ZDU5ODQ5OThhIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJlbWFpbCBwcm9maWxlIiwiY2xpZW50SG9zdCI6IjE3Mi4xNy4wLjEiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsImNsaWVudElkIjoibXljbGllbnQtNWI4ZGZjYTQiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtbXljbGllbnQtNWI4ZGZjYTQiLCJjbGllbnRBZGRyZX

## JWT inspection

Extract access token and inspect it (assuming it is a JWT token).

In [12]:
access_token = r.json()["access_token"]
jwt_decode(access_token)

({'alg': 'RS256',
  'typ': 'JWT',
  'kid': 'WTGS1rr7ZScdVSNCWWy3KbUBW4UZP76dVuWYxE7XJv0'},
 {'jti': '487b963c-2c24-402f-8179-6b15901b9181',
  'exp': 1571736054,
  'nbf': 0,
  'iat': 1571735994,
  'iss': 'http://localhost:9090/auth/realms/master',
  'aud': 'account',
  'sub': '6b14da16-c928-4886-92a6-42ba7255c545',
  'typ': 'Bearer',
  'azp': 'myclient-5b8dfca4',
  'auth_time': 0,
  'session_state': '03e8716b-edde-4af8-99ea-4b8d5984998a',
  'acr': '1',
  'realm_access': {'roles': ['offline_access', 'uma_authorization']},
  'resource_access': {'account': {'roles': ['manage-account',
     'manage-account-links',
     'view-profile']}},
  'scope': 'email profile',
  'clientHost': '172.17.0.1',
  'email_verified': False,
  'clientId': 'myclient-5b8dfca4',
  'preferred_username': 'service-account-myclient-5b8dfca4',
  'clientAddress': '172.17.0.1',
  'email': 'service-account-myclient-5b8dfca4@placeholder.org'})

## Query `userinfo`

Check the access token against the `userinfo` endpoint

In [13]:
r = session.get(
    provider_info["userinfo_endpoint"], 
    headers={"Authorization": "Bearer %s" % access_token}
)
r.raise_for_status()
r.json()

{'sub': '6b14da16-c928-4886-92a6-42ba7255c545',
 'email_verified': False,
 'preferred_username': 'service-account-myclient-5b8dfca4',
 'email': 'service-account-myclient-5b8dfca4@placeholder.org'}

# Resource Owner Password Flow

Create a client that allows resource owner password flow.

In [14]:
pwd_client = keycloak_admin.create_client({
    "publicClient": True,
    "directAccessGrantsEnabled": True,
})
pwd_client

INFO:keycloak-admin:Creating client with settings {'id': 'myclient-9fcf2d4e', 'publicClient': True, 'directAccessGrantsEnabled': True}


'myclient-9fcf2d4e'

In [15]:
user, password = keycloak_admin.create_user()

In [16]:
r = session.post(
    token_endpoint,
    data={
        "grant_type": "password",
        "username": user,
        "password": password,
        "client_id": pwd_client,
    }
)
r.raise_for_status()
r.json()

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJXVEdTMXJyN1pTY2RWU05DV1d5M0tiVUJXNFVaUDc2ZFZ1V1l4RTdYSnYwIn0.eyJqdGkiOiJkZjkxMjZmZi1lN2YyLTQxYzUtODdiMi1mMmZkMWQwMDM5OGQiLCJleHAiOjE1NzE3MzYwNTQsIm5iZiI6MCwiaWF0IjoxNTcxNzM1OTk0LCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAvYXV0aC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImFhNGYxZjExLTk5N2EtNDY4Yi05ZDE1LTA1NmMyMjBjMTQ1NSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im15Y2xpZW50LTlmY2YyZDRlIiwiYXV0aF90aW1lIjowLCJzZXNzaW9uX3N0YXRlIjoiNGM5OWQ5ODAtZDVlOC00MzU3LTkxMjQtNjI4NWU3ZGVkNzM3IiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJlbWFpbCBwcm9maWxlIiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huLWQyYzFkNDBiIn0.SzQsIuL1O-IIdJIILGq4uAuh8ZXwTtdSiu7TdFZ3uwnM96FGwbGFxP9InlWd-QJijkd3P_01Dh0OP00HtSWkvBseuRKEMnPcGqhaTl89sMzKGxGgvAfVXJ

## JWT inspection

Extract access token and inspect it (assuming it is a JWT token).

In [17]:
access_token = r.json()["access_token"]
jwt_decode(access_token)

({'alg': 'RS256',
  'typ': 'JWT',
  'kid': 'WTGS1rr7ZScdVSNCWWy3KbUBW4UZP76dVuWYxE7XJv0'},
 {'jti': 'df9126ff-e7f2-41c5-87b2-f2fd1d00398d',
  'exp': 1571736054,
  'nbf': 0,
  'iat': 1571735994,
  'iss': 'http://localhost:9090/auth/realms/master',
  'aud': 'account',
  'sub': 'aa4f1f11-997a-468b-9d15-056c220c1455',
  'typ': 'Bearer',
  'azp': 'myclient-9fcf2d4e',
  'auth_time': 0,
  'session_state': '4c99d980-d5e8-4357-9124-6285e7ded737',
  'acr': '1',
  'realm_access': {'roles': ['offline_access', 'uma_authorization']},
  'resource_access': {'account': {'roles': ['manage-account',
     'manage-account-links',
     'view-profile']}},
  'scope': 'email profile',
  'email_verified': False,
  'preferred_username': 'john-d2c1d40b'})

## Query `userinfo`

Check the access token against the `userinfo` endpoint.

In [18]:
r = session.get(
    provider_info["userinfo_endpoint"], 
    headers={"Authorization": "Bearer %s" % access_token}
)
r.raise_for_status()
r.json()

{'sub': 'aa4f1f11-997a-468b-9d15-056c220c1455',
 'email_verified': False,
 'preferred_username': 'john-d2c1d40b'}

# Authorization Code Flow

Create a client that allows the Authorization Code flow

In [19]:
redirect_uri = "https://example.com/redir"
ac_client = keycloak_admin.create_client({
    "publicClient": False,
    "redirectUris": [redirect_uri]
})
ac_client_secret = keycloak_admin.get_client_secret(ac_client)
ac_client

INFO:keycloak-admin:Creating client with settings {'id': 'myclient-ebda8bec', 'publicClient': False, 'redirectUris': ['https://example.com/redir']}
INFO:keycloak-admin:Client secret response: '{"type":"secret","value":"b08a98e8-8eff-41e7-ba0a-7d6a42d5624c"}'


'myclient-ebda8bec'

And create a user

In [20]:
user, password = keycloak_admin.create_user()

Start Authorization Code flow: we are forwarded to the OpenID provider (log in page).

In [21]:
r = session.get(
    url=provider_info["authorization_endpoint"],
    params={
        "response_type": "code",
        "client_id": ac_client,
        "scope": "openid",
        "redirect_uri": redirect_uri,
        "state": "foobar",
    },
    headers={},
    allow_redirects=False
)
r.raise_for_status()

Extract the form action so we can submit the form (the requests session will take care of the cookies).

In [22]:
form_action = html.unescape(re.search('<form\s+.*?\s+action="(.*?)"', r.text, re.DOTALL).group(1))
form_action

'http://localhost:9090/auth/realms/master/login-actions/authenticate?session_code=nmC60BxdB7AYDTauUUN40p2Ek7OV7lREddQOfDKrBb0&execution=fdf3c29c-a1f3-4d24-82f8-00fb7a8b8115&client_id=myclient-ebda8bec&tab_id=OdTQc95vCT0'

Log in, capture redirect and extract authorization code.

In [23]:
r = session.post(
    form_action, 
    data={"username": user, "password": password},
    allow_redirects=False
)
assert r.status_code == 302
redirect = r.headers['Location']
print(redirect)
session.cookies.clear()

redirect_params = urllib.parse.parse_qs(urllib.parse.urlparse(redirect).query)
auth_code = redirect_params["code"]
auth_code

https://example.com/redir?state=foobar&session_state=712195f9-6dce-4379-9cd7-49c8583c2ecb&code=0c28e3db-08b5-423e-8432-d42a7dc1c538.712195f9-6dce-4379-9cd7-49c8583c2ecb.myclient-ebda8bec


['0c28e3db-08b5-423e-8432-d42a7dc1c538.712195f9-6dce-4379-9cd7-49c8583c2ecb.myclient-ebda8bec']

Exchange authorization code for an access token.

In [24]:
r = session.post(
    url=token_endpoint,
    data={
        "grant_type": "authorization_code",
        "client_id": ac_client,
        "client_secret": ac_client_secret,
        "redirect_uri": redirect_uri,
        "code": auth_code,
    },
    allow_redirects=False
)
r.raise_for_status()
r.json()

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJXVEdTMXJyN1pTY2RWU05DV1d5M0tiVUJXNFVaUDc2ZFZ1V1l4RTdYSnYwIn0.eyJqdGkiOiJmNjgzZGFiZC0xNDE1LTQ1NGUtYTFjZi0xMjEzZmRiZmNkYjYiLCJleHAiOjE1NzE3MzYwNTUsIm5iZiI6MCwiaWF0IjoxNTcxNzM1OTk1LCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAvYXV0aC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6IjllMjMyYWM0LTk0NjUtNDRhNC1hY2RkLWJjYTBhMTcxMmQxOCIsInR5cCI6IkJlYXJlciIsImF6cCI6Im15Y2xpZW50LWViZGE4YmVjIiwiYXV0aF90aW1lIjoxNTcxNzM1OTk1LCJzZXNzaW9uX3N0YXRlIjoiNzEyMTk1ZjktNmRjZS00Mzc5LTljZDctNDljODU4M2MyZWNiIiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwczovL2V4YW1wbGUuY29tIl0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwgcHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwicHJlZmVycmVkX3VzZXJuYW1lIjoiam9obi04ZDdiMGRmNyJ9.JKhhWaR2d1HxSL0MiJGMFAnuPSFAK9ia29aLotl5a

## JWT inspection

Extract access token and inspect it (assuming it is a JWT token).

In [25]:
access_token = r.json()["access_token"]
jwt_decode(access_token)

({'alg': 'RS256',
  'typ': 'JWT',
  'kid': 'WTGS1rr7ZScdVSNCWWy3KbUBW4UZP76dVuWYxE7XJv0'},
 {'jti': 'f683dabd-1415-454e-a1cf-1213fdbfcdb6',
  'exp': 1571736055,
  'nbf': 0,
  'iat': 1571735995,
  'iss': 'http://localhost:9090/auth/realms/master',
  'aud': 'account',
  'sub': '9e232ac4-9465-44a4-acdd-bca0a1712d18',
  'typ': 'Bearer',
  'azp': 'myclient-ebda8bec',
  'auth_time': 1571735995,
  'session_state': '712195f9-6dce-4379-9cd7-49c8583c2ecb',
  'acr': '1',
  'allowed-origins': ['https://example.com'],
  'realm_access': {'roles': ['offline_access', 'uma_authorization']},
  'resource_access': {'account': {'roles': ['manage-account',
     'manage-account-links',
     'view-profile']}},
  'scope': 'openid email profile',
  'email_verified': False,
  'preferred_username': 'john-8d7b0df7'})

## Query `userinfo`

Check the access token against the `userinfo` endpoint.

In [26]:
r = session.get(
    provider_info["userinfo_endpoint"], 
    headers={"Authorization": "Bearer %s" % access_token}
)
r.raise_for_status()
r.json()

{'sub': '9e232ac4-9465-44a4-acdd-bca0a1712d18',
 'email_verified': False,
 'preferred_username': 'john-8d7b0df7'}