"""This file was generated using `langgraph-gen` version 0.0.3.
This file provides a placeholder implementation for the corresponding stub.
Replace the placeholder implementation with your own logic.
"""
from typing_extensions import TypedDict
from aws_cost.stub import CostAgent
from typing_extensions import Annotated, TypedDict
from langgraph.graph.message import add_messages
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import START, END
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles
from pydantic import BaseModel, Field
import pandas as pd
import plotly.express as px
import plotly.io as pio
import boto3
import logging
import sys
import base64
import random
import chat
import os
import json
import traceback
import asyncio
import aws_cost.reflection_agent as reflection_agent
import string
import trans
import utils
# Configure root logging once at import time; every logger in this process
# inherits the format and the stderr handler set here.
logging.basicConfig(
    level=logging.INFO, # Default to INFO level
    format='%(filename)s:%(lineno)d | %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger("cost_analysis")
# AWS credentials read from the environment. When the access/secret pair is
# absent, the boto3 clients below fall back to the default credential chain.
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
aws_session_token = os.environ.get('AWS_SESSION_TOKEN')
# NOTE(review): aws_region is read but not passed to the boto3 clients below
# -- confirm whether region should be forwarded explicitly.
aws_region = os.environ.get('AWS_DEFAULT_REGION', 'us-west-2')
# Running pointer into the pre-allocated notification placeholders
# (reset to 0 at the start of each run by run_cost_agent).
index = 0

def add_notification(containers, message):
    """Display *message* in the next free notification placeholder.

    Advances the module-level ``index`` so that successive calls fill
    successive ``st.empty()`` slots under ``containers['notification']``.
    """
    global index
    slot = containers['notification'][index]
    slot.info(message)
    index += 1
def get_url(figure, prefix):
    """Render *figure* to PNG, upload it to S3, and return the public URL.

    Args:
        figure: A plotly figure object.
        prefix: Filename prefix for the uploaded image.

    Returns:
        The URL returned by chat.upload_to_s3.
    """
    # Render the figure straight to PNG bytes. The original implementation
    # base64-encoded the bytes and immediately decoded them back before
    # upload -- a redundant round-trip with no effect on the payload.
    img_bytes = pio.to_image(figure, format="png")
    random_id = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=8))
    image_filename = f'{prefix}_{random_id}.png'
    url = chat.upload_to_s3(img_bytes, image_filename)
    logger.info(f"Uploaded image to S3: {url}")
    return url
def get_prompt_template(prompt_name: str) -> str:
    """Load the markdown prompt template *prompt_name* from this package dir.

    The original left the file handle open (never closed) and relied on the
    locale-dependent default encoding; a context manager closes the handle
    deterministically and UTF-8 is read explicitly.

    Args:
        prompt_name: Base name of the template file (without the .md suffix).

    Returns:
        The full template text.
    """
    path = os.path.join(os.path.dirname(__file__), f"{prompt_name}.md")
    with open(path, encoding="utf-8") as f:
        return f.read()
def get_summary(figure, instruction):
    """Ask the vision LLM to summarize a plotly *figure* per *instruction*.

    Args:
        figure: A plotly figure object.
        instruction: Natural-language instruction for the summary.

    Returns:
        The summary text (tag-stripped when the model wrapped it in tags).
    """
    img_bytes = pio.to_image(figure, format="png")
    base64_image = base64.b64encode(img_bytes).decode('utf-8')
    summary = chat.summary_image(base64_image, instruction)
    # Bug fix: the original called summary.split("") which unconditionally
    # raises ValueError (empty separator). Presumably the intent was to
    # extract a <result>-tagged answer -- TODO confirm the tag name against
    # chat.summary_image's output format.
    if "<result>" in summary:
        summary = summary.split("<result>")[1].split("</result>")[0]
    logger.info(f"summary: {summary}")
    return summary
# Reflection
class Reflection(BaseModel):
    """Structured critique of a draft report, produced via structured output."""
    # What the draft is missing.
    missing: str = Field(description="Critique of what is missing.")
    # What would make the answer better.
    advisable: str = Field(description="Critique of what is helpful for better answer")
    # What could be removed from the draft.
    superfluous: str = Field(description="Critique of what is superfluous")
class Research(BaseModel):
    """Provide reflection and then follow up with search queries to improve the answer."""
    # Critique of the current draft (see Reflection above).
    reflection: Reflection = Field(description="Your reflection on the initial answer.")
    # Follow-up queries addressing the gaps identified in the reflection.
    search_queries: list[str] = Field(
        description="1-3 search queries for researching improvements to address the critique of your current answer."
    )
def reflect(draft):
    """Ask the LLM to critique *draft*, retrying until structured output parses.

    Args:
        draft: The current report text to critique.

    Returns:
        dict with keys "reflection" (list of critique strings) and
        "search_queries" (list of follow-up queries); both lists are empty
        when parsing failed on all five attempts.
    """
    logger.info("###### reflect ######")
    reflection = []
    search_queries = []
    for attempt in range(5):
        llm = chat.get_chat(extended_thinking="Disable")
        structured_llm = llm.with_structured_output(Research, include_raw=True)
        info = structured_llm.invoke(draft)
        logger.info(f'attempt: {attempt}, info: {info}')
        # `parsed` is None when the model output failed schema validation;
        # retry in that case. (`is not None` replaces `not ... == None`.)
        if info['parsed'] is not None:
            parsed_info = info['parsed']
            reflection = [parsed_info.reflection.missing, parsed_info.reflection.advisable]
            logger.info(f"reflection: {reflection}")
            search_queries = parsed_info.search_queries
            logger.info(f"search_queries: {search_queries}")
            break
    return {
        "reflection": reflection,
        "search_queries": search_queries
    }
def revise_draft(draft, context):
    """Revise *draft* by folding *context* into it, preserving its format.

    The (Korean) system prompt instructs the model to keep the draft's
    subheadings and base format, add the given context, and write so that
    it is easy to understand.

    Args:
        draft: The current report text.
        context: Additional material to merge into the report.

    Returns:
        The revised report text.
    """
    logger.info("###### revise_draft ######")
    system = (
        "당신은 보고서를 잘 작성하는 논리적이고 똑똑한 AI입니다."
        "당신이 작성하여야 할 보고서 의 소제목과 기본 포맷을 유지한 상태에서, 다음의 의 내용을 추가합니다."
        "초등학생도 쉽게 이해하도록 풀어서 씁니다."
    )
    human = (
        "{draft}"
        "{context}"
    )
    reflection_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", human)
        ]
    )
    # Renamed from `reflect`, which shadowed the module-level reflect()
    # function defined above.
    revise_chain = reflection_prompt | chat.get_chat(extended_thinking="Disable")
    result = revise_chain.invoke({
        "draft": draft,
        "context": context
    })
    logger.info(f"result: {result.content}")
    return result.content
#########################################################
# Cost Agent
#########################################################
class CostState(TypedDict):
    # Raw Cost Explorer response grouped by SERVICE (set by service_cost).
    service_costs: dict
    # Raw Cost Explorer response grouped by REGION (set by region_cost).
    region_costs: dict
    # Raw Cost Explorer response at DAILY granularity (set by daily_cost).
    daily_costs: dict
    # Extra report material gathered by the reflection/MCP loop.
    additional_context: list[str]
    # Chart + summary markdown fragments appended to the final report.
    appendix: list[str]
    # Number of generate_insight passes completed so far.
    iteration: int
    # Reflection output stored by reflect_context for the MCP pass.
    reflection: list[str]
    # The assembled report body produced by generate_insight.
    final_response: str
# Define stand-alone functions
# Accumulated pipeline step names, rendered as a breadcrumb trail.
status_msg = []

def get_status_msg(status):
    """Append *status* to the trail and return the rendered breadcrumb.

    Every status except the terminal "end" gets a trailing ellipsis to
    signal that work is still in progress.
    """
    global status_msg
    status_msg.append(status)
    trail = " -> ".join(status_msg)
    suffix = "..." if status != "end" else ""
    return "[status]\n" + trail + suffix
response_msg = []
def service_cost(state: CostState, config) -> dict:
    """LangGraph node: per-service AWS cost for the last 30 days.

    Queries Cost Explorer grouped by SERVICE, renders a pie chart, uploads
    it to S3, asks the vision LLM for a short summary, and appends the
    resulting markdown section to the report appendix.

    Returns:
        Partial state update with "appendix" and "service_costs", or None
        (no state update) when the Cost Explorer call fails.
    """
    logger.info("###### service_cost ######")
    logger.info("Getting cost analysis...")
    days = 30
    request_id = config.get("configurable", {}).get("request_id", "")
    containers = config.get("configurable", {}).get("containers", None)
    try:
        if chat.debug_mode == "Enable":
            containers["status"].info(get_status_msg("service_cost"))
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # Use explicit credentials when provided; otherwise fall back to
        # boto3's default credential chain.
        if aws_access_key and aws_secret_key:
            ce = boto3.client(
                service_name='ce',
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                aws_session_token=aws_session_token
            )
        else:
            ce = boto3.client(
                service_name='ce'
            )
        # service cost
        service_response = ce.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['UnblendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
        )
        logger.info(f"service_response: {service_response}")
    except Exception as e:
        # Log at error level (was info) so failures are visible in the logs.
        logger.error(f"Error in cost analysis: {str(e)}")
        return None
    service_costs = pd.DataFrame([
        {
            'SERVICE': group['Keys'][0],
            'cost': float(group['Metrics']['UnblendedCost']['Amount'])
        }
        for group in service_response['ResultsByTime'][0]['Groups']
    ])
    logger.info(f"Service Costs: {service_costs}")
    if chat.debug_mode == "Enable":
        value = service_costs.to_string()
        add_notification(containers, value)
        response_msg.append(value)
    # service cost (pie chart)
    fig_pie = px.pie(
        service_costs,
        values='cost',
        names='SERVICE',
        color='SERVICE',
        title='Service Cost',
        template='plotly_white',  # Clean background
        color_discrete_sequence=px.colors.qualitative.Set3  # Color palette
    )
    url = get_url(fig_pie, "service_cost")
    task = "AWS 서비스 사용량"
    # Bug fix: `url` was computed but never embedded in the markdown body;
    # emit a markdown image -- confirm desired markup format (md vs <img>).
    output_images = f"![{task}]({url})\n\n"
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    instruction = f"이 이미지는 {task}에 대한 그래프입니다. 하나의 문장으로 이 그림에 대해 500자로 설명하세요."
    summary = get_summary(fig_pie, instruction)
    body = f"### {task}\n\n{output_images}\n\n{summary}\n\n"
    chat.updata_object(key, time + body, 'append')
    if chat.debug_mode == "Enable":
        value = body
        add_notification(containers, value)
        response_msg.append(value)
    # Copy before appending so the previous state's list is not mutated.
    appendix = list(state.get("appendix", []))
    appendix.append(body)
    return {
        "appendix": appendix,
        "service_costs": service_response,
    }
def region_cost(state: CostState, config) -> dict:
    """LangGraph node: per-region AWS cost for the last 30 days.

    Queries Cost Explorer grouped by REGION, renders a bar chart, uploads
    it to S3, asks the vision LLM for a short summary, and appends the
    resulting markdown section to the report appendix.

    Returns:
        Partial state update with "appendix" and "region_costs", or None
        (no state update) when the Cost Explorer call fails.
    """
    logger.info("###### region_cost ######")
    logger.info("Getting cost analysis...")
    days = 30
    request_id = config.get("configurable", {}).get("request_id", "")
    containers = config.get("configurable", {}).get("containers", None)
    try:
        if chat.debug_mode == "Enable":
            containers["status"].info(get_status_msg("region_cost"))
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # Use explicit credentials when provided; otherwise fall back to
        # boto3's default credential chain.
        if aws_access_key and aws_secret_key:
            ce = boto3.client(
                service_name='ce',
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                aws_session_token=aws_session_token
            )
        else:
            ce = boto3.client(
                service_name='ce'
            )
        # region cost
        region_response = ce.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['UnblendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'REGION'}]
        )
        logger.info(f"region_response: {region_response}")
    except Exception as e:
        # Log at error level (was info) so failures are visible in the logs.
        logger.error(f"Error in cost analysis: {str(e)}")
        return None
    region_costs = pd.DataFrame([
        {
            'REGION': group['Keys'][0],
            'cost': float(group['Metrics']['UnblendedCost']['Amount'])
        }
        for group in region_response['ResultsByTime'][0]['Groups']
    ])
    logger.info(f"Region Costs: {region_costs}")
    if chat.debug_mode == "Enable":
        value = region_costs.to_string()
        add_notification(containers, f"region_cost: {value}")
        response_msg.append(value)
    # region cost (bar chart)
    fig_bar = px.bar(
        region_costs,
        x='REGION',
        y='cost',
        color='REGION',
        title='Region Cost',
        template='plotly_white',  # Clean background
        color_discrete_sequence=px.colors.qualitative.Set3  # Color palette
    )
    url = get_url(fig_bar, "region_costs")
    task = "AWS 리전별 사용량"
    # Bug fix: `url` was computed but never embedded in the markdown body;
    # emit a markdown image -- confirm desired markup format (md vs <img>).
    output_images = f"![{task}]({url})\n\n"
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    instruction = f"이 이미지는 {task}에 대한 그래프입니다. 하나의 문장으로 이 그림에 대해 500자로 설명하세요. 여기서 비용 단위는 dollar 입니다."
    summary = get_summary(fig_bar, instruction)
    body = f"### {task}\n\n{output_images}\n\n{summary}\n\n"
    chat.updata_object(key, time + body, 'append')
    if chat.debug_mode == "Enable":
        value = body
        add_notification(containers, f"{value}")
        response_msg.append(time + value)
    # Copy before appending so the previous state's list is not mutated.
    appendix = list(state.get("appendix", []))
    appendix.append(body)
    return {
        "appendix": appendix,
        "region_costs": region_response
    }
def daily_cost(state: CostState, config) -> dict:
    """LangGraph node: daily per-service AWS cost trend for the last 30 days.

    Queries Cost Explorer at DAILY granularity grouped by SERVICE, renders
    a line chart, uploads it to S3, asks the vision LLM for a short
    summary, and appends the resulting markdown section to the appendix.

    Returns:
        Partial state update with "appendix" and "daily_costs", or None
        (no state update) when the Cost Explorer call fails.
    """
    logger.info("###### daily_cost ######")
    logger.info("Getting cost analysis...")
    days = 30
    request_id = config.get("configurable", {}).get("request_id", "")
    containers = config.get("configurable", {}).get("containers", None)
    try:
        if chat.debug_mode == "Enable":
            containers["status"].info(get_status_msg("daily_cost"))
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # Use explicit credentials when provided; otherwise fall back to
        # boto3's default credential chain.
        if aws_access_key and aws_secret_key:
            ce = boto3.client(
                service_name='ce',
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                aws_session_token=aws_session_token
            )
        else:
            ce = boto3.client(
                service_name='ce'
            )
        # Daily Cost
        daily_response = ce.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
        )
        logger.info(f"Daily Cost: {daily_response}")
    except Exception as e:
        # Log at error level (was info) so failures are visible in the logs.
        logger.error(f"Error in cost analysis: {str(e)}")
        return None
    # Flatten the nested (day -> service group) response into rows.
    daily_costs = []
    for time_period in daily_response['ResultsByTime']:
        date = time_period['TimePeriod']['Start']
        for group in time_period['Groups']:
            daily_costs.append({
                'date': date,
                'SERVICE': group['Keys'][0],
                'cost': float(group['Metrics']['UnblendedCost']['Amount'])
            })
    daily_costs_df = pd.DataFrame(daily_costs)
    logger.info(f"Daily Costs: {daily_costs_df}")
    if chat.debug_mode == "Enable":
        value = daily_costs_df.to_string()
        add_notification(containers, f"daily_cost: {value}")
        response_msg.append(value)
    # daily trend cost (line chart)
    fig_line = px.line(
        daily_costs_df,
        x='date',
        y='cost',
        color='SERVICE',
        title='Daily Cost Trend',
        template='plotly_white',  # Clean background
        markers=True,  # Add markers to data points
        line_shape='spline'  # Smooth curve display
    )
    url = get_url(fig_line, "daily_costs")
    task = "AWS 일자별 사용량"
    # Bug fix: `url` was computed but never embedded in the markdown body;
    # emit a markdown image -- confirm desired markup format (md vs <img>).
    output_images = f"![{task}]({url})\n\n"
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    instruction = f"이 이미지는 {task}에 대한 그래프입니다. 하나의 문장으로 이 그림에 대해 500자로 설명하세요. 여기서 비용 단위는 dollar 입니다."
    summary = get_summary(fig_line, instruction)
    body = f"### {task}\n\n{output_images}\n\n{summary}\n\n"
    chat.updata_object(key, time + body, 'append')
    if chat.debug_mode == "Enable":
        value = body
        add_notification(containers, f"{value}")
        response_msg.append(value)
    # Copy before appending so the previous state's list is not mutated.
    appendix = list(state.get("appendix", []))
    appendix.append(body)
    return {
        "appendix": appendix,
        "daily_costs": daily_response
    }
def generate_insight(state: CostState, config) -> dict:
    """LangGraph node: ask the LLM for the cost-analysis report.

    Builds a prompt from the three Cost Explorer responses plus any
    additional context from earlier reflection rounds, writes the result
    into the steps and report markdown artifacts, and bumps the iteration
    counter used by should_end.

    Returns:
        Partial state update with "final_response" and "iteration".

    Raises:
        Exception: when the LLM invocation fails (original cause chained).
    """
    logger.info("###### generate_insight ######")
    prompt_name = "cost_insight"
    request_id = config.get("configurable", {}).get("request_id", "")
    additional_context = state.get("additional_context", [])
    containers = config.get("configurable", {}).get("containers", None)
    system_prompt = get_prompt_template(prompt_name)
    logger.info(f"system_prompt: {system_prompt}")
    human = (
        "다음 AWS 비용 데이터를 분석하여 상세한 인사이트를 제공해주세요:"
        "Cost Data:"
        "{service_costs}"
        "{region_costs}"
        "{daily_costs}"
        "다음의 additional_context는 관련된 다른 보고서입니다. 이 보고서를 현재 작성하는 보고서에 추가해주세요. 단, 전체적인 문맥에 영향을 주면 안됩니다."
        "{additional_context}"
    )
    prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("human", human)])
    llm = chat.get_chat(extended_thinking="Disable")
    chain = prompt | llm
    # Serialize the raw Cost Explorer responses for prompt interpolation.
    service_costs = json.dumps(state["service_costs"])
    region_costs = json.dumps(state["region_costs"])
    daily_costs = json.dumps(state["daily_costs"])
    try:
        if chat.debug_mode == "Enable":
            containers["status"].info(get_status_msg('generate_insight'))
        response = chain.invoke(
            {
                "service_costs": service_costs,
                "region_costs": region_costs,
                "daily_costs": daily_costs,
                "additional_context": additional_context
            }
        )
        logger.info(f"response: {response.content}")
    except Exception as e:
        # Log at error level (was debug, which is filtered out at the INFO
        # root level) and chain the original cause for debuggability.
        logger.error(f"error message: {traceback.format_exc()}")
        raise Exception("Not able to request to LLM") from e
    # logging in step.md
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    chat.updata_object(key, time + response.content, 'append')
    # report.md (prepend so the insight leads the report)
    key = f"artifacts/{request_id}_report.md"
    body = "# AWS 사용량 분석\n\n" + response.content + "\n\n"
    appendix = state.get("appendix", [])
    values = '\n\n'.join(appendix)
    logger.info(f"body: {body}")
    chat.updata_object(key, time + body + values, 'prepend')
    if chat.debug_mode == "Enable":
        value = response.content
        add_notification(containers, f"{value}")
        response_msg.append(value)
    iteration = state.get("iteration", 0)
    return {
        "final_response": body + values,
        "iteration": iteration + 1
    }
def reflect_context(state: CostState, config) -> dict:
    """LangGraph node: critique the current draft report.

    Runs reflect() over state["final_response"], records the critique in
    the steps markdown artifact, optionally surfaces it in the UI, and
    stores it in state under "reflection" for the MCP pass.
    """
    logger.info(f"###### reflect_context ######")
    configurable = config.get("configurable", {})
    containers = configurable.get("containers", None)
    if chat.debug_mode == "Enable":
        containers["status"].info(get_status_msg("reflect_context"))
    # Critique the previous final response.
    result = reflect(state["final_response"])
    logger.info(f"reflection result: {result}")
    # Record the critique in the steps artifact.
    request_id = configurable.get("request_id", "")
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    body = f"Reflection: {result['reflection']}\n\nSearch Queries: {result['search_queries']}\n\n"
    chat.updata_object(key, time + body, 'append')
    if chat.debug_mode == "Enable":
        add_notification(containers, f"## Reflection\n\n{body}")
        response_msg.append(body)
    return {
        "reflection": result
    }
def mcp_tools(state: CostState, config) -> dict:
    """LangGraph node: run the MCP reflection agent to improve the draft.

    Delegates to reflection_agent with the current draft and critique,
    collects any images it produced into the appendix, logs everything to
    the steps artifact, and queues the result as additional context for
    the next generate_insight pass.
    """
    logger.info(f"###### mcp_tools ######")
    draft = state['final_response']
    containers = config.get("configurable", {}).get("containers", None)
    mcp_servers = config.get("configurable", {}).get("mcp_servers", None)
    # Copy before appending so the previous state's list is not mutated.
    appendix = list(state.get("appendix", []))
    if chat.debug_mode == "Enable":
        containers["status"].info(get_status_msg("mcp_tools"))
    # The reflection agent shares and returns the UI bookkeeping lists.
    global status_msg, response_msg
    reflection_result, image_url, status_msg, response_msg = asyncio.run(
        reflection_agent.run_reflection_agent(
            draft, state["reflection"], mcp_servers, containers, status_msg, response_msg
        )
    )
    logger.info(f"reflection result: {reflection_result}")
    value = ""
    if image_url:
        for url in image_url:
            # Bug fix: the urls were collected but never embedded; emit one
            # markdown image per url -- confirm desired markup format.
            value += f"![image]({url})\n\n"
    if value:
        logger.info(f"value: {value}")
        appendix.append(value)
    # logging in step.md
    request_id = config.get("configurable", {}).get("request_id", "")
    key = f"artifacts/{request_id}_steps.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    body = f"{reflection_result}\n\n"
    value = '\n\n'.join(appendix)
    chat.updata_object(key, time + body + value, 'append')
    if chat.debug_mode == "Enable":
        value = body
        add_notification(containers, f"## Reflected Report\n\n{value}")
        response_msg.append(value)
    # Copy before appending so the previous state's list is not mutated.
    additional_context = list(state.get("additional_context", []))
    additional_context.append(reflection_result)
    return {
        "additional_context": additional_context
    }
def should_end(state: CostState, config) -> str:
    """Conditional edge: END after max_iteration passes, else reflect again."""
    logger.info(f"###### should_end ######")
    iteration = state.get("iteration", 0)
    containers = config.get("configurable", {}).get("containers", None)
    max_iteration = config.get("configurable", {}).get("max_iteration", 1)
    if iteration > max_iteration:
        logger.info(f"max iteration reached!")
        if chat.debug_mode == "Enable":
            containers["status"].info(get_status_msg("end"))
        return END
    logger.info(f"additional information is required!")
    return "reflect_context"
# Wire the generated CostAgent stub to the node implementations above and
# compile the runnable graph used by run_cost_agent.
agent = CostAgent(
    state_schema=CostState,
    impl=[
        ("service_cost", service_cost),
        ("region_cost", region_cost),
        ("daily_cost", daily_cost),
        ("generate_insight", generate_insight),
        ("reflect_context", reflect_context),
        ("mcp_tools", mcp_tools),
        ("should_end", should_end),
    ],
)
cost_agent = agent.compile()
def create_final_report(request_id, question, body, urls):
    """Publish the report as HTML (and PDF when available) and return urls.

    Side effects: uploads artifacts via the chat module and rewrites the
    report markdown with a final-results section listing every url.

    Args:
        request_id: Identifier used to name the artifact files.
        question: Title/question passed to the markdown-to-HTML converter.
        body: Final report body in markdown.
        urls: List the published artifact urls are appended to (mutated).

    Returns:
        The urls list, extended with the HTML (and possibly PDF) links.
    """
    # report.html
    html_report = trans.trans_md_to_html(body, question)
    chat.create_object(f"artifacts/{request_id}_report.html", html_report)
    logger.info(f"url of html: {chat.path}/artifacts/{request_id}_report.html")
    urls.append(f"{chat.path}/artifacts/{request_id}_report.html")
    # Optional PDF rendering; generate_pdf_report returns falsy on failure.
    pdf_result = asyncio.run(utils.generate_pdf_report(body, request_id))
    logger.info(f"result of generate_pdf_report: {pdf_result}")
    if pdf_result:  # reports/request_id.pdf
        with open(f"artifacts/{request_id}.pdf", 'rb') as f:
            pdf_bytes = f.read()
        chat.upload_to_s3_artifacts(pdf_bytes, f"{request_id}.pdf")
        logger.info(f"url of pdf: {chat.path}/artifacts/{request_id}.pdf")
        urls.append(f"{chat.path}/artifacts/{request_id}.pdf")
    # report.md
    key = f"artifacts/{request_id}_report.md"
    time = f"# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    final_result = body + "\n\n" + f"## 최종 결과\n\n" + '\n\n'.join(urls)
    chat.create_object(key, time + final_result)
    return urls
def run_cost_agent(mcp_servers, st):
    """Entry point: run the cost-analysis graph, rendering progress in Streamlit.

    Args:
        mcp_servers: MCP server configuration forwarded to the reflection agent.
        st: The streamlit module (or compatible facade) used for UI output.

    Returns:
        The final report body produced by the graph.
    """
    logger.info(f"###### run_cost_agent ######")
    request_id = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
    # Publish the live-report HTML shell. A context manager closes the file
    # (the original leaked the handle) and the encoding is explicit.
    with open(os.path.join(os.path.dirname(__file__), f"report.html"), encoding="utf-8") as f:
        template = f.read()
    template = template.replace("{request_id}", request_id)
    template = template.replace("{sharing_url}", chat.path)
    key = f"artifacts/{request_id}.html"
    chat.create_object(key, template)
    logger.info(f"request_id: {request_id}")
    report_url = chat.path + "/artifacts/" + request_id + ".html"
    logger.info(f"report_url: {report_url}")
    st.info(f"report_url: {report_url}")
    # draw a graph
    graph_diagram = cost_agent.get_graph().draw_mermaid_png(
        draw_method=MermaidDrawMethod.API,
        curve_style=CurveStyle.LINEAR
    )
    random_id = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=8))
    image_filename = f'workflow_{random_id}.png'
    url = chat.upload_to_s3(graph_diagram, image_filename)
    # add plan to report
    key = f"artifacts/{request_id}_plan.md"
    task = "실행 계획"
    # Bug fix: the uploaded workflow-diagram url was never embedded; emit a
    # markdown image -- confirm desired markup format (md vs <img>).
    output_images = f"![{task}]({url})\n\n"
    body = f"### {task}\n\n{output_images}"
    chat.updata_object(key, body, 'prepend')
    # show status and response
    containers = {
        "tools": st.empty(),
        "status": st.empty(),
        "notification": [st.empty() for _ in range(500)]
    }
    # Reset per-run UI bookkeeping BEFORE the first status message so that
    # "start" stays in the breadcrumb (the original reset afterwards, wiping
    # it), and fix the stray parenthesis in the original "(start" label.
    global index, status_msg, response_msg
    index = 0
    status_msg = []
    response_msg = []
    if chat.debug_mode == "Enable":
        containers["status"].info(get_status_msg("start"))
    question = "AWS 사용량을 분석하세요."
    inputs = {
        "messages": [HumanMessage(content=question)],
        "final_response": ""
    }
    config = {
        "request_id": request_id,
        "recursion_limit": 50,
        "max_iteration": 1,
        "mcp_servers": mcp_servers,
        "containers": containers
    }
    # Stream the graph; `value` ends up holding the last node's state update.
    value = None
    for output in cost_agent.stream(inputs, config):
        for key, value in output.items():
            logger.info(f"--> key: {key}, value: {value}")
    logger.info(f"value: {value}")
    urls = [report_url]
    urls = create_final_report(request_id, "AWS 비용 분석 보고서", value["final_response"], urls)
    logger.info(f"urls: {urls}")
    if urls and chat.debug_mode == "Enable":
        logger.info(f"urls: {urls}")
        with st.expander(f"최종 결과"):
            st.markdown("\n".join(urls))
    return value["final_response"]