import os import re import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import matplotlib.ticker as mticker # --- Configurações de Caminhos e Nomes de Arquivo --- # Definindo os caminhos para os arquivos de entrada e nomes para os arquivos CSV intermediários e finais. # É uma boa prática centralizar essas configurações para facilitar a manutenção. # Link do arquivo original: https://cdn3.gnarususercontent.com.br/2927-pandas-selecao-agrupamento-dados/POP2022_Municipios.xls # Link do arquivo original: https://cdn3.gnarususercontent.com.br/2927-pandas-selecao-agrupamento-dados/1-SEEG10_GERAL-BR_UF_2022.10.27-FINAL-SITE.xlsx path_pop = r"D:\Python para Data Science\POP2022_Municipios.xls" path_emissoes = r"D:\Python para Data Science\1-SEEG10_GERAL-BR_UF_2022.10.27-FINAL-SITE.xlsx" sheet_emissoes = "GEE Estados" csv_municipios = "dados_municipais.csv" csv_emissoes = "gee_emissoes_estados.csv" csv_final = "dados_combinados.csv" # --- Funções de Carregamento e Pré-processamento de Dados --- def carregar_dados_populacao(path_pop, csv_municipios): """ Carrega os dados de população de um arquivo Excel do IBGE (POP2022_Municipios.xls), realiza a limpeza e agregação por UF, e salva o resultado em um arquivo CSV. Se o arquivo CSV já existir, carrega os dados diretamente dele para otimizar. Args: path_pop (str): Caminho completo para o arquivo Excel de população. csv_municipios (str): Nome do arquivo CSV para salvar/carregar os dados agregados de população. Returns: pd.DataFrame: Um DataFrame contendo a população agregada por UF. Colunas: 'UF', 'COD. UF', 'POPULAÇÃO'. """ if not os.path.exists(csv_municipios): print(f"Processando dados de população de '{os.path.basename(path_pop)}'...") # Tenta encontrar a linha de cabeçalho correta no Excel. # Isso é útil pois alguns arquivos Excel têm metadados antes do cabeçalho real. header_row = 0 for i in range(10): # Limita a busca às primeiras 10 linhas para evitar loops infinitos df_test = pd.read_excel(path_pop, skiprows=i) if df_test.columns.str.contains("POPULAÇÃO").any(): header_row = i break df_pop = pd.read_excel(path_pop, skiprows=header_row) # Filtra linhas onde a coluna 'POPULAÇÃO' não é nula, garantindo dados válidos. df_pop = df_pop[df_pop["POPULAÇÃO"].notna()] # Limpa a coluna 'POPULAÇÃO': # 1. Converte para string para aplicar operações de string. # 2. Remove qualquer texto entre parênteses (ex: "(Estimativa)"). # 3. Remove pontos de milhar (ex: "1.000.000" vira "1000000"). # 4. Remove espaços em branco extras. df_pop["POPULAÇÃO"] = ( df_pop["POPULAÇÃO"] .astype(str) .str.replace(r"\(.*?\)", "", regex=True) .str.replace(".", "", regex=False) .str.strip() ) # Converte a coluna 'POPULAÇÃO' para numérico, tratando erros como NaN e preenchendo com 0, # depois converte para inteiro. df_pop["POPULAÇÃO"] = pd.to_numeric(df_pop["POPULAÇÃO"], errors="coerce").fillna(0).astype(int) # Remove colunas desnecessárias para a agregação por estado. df_pop = df_pop.drop(columns=["COD. MUNIC", "NOME DO MUNICÍPIO"], errors='ignore') # Agrega a população somando por 'UF' e 'COD. UF'. df_pop_agg = df_pop.groupby(["UF", "COD. UF"], as_index=False)["POPULAÇÃO"].sum() df_pop_agg.to_csv(csv_municipios, index=False) print(f"Dados de população processados e salvos em '{csv_municipios}'.") else: # Se o CSV já existe, carrega diretamente, economizando tempo. df_pop_agg = pd.read_csv(csv_municipios) print(f"Dados de população carregados de '{csv_municipios}'.") return df_pop_agg def carregar_emissoes(path_emissoes, sheet_emissoes, csv_emissoes): """ Carrega os dados de emissões de GEE de um arquivo Excel (SEEG), realiza a limpeza e agregação por Estado, e salva o resultado em um arquivo CSV. Se o arquivo CSV já existir, carrega os dados diretamente dele para otimizar. Args: path_emissoes (str): Caminho completo para o arquivo Excel de emissões. sheet_emissoes (str): Nome da aba dentro do arquivo Excel a ser lida. csv_emissoes (str): Nome do arquivo CSV para salvar/carregar os dados agregados de emissões. Returns: pd.DataFrame: Um DataFrame contendo as emissões agregadas por Estado. Colunas: 'Estado' e colunas para cada ano de emissão. """ if not os.path.exists(csv_emissoes): print(f"Processando dados de emissões de '{os.path.basename(path_emissoes)}'...") df_emissoes = pd.read_excel(path_emissoes, sheet_name=sheet_emissoes) # Define as colunas a serem descartadas, que são detalhes de nível mais baixo # não necessários para a agregação por estado. colunas_drop = [ "Nível 1 - Setor", "Nível 2", "Nível 3", "Nível 4", "Nível 5", "Nível 6", "Emissão / Remoção / Bunker", "Gás", "Atividade Econômica", "Produto" ] df_emissoes = df_emissoes.drop(columns=colunas_drop, errors='ignore') # Identifica as colunas que representam anos (assumindo formato 'YYYY'). col_ano = [col for col in df_emissoes.columns if re.match(r'^\d{4}$', str(col))] # Agrega as emissões somando por 'Estado' e para cada ano. df_emissoes_agg = df_emissoes.groupby("Estado", as_index=False)[col_ano].sum() df_emissoes_agg.to_csv(csv_emissoes, index=False) print(f"Dados de emissões processados e salvos em '{csv_emissoes}'.") else: # Se o CSV já existe, carrega diretamente. df_emissoes_agg = pd.read_csv(csv_emissoes) print(f"Dados de emissões carregados de '{csv_emissoes}'.") return df_emissoes_agg def combinar_dados(df_pop_agg, df_emissoes_agg, csv_final): """ Combina os DataFrames de população e emissões, calcula as emissões totais e per capita por UF, e salva o resultado final em um arquivo CSV. Se o arquivo CSV final já existir, carrega os dados diretamente dele. Args: df_pop_agg (pd.DataFrame): DataFrame com dados de população agregados por UF. df_emissoes_agg (pd.DataFrame): DataFrame com dados de emissões agregados por Estado/UF. csv_final (str): Nome do arquivo CSV para salvar/carregar os dados combinados. Returns: pd.DataFrame: O DataFrame final combinado com população, emissões anuais, emissões totais e emissões per capita por UF. """ if not os.path.exists(csv_final): print("Combinando dados de população e emissões...") # Combina os DataFrames usando 'UF' e 'Estado' como chaves de junção. # 'how="inner"' garante que apenas UFs presentes em ambos os DataFrames sejam incluídas. df_final = pd.merge(df_pop_agg, df_emissoes_agg, left_on="UF", right_on="Estado", how="inner") # A coluna 'Estado' é redundante após a junção, pois 'UF' já a representa. df_final = df_final.drop(columns=["Estado"]) # Identifica as colunas de ano para calcular as emissões totais. anos = [col for col in df_final.columns if re.match(r'^\d{4}$', col)] # Calcula as emissões totais somando as emissões de todos os anos para cada UF. df_final["EMISSÕES_TOTAIS"] = df_final[anos].sum(axis=1) # Calcula as emissões per capita. Adiciona uma verificação para evitar divisão por zero. df_final["EMISSÕES_PER_CAPITA"] = df_final.apply( lambda row: row["EMISSÕES_TOTAIS"] / row["POPULAÇÃO"] if row["POPULAÇÃO"] != 0 else 0, axis=1 ) df_final.to_csv(csv_final, index=False) print(f"Dados combinados e salvos em '{csv_final}'.") else: # Se o CSV final já existe, carrega diretamente. df_final = pd.read_csv(csv_final) print(f"Dados combinados carregados de '{csv_final}'.") return df_final # --- Funções Auxiliares para Interação com o Usuário --- def parse_indices_entrada(entrada, max_val): """ Converte uma string de entrada do usuário (ex: "1,3,5" ou "1:3") em uma lista de índices inteiros. Valida os índices para garantir que estejam dentro de um limite permitido. Args: entrada (str): A string de entrada do usuário contendo índices (base 1). Pode ser vazia para selecionar todos. max_val (int): O valor máximo permitido para os índices (total de opções disponíveis). Returns: list: Uma lista de índices (base 0) únicos e ordenados. """ if not entrada.strip(): return list(range(max_val)) # Retorna todos os índices se a entrada for vazia indices = set() # Usa um set para garantir índices únicos for parte in entrada.split(","): parte = parte.strip() if parte.isdigit(): i = int(parte) - 1 # Converte para índice base 0 if 0 <= i < max_val: indices.add(i) return sorted(list(indices)) def parse_anos_indices_entrada(entrada, anos_disponiveis): """ Converte uma string de entrada do usuário (ex: "2010", "1,3", "2010:2015") em uma lista de anos válidos presentes nos dados. Args: entrada (str): A string de entrada do usuário contendo anos ou índices de anos. Pode ser vazia para selecionar todos os anos. anos_disponiveis (list): Uma lista de strings representando os anos disponíveis nas colunas do DataFrame (ex: ['1990', '1991', ...]). Returns: list: Uma lista de strings de anos únicos e ordenados que correspondem aos anos disponíveis. """ if not entrada.strip(): return anos_disponiveis # Retorna todos os anos se a entrada for vazia anos_selecionados = set() partes = entrada.split(",") for parte in partes: parte = parte.strip() if ':' in parte: # Lida com intervalos de anos (ex: "2010:2015") start_str, end_str = parte.split(":") try: start_year = int(start_str.strip()) end_year = int(end_str.strip()) # Garante que o ano inicial seja menor ou igual ao final if start_year > end_year: start_year, end_year = end_year, start_year # Adiciona todos os anos disponíveis dentro do intervalo for ano_disp in anos_disponiveis: if start_year <= int(ano_disp) <= end_year: anos_selecionados.add(ano_disp) except ValueError: continue # Ignora partes inválidas do intervalo else: if parte.isdigit(): # Tenta interpretar como um ano diretamente if parte in anos_disponiveis: anos_selecionados.add(parte) else: # Se não for um ano, tenta como índice (base 1) idx = int(parte) - 1 if 0 <= idx < len(anos_disponiveis): anos_selecionados.add(anos_disponiveis[idx]) else: # Se não for dígito, verifica se é um ano literal (ex: '2000') if parte in anos_disponiveis: anos_selecionados.add(parte) return sorted(list(anos_selecionados)) # --- Funções de Menu e Visualização --- def mostrar_visao_geral(df): """ Exibe uma visão geral dos dados, incluindo população, emissões totais, emissões per capita e a porcentagem de representação das emissões de cada UF. Os dados são ordenados pelas emissões totais de forma decrescente. Args: df (pd.DataFrame): O DataFrame final combinado com os dados. """ print("\n--- Visão Geral das Emissões por UF (2000-2022) ---") df_display = df.copy() # Cria uma cópia para evitar SettingWithCopyWarning # Calcula a porcentagem de representação das emissões totais de cada UF. df_display["% REPRESENTAÇÃO"] = 100 * df_display["EMISSÕES_TOTAIS"] / df_display["EMISSÕES_TOTAIS"].sum() # Ordena o DataFrame pelas emissões totais para facilitar a visualização dos maiores emissores. df_display = df_display.sort_values(by="EMISSÕES_TOTAIS", ascending=False) # Exibe as colunas relevantes com formatação de ponto flutuante. print(df_display[["UF", "COD. UF", "POPULAÇÃO", "EMISSÕES_TOTAIS", "EMISSÕES_PER_CAPITA", "% REPRESENTAÇÃO"]] .to_string(index=False, float_format="%.2f")) def consulta_detalhada(df): """ Permite ao usuário consultar dados detalhados de emissões por UF e ano. O usuário pode selecionar UFs e anos específicos (ou intervalos). Args: df (pd.DataFrame): O DataFrame final combinado com os dados. """ print("\n--- Consulta Detalhada por UF e Ano ---") ufs = df["UF"].unique() # Lista as UFs disponíveis para seleção. print("UFs disponíveis:") for i, uf in enumerate(ufs, 1): print(f"{i}: {uf}", end='\t' if i % 5 != 0 else '\n') # Formata a exibição em colunas print() # Nova linha após a lista de UFs entrada_ufs = input("Digite os números das UF desejadas (ex: 1,3,5 ou ENTER para todas): ") indices_ufs = parse_indices_entrada(entrada_ufs, len(ufs)) ufs_selecionadas = [ufs[i] for i in indices_ufs] anos_disponiveis = sorted([col for col in df.columns if re.match(r'^\d{4}$', col)]) # Lista os anos disponíveis para seleção. print("\nAnos disponíveis:") for i, ano in enumerate(anos_disponiveis, 1): print(f"[{i}] {ano}", end='\t') if i % 6 == 0: print() print() # Nova linha após a lista de anos entrada_anos = input("Digite os anos ou índices (ex: 1,3 ou 2010:2015 ou ENTER para todos): ") anos_selecionados = parse_anos_indices_entrada(entrada_anos, anos_disponiveis) # Filtra o DataFrame pelas UFs selecionadas. df_filtrado = df[df["UF"].isin(ufs_selecionadas)] registros = [] # Itera sobre as UFs e anos selecionados para montar os registros detalhados. for uf in ufs_selecionadas: df_uf = df_filtrado[df_filtrado["UF"] == uf] if df_uf.empty: continue # Pula se a UF não tiver dados (improvável após o merge inner) pop = df_uf["POPULAÇÃO"].values[0] # População é a mesma para todos os anos da UF cod_uf = df_uf["COD. UF"].values[0] for ano_str in anos_selecionados: ano_int = int(ano_str) emissao = df_uf[ano_str].values[0] percapita = emissao / pop if pop else 0 # Evita divisão por zero registros.append({ "UF": uf, "COD. UF": cod_uf, "POPULAÇÃO": pop, "ANO": ano_int, "EMISSÕES": emissao, "EMISSÕES_PER_CAPITA": percapita }) df_result = pd.DataFrame(registros) if df_result.empty: print("Nenhum dado encontrado para a seleção. Tente novamente.") return # Exibe o resultado detalhado. print("\n--- Resultados da Consulta ---") print(df_result.to_string(index=False, float_format="%.2f")) def graficos_estatisticos(df): """ Gera um gráfico de linha mostrando a evolução das emissões de GEE ao longo dos anos para as UFs selecionadas pelo usuário. Inclui uma linha para a média geral das emissões. Args: df (pd.DataFrame): O DataFrame final combinado com os dados. """ print("\n--- Gráficos Estatísticos: Evolução das Emissões por UF ---") ufs = df["UF"].unique() # Lista as UFs disponíveis para seleção. print("UFs disponíveis:") for i, uf in enumerate(ufs, 1): print(f"{i}: {uf}", end='\t' if i % 5 != 0 else '\n') print() entrada_ufs = input("Digite os números das UF desejadas (ex: 1,3,5 ou ENTER para todas): ") indices_ufs = parse_indices_entrada(entrada_ufs, len(ufs)) ufs_selecionadas = [ufs[i] for i in indices_ufs] # Identifica as colunas de ano e converte para tipo inteiro para o gráfico. anos_str = sorted([col for col in df.columns if re.match(r'^\d{4}$', col)]) anos = list(map(int, anos_str)) # Prepara o DataFrame para plotagem: seleciona UFs e anos, e "derrete" (melt) os anos # para um formato longo, adequado para seaborn. df_plot = df[df["UF"].isin(ufs_selecionadas)][["UF"] + anos_str] df_long = df_plot.melt(id_vars="UF", var_name="Ano", value_name="Emissões") df_long["Ano"] = pd.to_numeric(df_long["Ano"]) # Calcula a média geral das emissões para exibir no gráfico. media = df_long["Emissões"].mean() plt.figure(figsize=(14, 8)) # Define o tamanho da figura para melhor visualização # Cria o gráfico de linha usando seaborn, mostrando a evolução das emissões por UF. sns.lineplot(data=df_long, x="Ano", y="Emissões", hue="UF", marker="o", linewidth=2) # Adiciona uma linha horizontal para a média geral. plt.axhline(y=media, color="orange", linestyle="--", label=f"Média: {media:,.2f}") # Adiciona rótulos para as UFs nos seus pontos de maior emissão. for uf in ufs_selecionadas: df_uf = df_long[df_long["UF"] == uf] # Encontra a linha com a emissão máxima para a UF. max_row = df_uf[df_uf["Emissões"] == df_uf["Emissões"].max()] if not max_row.empty: x = max_row["Ano"].values[0] y = max_row["Emissões"].values[0] plt.text(x, y, uf, fontsize=10, fontweight="bold", ha="left", color="black") plt.title("Evolução das Emissões de GEE por UF (ton CO2e)", fontsize=16) plt.xlabel("Ano", fontsize=12) plt.ylabel("Emissões (ton CO2e)", fontsize=12) plt.xticks(anos, rotation=45) # Exibe todos os anos no eixo X e rotaciona para legibilidade plt.ticklabel_format(style='plain', axis='y', useOffset=False) # Garante formato numérico sem notação científica # Formata os ticks do eixo Y para mostrar vírgulas em números grandes formatter = mticker.FormatStrFormatter('%.0f') plt.gca().yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: format(int(x), ','))) plt.grid(True, linestyle="--", alpha=0.5) # Adiciona um grid suave plt.legend(title="UF", bbox_to_anchor=(1.05, 1), loc='upper left') # Move a legenda para fora do gráfico plt.tight_layout() # Ajusta o layout para evitar sobreposição plt.show() def menu_principal(df_final): """ Exibe o menu principal para o usuário e gerencia a navegação entre as opções de análise de dados (visão geral, consulta detalhada, gráficos estatísticos). Args: df_final (pd.DataFrame): O DataFrame final combinado com os dados. """ while True: print("\n--- Menu Principal ---") print("1 - Visão Geral das Emissões") print("2 - Consulta Detalhada por UF e Ano") print("3 - Gráficos Estatísticos de Evolução") print("0 - Sair") opcao = input("Escolha uma opção: ").strip() if opcao == "1": mostrar_visao_geral(df_final) elif opcao == "2": consulta_detalhada(df_final) elif opcao == "3": graficos_estatisticos(df_final) elif opcao == "0": print("Saindo do programa. Até mais!") break else: print("Opção inválida. Por favor, digite um número entre 0 e 3.") # --- Execução Principal do Script --- if __name__ == "__main__": print("Iniciando processamento dos dados...") # Carrega e processa os dados de população. df_pop_agg = carregar_dados_populacao(path_pop, csv_municipios) # Carrega e processa os dados de emissões. df_emissoes_agg = carregar_emissoes(path_emissoes, sheet_emissoes, csv_emissoes) # Combina os dados de população e emissões. df_final = combinar_dados(df_pop_agg, df_emissoes_agg, csv_final) print(f"\nTodos os dados foram carregados e combinados. Total de registros: {len(df_final)} UF(s).") # Inicia o menu interativo para o usuário. menu_principal(df_final)