# Copyright 2023-2024, Quantum Computing Incorporated
"""Client for QCi's auth API."""
from copy import deepcopy
from datetime import datetime
import os
from typing import Optional
import requests
from requests.compat import urljoin
from qci_client.auth import types
from qci_client.utilities import raise_for_status
TOKEN_EXPIRATION_MARGIN: float = 10 * 60.0 # seconds.
[docs]
class AuthClient:
"""Used to authenticate to QCi applications."""
def __init__(
self,
*,
url: Optional[str] = None,
api_token: Optional[str] = None,
timeout: Optional[float] = None,
):
"""
Handles authentication against QCi cloud APIs.
:param url: url basepath to API endpoint, including scheme, if None, then falls
back to QCI_API_URL environment variable
:param api_token: refresh token for authenticating to API, if None, then falls
back to QCI_TOKEN environment variable
:param timeout: number of seconds before timing out requests, None waits
indefinitely
"""
if not url:
self._url = os.getenv("QCI_API_URL", "")
else:
self._url = url
if not self._url:
raise ValueError(
"must specify url argument or QCI_API_URL environment variable"
)
if self._url[-1] != "/":
self._url = self._url + "/"
if not api_token:
self._refresh_token = os.getenv("QCI_TOKEN", "")
else:
self._refresh_token = api_token
if not self._refresh_token:
raise AssertionError(
"must specify api_token argument or QCI_TOKEN environment variable"
)
self._timeout = timeout
self._access_token_info: Optional[types.AccessTokensPostResponseBody] = None
@property
def url(self) -> str:
"""Return API URL."""
return self._url
@property
def api_token(self) -> str:
"""Return API token."""
return self._refresh_token
@property
def timeout(self) -> Optional[float]:
"""Return timeout setting."""
return self._timeout
@property
def access_tokens_url(self) -> str:
"""URL used for obtaining access tokens."""
return self.url + "auth/v1/access-tokens/"
@property
def access_token_info(self) -> types.AccessTokensPostResponseBody:
"""Return user's access token info, retrieving anew when absent or expired."""
if self._access_token_info:
# Authenticating with an expired token simply returns a 401: Status
# Unauthorized, so proactively check here for expiration impending or
# already happened. Expiration times look like "2023-07-15T08:16:59Z".
expiration = datetime.strptime(
self._access_token_info["expires_at_rfc3339"], "%Y-%m-%dT%H:%M:%SZ"
)
seconds_to_expiration = (expiration - datetime.utcnow()).total_seconds()
# Renew access token if it has expired or expiration is impending.
if seconds_to_expiration < TOKEN_EXPIRATION_MARGIN:
self._access_token_info = None
if not self._access_token_info:
# Retrieve new access token info.
self._access_token_info = self.post_access_tokens()
return deepcopy(self._access_token_info)
@property
def access_token(self) -> str:
"""Return user's access token, refreshing if expired or near expiration."""
return self.access_token_info["access_token"]
@property
def expires_at_rfc3339(self) -> str:
"""Return expiration of user's access token."""
return self.access_token_info["expires_at_rfc3339"]
@property
def token_type(self) -> str:
"""Return type of user's access token."""
return self.access_token_info["token_type"]
@property
def organization_id(self) -> str:
"""Return user's organization ID."""
return self.access_token_info["organization_id"]
@property
def user_id(self) -> str:
"""Return user's user ID."""
return self.access_token_info["user_id"]
@property
def headers_without_authorization(self) -> dict:
"""
HTTP headers without bearer token in Authorization header, but with
Content-Type, Connection, and optional X-Request-Timeout-Nano headers.
"""
headers = {
"Content-Type": "application/json",
# Simple, sessionless requests, so close connection proactively.
"Connection": "close",
}
if self.timeout is not None:
# Tell server when client will stop waiting for response.
headers["X-Request-Timeout-Nano"] = str(int(10**9 * self.timeout))
return headers
@property
def headers(self) -> dict:
"""HTTP headers with bearer token in Authorization header."""
headers = self.headers_without_authorization
headers["Authorization"] = f"Bearer {self.access_token}"
return headers
@property
def headers_without_connection_close(self):
"""Headers with cached bearer token, but without connection closing."""
headers = self.headers
headers.pop("Connection", None)
return headers
[docs]
def get_access_tokens_health(self) -> types.AccessTokensHealthGetResponseBody:
"""GET health."""
response = requests.get(
urljoin(self.access_tokens_url, "health"),
headers=self.headers_without_authorization,
timeout=self.timeout,
)
raise_for_status(response=response)
return response.json()
[docs]
def get_access_tokens_version(self) -> types.AccessTokensVersionGetResponseBody:
"""GET version."""
response = requests.get(
urljoin(self.access_tokens_url, "version"),
headers=self.headers_without_authorization,
timeout=self.timeout,
)
raise_for_status(response=response)
return response.json()
[docs]
def post_access_tokens(self) -> types.AccessTokensPostResponseBody:
"""
Authorize user via refresh token used to retrieve finite-lived access_token.
"""
json: types.AccessTokensPostRequestBody = {"refresh_token": self._refresh_token}
response = requests.post(
self.access_tokens_url,
headers=self.headers_without_authorization,
json=json,
timeout=self.timeout,
)
raise_for_status(response=response)
return response.json()