import csv
import datetime
import os
import uuid
from datetime import timedelta
from itertools import islice

import requests
from dateutil.relativedelta import relativedelta
from google.cloud import bigquery
from google.cloud import storage

# Number of trailing days of partitioned billing data to query on each run.
query_range = 5
now = datetime.datetime.now()
delta = now - relativedelta(days=query_range)
year = now.strftime("%Y")
month = now.strftime("%m")
day = now.strftime("%d")

# Finalized data
# Appends finalized data to the last day of the previous month
# month_end = now.replace(day=1) - timedelta(days=1)
# delta = now.replace(day=1) - timedelta(days=query_range)
# year = month_end.strftime("%Y")
# month = month_end.strftime("%m")
# day = month_end.strftime("%d")

report_prefix = f"{year}/{month}/{day}/{uuid.uuid4()}"

# Required vars to update
CLIENT_ID = os.getenv('client_id')          # C.R.C service account client_id
CLIENT_SECRET = os.getenv('client_secret')  # C.R.C service account client secret
INTEGRATION_ID = "CHANGE-ME"                # Cost management integration_id
BUCKET = "CHANGE-ME"                        # Filtered data GCP Bucket
PROJECT_ID = "CHANGE-ME"                    # Your project ID
DATASET = "CHANGE-ME"                       # Your dataset name
TABLE_ID = "CHANGE-ME"                      # Your table ID

gcp_big_query_columns = [
    "billing_account_id",
    "service.id",
    "service.description",
    "sku.id",
    "sku.description",
    "usage_start_time",
    "usage_end_time",
    "project.id",
    "project.name",
    "project.labels",
    "project.ancestry_numbers",
    "labels",
    "system_labels",
    "location.location",
    "location.country",
    "location.region",
    "location.zone",
    "export_time",
    "cost",
    "currency",
    "currency_conversion_rate",
    "usage.amount",
    "usage.unit",
    "usage.amount_in_pricing_units",
    "usage.pricing_unit",
    "credits",
    "invoice.month",
    "cost_type",
    "resource.name",
    "resource.global_name",
]
table_name = ".".join([PROJECT_ID, DATASET, TABLE_ID])
BATCH_SIZE = 200000


def batch(iterable, n):
    """Yields successive n-sized chunks from iterable."""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


def build_query_select_statement():
    """Helper to build the query select statement."""
    columns_list = gcp_big_query_columns.copy()
    # Repeated/record columns must be serialized to JSON strings for CSV output.
    columns_list = [
        f"TO_JSON_STRING({col})" if col in ("labels", "system_labels", "project.labels", "credits") else col
        for col in columns_list
    ]
    columns_list.append("DATE(_PARTITIONTIME) as partition_date")
    return ",".join(columns_list)


def get_service_account_token():
    """Get the authentication token for your C.R.C service account."""
    token_url = 'https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token'
    token_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    token_data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    }
    return requests.post(token_url, headers=token_headers, data=token_data)


def create_reports(query_date):
    """Query one day's partition for Red Hat-related rows and write them to CSVs in the bucket."""
    query = (
        f"SELECT {build_query_select_statement()} "
        f"FROM `{table_name}` "
        f"WHERE DATE(_PARTITIONTIME) = '{query_date}' "
        "AND (sku.description LIKE '%RedHat%' "
        "OR sku.description LIKE '%Red Hat%' "
        "OR sku.description LIKE '%RHEL%' "
        "OR service.description LIKE '%Red Hat%') "
        "ORDER BY usage_start_time"
    )
    client = bigquery.Client()
    query_job = client.query(query).result()
    column_list = gcp_big_query_columns.copy()
    column_list.append("partition_date")
    daily_files = []
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET)
    # Split the result set into BATCH_SIZE-row chunks so no single CSV grows unbounded.
    for i, rows in enumerate(batch(query_job, BATCH_SIZE)):
        csv_file = f"{report_prefix}/{query_date}_part_{i}.csv"
        daily_files.append(csv_file)
        blob = bucket.blob(csv_file)
        with blob.open(mode='w') as f:
            writer = csv.writer(f)
            writer.writerow(column_list)
            writer.writerows(rows)
    return daily_files


def post_data(files_list):
    """Post the CSVs to the console.redhat.com ingress API."""
    url = "https://console.redhat.com/api/cost-management/v1/ingress/reports/"
    json_data = {
        "source": INTEGRATION_ID,
        "reports_list": files_list,
        "bill_year": year,
        "bill_month": month,
    }
    token_response = get_service_account_token()
    headers = {
        'Authorization': f'Bearer {token_response.json().get("access_token")}',
        'Accept': 'application/json',
    }
    resp = requests.post(url, json=json_data, headers=headers)
    return resp


def get_filtered_data(request):
    """Entry point: build reports for each day in the query range, then post them."""
    files_list = []
    query_dates = [delta + datetime.timedelta(days=x) for x in range(query_range)]
    for query_date in query_dates:
        files_list += create_reports(query_date.date())
    resp = post_data(files_list)
    return f'Files posted! {resp}'
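

# A minimal local smoke test (sketch). It assumes the CHANGE-ME values above are
# filled in, `client_id` and `client_secret` are set in the environment, and
# Google application-default credentials are available. The unused `request`
# argument appears to exist only to match the HTTP entry-point signature that
# Google Cloud Functions expects, so passing None suffices for a local run.
if __name__ == "__main__":
    print(get_filtered_data(None))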