Source code for qci_client.auth.client

# Copyright 2023-2024, Quantum Computing Incorporated
"""Client for QCi's auth API."""

from copy import deepcopy
from datetime import datetime
import os
from typing import Optional

import requests
from requests.compat import urljoin

from qci_client.auth import types
from qci_client.utilities import raise_for_status

# Safety margin, in seconds (10 minutes): a cached access token whose remaining
# lifetime is below this value is discarded and a new one is requested.
TOKEN_EXPIRATION_MARGIN: float = 10 * 60.0  # seconds.


class AuthClient:
    """Used to authenticate to QCi applications."""

    def __init__(
        self,
        *,
        url: Optional[str] = None,
        api_token: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        """
        Handles authentication against QCi cloud APIs.

        :param url: url basepath to API endpoint, including scheme, if None, then falls
            back to QCI_API_URL environment variable
        :param api_token: refresh token for authenticating to API, if None, then falls
            back to QCI_TOKEN environment variable
        :param timeout: number of seconds before timing out requests, None waits
            indefinitely
        :raises ValueError: if neither url nor QCI_API_URL provides a non-empty value
        :raises AssertionError: if neither api_token nor QCI_TOKEN provides a non-empty
            value
        """
        # An empty-string argument is treated like None: fall back to environment.
        self._url = url or os.getenv("QCI_API_URL", "")

        if not self._url:
            raise ValueError(
                "must specify url argument or QCI_API_URL environment variable"
            )

        # Endpoint URLs are built by appending relative paths, so normalize the base
        # URL to always end with a slash.
        if not self._url.endswith("/"):
            self._url = self._url + "/"

        self._refresh_token = api_token or os.getenv("QCI_TOKEN", "")

        if not self._refresh_token:
            # NOTE: AssertionError (rather than ValueError, as used for url above) is
            # kept deliberately so that the exception type seen by callers is unchanged.
            raise AssertionError(
                "must specify api_token argument or QCI_TOKEN environment variable"
            )

        self._timeout = timeout
        # Cached response of the most recent access-token request; None until the
        # first request, or after the cached token has been judged (nearly) expired.
        self._access_token_info: Optional[types.AccessTokensPostResponseBody] = None

    @property
    def url(self) -> str:
        """Return API URL (always ends with a slash)."""
        return self._url

    @property
    def api_token(self) -> str:
        """Return API token, i.e., the refresh token used to obtain access tokens."""
        return self._refresh_token

    @property
    def timeout(self) -> Optional[float]:
        """Return timeout setting in seconds, None meaning no timeout."""
        return self._timeout

    @property
    def access_tokens_url(self) -> str:
        """URL used for obtaining access tokens."""
        return self.url + "auth/v1/access-tokens/"

    @property
    def access_token_info(self) -> types.AccessTokensPostResponseBody:
        """
        Return user's access token info, retrieving anew when absent or expired.

        A deep copy of the cached info is returned, so mutating the result does not
        affect the cache.

        :raises ValueError: if the cached expiration timestamp cannot be parsed
        """
        if self._access_token_info:
            # Authenticating with an expired token simply returns a 401: Status
            # Unauthorized, so proactively check here for expiration impending or
            # already happened. Expiration times look like "2023-07-15T08:16:59Z".
            # Parsing with %z yields a timezone-aware datetime (a literal "Z" is
            # accepted as UTC), which avoids the deprecated naive datetime.utcnow().
            expiration = datetime.strptime(
                self._access_token_info["expires_at_rfc3339"], "%Y-%m-%dT%H:%M:%S%z"
            )
            now = datetime.now(expiration.tzinfo)
            seconds_to_expiration = (expiration - now).total_seconds()

            # Renew access token if it has expired or expiration is impending.
            if seconds_to_expiration < TOKEN_EXPIRATION_MARGIN:
                self._access_token_info = None

        if not self._access_token_info:
            # Retrieve new access token info.
            self._access_token_info = self.post_access_tokens()

        return deepcopy(self._access_token_info)

    @property
    def access_token(self) -> str:
        """Return user's access token, refreshing if expired or near expiration."""
        return self.access_token_info["access_token"]

    @property
    def expires_at_rfc3339(self) -> str:
        """Return expiration of user's access token."""
        return self.access_token_info["expires_at_rfc3339"]

    @property
    def token_type(self) -> str:
        """Return type of user's access token."""
        return self.access_token_info["token_type"]

    @property
    def organization_id(self) -> str:
        """Return user's organization ID."""
        return self.access_token_info["organization_id"]

    @property
    def user_id(self) -> str:
        """Return user's user ID."""
        return self.access_token_info["user_id"]

    @property
    def headers_without_authorization(self) -> dict:
        """
        HTTP headers without bearer token in Authorization header, but with
        Content-Type, Connection, and optional X-Request-Timeout-Nano headers.

        A new dict is built on every access, so callers may modify the result.
        """
        headers = {
            "Content-Type": "application/json",
            # Simple, sessionless requests, so close connection proactively.
            "Connection": "close",
        }

        if self.timeout is not None:
            # Tell server when client will stop waiting for response. The timeout is
            # held in seconds and sent as a whole number of nanoseconds.
            headers["X-Request-Timeout-Nano"] = str(int(10**9 * self.timeout))

        return headers

    @property
    def headers(self) -> dict:
        """HTTP headers with bearer token in Authorization header."""
        headers = self.headers_without_authorization
        headers["Authorization"] = f"Bearer {self.access_token}"

        return headers

    @property
    def headers_without_connection_close(self) -> dict:
        """Headers with cached bearer token, but without connection closing."""
        headers = self.headers
        headers.pop("Connection", None)

        return headers

    def get_access_tokens_health(self) -> types.AccessTokensHealthGetResponseBody:
        """
        GET health.

        Sent without an Authorization header; returns the decoded JSON response body.
        """
        response = requests.get(
            urljoin(self.access_tokens_url, "health"),
            headers=self.headers_without_authorization,
            timeout=self.timeout,
        )
        raise_for_status(response=response)

        return response.json()

    def get_access_tokens_version(self) -> types.AccessTokensVersionGetResponseBody:
        """
        GET version.

        Sent without an Authorization header; returns the decoded JSON response body.
        """
        response = requests.get(
            urljoin(self.access_tokens_url, "version"),
            headers=self.headers_without_authorization,
            timeout=self.timeout,
        )
        raise_for_status(response=response)

        return response.json()

    def post_access_tokens(self) -> types.AccessTokensPostResponseBody:
        """
        Authorize user via refresh token used to retrieve finite-lived access_token.

        This always issues a request and does not touch the cached token info; use
        the access_token_info property for cached access. Returns the decoded JSON
        response body.
        """
        body: types.AccessTokensPostRequestBody = {"refresh_token": self._refresh_token}
        response = requests.post(
            self.access_tokens_url,
            headers=self.headers_without_authorization,
            json=body,
            timeout=self.timeout,
        )
        raise_for_status(response=response)

        return response.json()