{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Carrega pacotes necessários" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "#import pip\n", "#pip.main(['install', 'elasticsearch'])\n", "# -*- coding: utf-8 -*-\n", "import dataiku\n", "import pandas as pd\n", "import numpy as np\n", "from time import time\n", "import os\n", "import sys\n", "from dataiku import pandasutils as pdu\n", "from elasticsearch import Elasticsearch, helpers\n", "import json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Conexão ao ES" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "es = Elasticsearch('http://user:pwd@server_ip:port/')" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "es.info()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Criação de índice" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "#Exclui índice, se ele existir\n", "indice= \"sim_dss\"\n", "doc_type=\"sim-type\"\n", "try :\n", " es.indices.delete(index=indice)\n", "except :\n", " pass" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "#Definição de tipo para documentos\n", "sim_type = {\n", " \"mappings\":{\n", " 'sim-type': {\n", " 'properties': {\n", " 'NUMERODO':{'type': 'keyword'},\n", " 'TIPOOBITO':{'type': 'keyword'},\n", " 'def_tipo_obito':{'type': 'keyword'},\n", " 'DTOBITO':{'type': 'keyword'},\n", " 'data_obito':{'type': 'keyword'},\n", " 'ano_obito':{'type': 'keyword'},\n", " 'dia_semana_obito':{'type': 'keyword'},\n", " 'NATURAL':{'type': 'keyword'},\n", " 'DTNASC':{'type': 'keyword'},\n", " 'data_nasc':{'type': 'date'},\n", " 'idade_obito_calculado':{'type': 'float'},\n", " 'ano_nasc':{'type': 'integer'},\n", " 'dia_semana_nasc':{'type': 'keyword'},\n", " 'IDADE':{'type': 'keyword'},\n", " 'idade_obito':{'type': 'float'},\n", " 'SEXO':{'type': 'keyword'},\n", " 'def_sexo':{'type': 'keyword'},\n", " 'RACACOR':{'type': 'keyword'},\n", " 'def_raca_cor':{'type': 'keyword'},\n", "\n", " 'ESTCIV':{'type': 'keyword'},\n", " 'def_est_civil':{'type': 'keyword'},\n", "\n", " 'ESC':{'type': 'keyword'},\n", " 'def_escol':{'type': 'keyword'},\n", "\n", " 'OCUP':{'type': 'keyword'},\n", " 'CODBAIRES':{'type': 'keyword'},\n", " 'CODMUNRES':{'type': 'keyword'},\n", " 'LOCOCOR':{'type': 'keyword'},\n", " 'def_loc_ocor':{'type': 'keyword'},\n", "\n", " 'CODMUNOCOR':{'type': 'keyword'},\n", " 'IDADEMAE':{'type': 'keyword'},\n", " 'ESCMAE':{'type': 'keyword'},\n", " 'def_escol_mae':{'type': 'keyword'},\n", "\n", " 'OCUPMAE':{'type': 'keyword'},\n", " 'QTDFILVIVO':{'type': 'keyword'},\n", " 'QTDFILMORT':{'type': 'keyword'},\n", " 'GRAVIDEZ':{'type': 'keyword'},\n", " 'def_gravidez':{'type': 'keyword'},\n", "\n", " 'GESTACAO':{'type': 'keyword'},\n", " 'def_gestacao':{'type': 'keyword'},\n", "\n", " 'PARTO':{'type': 'keyword'},\n", " 'def_parto':{'type': 'keyword'},\n", "\n", " 'OBITOPARTO':{'type': 'keyword'},\n", " 'def_obito_parto':{'type': 'keyword'},\n", "\n", " 'PESO':{'type': 'keyword'},\n", " 'OBITOGRAV':{'type': 'keyword'},\n", " 'def_obito_grav':{'type': 'keyword'},\n", "\n", " 'OBITOPUERP':{'type': 'keyword'},\n", " 'def_obito_puerp':{'type': 'keyword'},\n", "\n", " 'ASSISTMED':{'type': 'keyword'},\n", " 'def_assist_med':{'type': 'keyword'},\n", "\n", " 'EXAME':{'type': 'keyword'},\n", " 'def_exame':{'type': 'keyword'},\n", "\n", " 'CIRURGIA':{'type': 'keyword'},\n", " 'def_cirurgia':{'type': 'keyword'},\n", "\n", " 'NECROPSIA':{'type': 'keyword'},\n", " 'def_necropsia':{'type': 'keyword'},\n", "\n", " 'CAUSABAS':{'type': 'keyword'},\n", " 'LINHAA':{'type': 'keyword'},\n", " 'LINHAB':{'type': 'keyword'},\n", " 'LINHAC':{'type': 'keyword'},\n", " 'LINHAD':{'type': 'keyword'},\n", " 'LINHAII':{'type': 'keyword'},\n", " 'def_circ_obito':{'type': 'keyword'},\n", "\n", " 'def_acid_trab':{'type': 'keyword'},\n", "\n", " 'def_fonte':{'type': 'keyword'},\n", "\n", " 'CODESTAB':{'type': 'keyword'},\n", " 'ATESTANTE':{'type': 'keyword'},\n", " 'UFINFORM':{'type': 'keyword'},\n", " 'HORAOBITO':{'type': 'keyword'},\n", " 'CODBAIOCOR':{'type': 'keyword'},\n", " 'NUMERODN':{'type': 'keyword'},\n", " 'TPASSINA':{'type': 'keyword'},\n", " 'DTATESTADO':{'type': 'keyword'},\n", " 'TPPOS':{'type': 'keyword'},\n", " 'DTINVESTIG':{'type': 'keyword'},\n", " 'CAUSABAS_O':{'type': 'keyword'},\n", " 'DTCADASTRO':{'type': 'keyword'},\n", "\n", " 'FONTEINV':{'type': 'keyword'},\n", " 'DTRECEBIM':{'type': 'keyword'},\n", "\n", " 'CODINST':{'type': 'keyword'},\n", " 'CB_PRE':{'type': 'keyword'},\n", "\n", " 'MORTEPARTO':{'type': 'keyword'},\n", " 'TPOBITOCOR':{'type': 'keyword'},\n", " 'ORIGEM':{'type': 'keyword'},\n", " 'DTCADINF':{'type': 'keyword'},\n", " 'DTCADINV':{'type': 'keyword'},\n", " 'NUMERODV':{'type': 'keyword'},\n", " 'NUMSUS':{'type': 'keyword'},\n", " 'COMUNSVOIM':{'type': 'keyword'},\n", " 'DTRECORIG':{'type': 'keyword'},\n", " 'DTRECORIGA':{'type': 'keyword'},\n", " 'CAUSAMAT':{'type': 'keyword'},\n", " 'ESC2010':{'type': 'keyword'},\n", " 'ESCMAE2010':{'type': 'keyword'},\n", " 'STDOEPIDEM':{'type': 'keyword'},\n", " 'STDONOVA':{'type': 'keyword'},\n", " 'SEMAGESTAC':{'type': 'keyword'},\n", " 'TPMORTEOCO':{'type': 'keyword'},\n", " 'DIFDATA':{'type': 'keyword'},\n", " 'DTCONCASO':{'type': 'keyword'},\n", " 'NUDIASOBIN':{'type': 'keyword'},\n", " 'SERIESCFAL':{'type': 'keyword'},\n", " 'SERIESCMAE':{'type': 'keyword'},\n", " 'CODMUNCART':{'type': 'keyword'},\n", " 'CODCART':{'type': 'keyword'},\n", " 'NUMREGCART':{'type': 'keyword'},\n", " 'DTREGCART':{'type': 'keyword'},\n", " 'DTCONINV':{'type': 'keyword'},\n", " 'CODMUNNATU':{'type': 'keyword'},\n", " 'ESTABDESCR':{'type': 'keyword'},\n", " 'CRM':{'type': 'keyword'},\n", " 'NUMEROLOTE':{'type': 'keyword'},\n", " 'STCODIFICA':{'type': 'keyword'},\n", " 'CODIFICADO':{'type': 'keyword'},\n", " 'VERSAOSIST':{'type': 'keyword'},\n", " 'VERSAOSCB':{'type': 'keyword'},\n", " 'ATESTADO':{'type': 'keyword'},\n", " 'ESCMAEAGR1':{'type': 'keyword'},\n", " 'ESCFALAGR1':{'type': 'keyword'},\n", " 'NUDIASOBCO':{'type': 'keyword'},\n", " 'FONTES':{'type': 'keyword'},\n", " 'TPRESGINFO':{'type': 'keyword'},\n", " 'TPNIVELINV':{'type': 'keyword'},\n", " 'NUDIASINF':{'type': 'keyword'},\n", " 'FONTESINF':{'type': 'keyword'},\n", " 'ALTCAUSA':{'type': 'keyword'},\n", "\n", " 'res_MUNNOME':{'type': 'keyword'},\n", " 'res_MUNNOMEX':{'type': 'keyword'},\n", " 'res_CAPITAL':{'type': 'keyword'},\n", " 'res_FRONTEIRA':{'type': 'keyword'},\n", " 'res_AMAZONIA':{'type': 'keyword'},\n", " 'res_LATITUDE':{'type': 'float'},\n", " 'res_LONGITUDE':{'type': 'float'},\n", " 'res_ALTITUDE':{'type': 'float'},\n", " 'res_AREA':{'type': 'float'},\n", " 'res_codigo_adotado':{'type': 'keyword'},\n", " 'res_SIGLA_UF':{'type': 'keyword'},\n", " 'res_CODIGO_UF':{'type': 'keyword'},\n", " 'res_NOME_UF':{'type': 'keyword'},\n", "\n", " 'res_MSAUDCOD':{'type': 'keyword'},\n", " 'res_RSAUDCOD':{'type': 'keyword'},\n", " 'res_CSAUDCOD':{'type': 'keyword'},\n", "\n", " 'ocor_MUNNOME':{'type': 'keyword'},\n", " 'ocor_MUNNOMEX':{'type': 'keyword'},\n", " 'ocor_CAPITAL':{'type': 'keyword'},\n", " 'ocor_FRONTEIRA':{'type': 'keyword'},\n", " 'ocor_AMAZONIA':{'type': 'keyword'},\n", " 'ocor_LATITUDE':{'type': 'float'},\n", " 'ocor_LONGITUDE':{'type': 'float'},\n", " 'ocor_ALTITUDE':{'type': 'float'},\n", " 'ocor_AREA':{'type': 'float'},\n", " 'ocor_codigo_adotado':{'type': 'keyword'},\n", " 'ocor_SIGLA_UF':{'type': 'keyword'},\n", " 'ocor_CODIGO_UF':{'type': 'keyword'},\n", " 'ocor_NOME_UF':{'type': 'keyword'},\n", "\n", " 'ocor_MSAUDCOD':{'type': 'keyword'},\n", " 'ocor_RSAUDCOD':{'type': 'keyword'},\n", " 'ocor_CSAUDCOD':{'type': 'keyword'},\n", "\n", " 'ocor_coordenadas' : {'type': 'geo_point'},\n", " 'res_coordenadas' : {'type': 'geo_point'},\n", "\n", " 'causabas_capitulo':{'type': 'keyword'},\n", " 'causabas_grupo':{'type': 'keyword'},\n", " 'causabas_categoria':{'type': 'keyword'},\n", " 'causabas_subcategoria':{'type': 'keyword'},\n", " }\n", " }\n", " }\n", " }" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "#cria índice no Elasticsearch\n", "es.indices.create(index=indice,body=sim_type)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Carrega dados preparados/transformados do Dataiku" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "data_prepared = dataiku.Dataset(\"DORES_preparados\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Recupera métricas (record_count) do dataset no Dataiku" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "def get_metric(project_name,dataset_name,metric_ids):\n", " client = dataiku.api_client()\n", " current_project = client.get_project(project_name)\n", " dataset = current_project.get_dataset(dataset_name)\n", " metrics = dataset.compute_metrics(partition='ALL', metric_ids=metric_ids)\n", " metrics = [{'metric':m[\"metricId\"],'value':int(m[\"value\"])} for m in metrics[\"result\"][\"computed\"] if m[\"metricId\"] in metric_ids][0]\n", "\n", " return metrics" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "def record_count(project_name,dataset_name):\n", " return get_metric(project_name,dataset_name,['records:COUNT_RECORDS'])['value']" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "nrows = record_count('MERGESIM','DORES_preparados')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Gera json para indexação" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "def geraJson(df):\n", " return json.loads(df.T.to_json())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Indexação em lote" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "#tamanho do chunk\n", "chunksize = 10000\n", "\n", "#número total de chunks a serem indexados\n", "nchunks = nrows/chunksize\n", "\n", "#imprime o número total de documentos a serem indexados\n", "print(\"Documentos: %i\\n\"%nrows)\n", "\n", "res_bulk=[]\n", "\n", "for chunk,df in enumerate(data_prepared.iter_dataframes(chunksize=chunksize)):\n", "\n", " #gera o json do chunk de dados atual (formato pronto para indexação)\n", " data_json = geraJson(df)\n", "\n", " #imprime o número do chunk atual e o total de chunks a serem indexados\n", " print(\"Chunk: %i/%i\"%(chunk,nchunks))\n", "\n", " #cria lista de ações para indexação de cada documento do chunk atual\n", " lista=[]\n", " for i, item in enumerate(data_json.values()):\n", " data_dict = {\n", " '_op_type': 'index',\n", " '_index': indice,\n", " '_type': doc_type,\n", " '_source': item\n", " }\n", " lista.append(data_dict)\n", "\n", " #indexa todos os documentos do chunk atual (bulk indexa em chunks)\n", " res = helpers.bulk(client=es, actions=lista, chunk_size=1000, raise_on_error=False, raise_on_exception = False)\n", " res_bulk.append(res)\n", "\n", " print(res)" ] }, { "cell_type": "code", "execution_count": 0, "metadata": {}, "outputs": [], "source": [ "res_df = pd.DataFrame(res_bulk)\n", "res_df.columns = ['indexed_chunksize', 'errors']\n", "\n", "# Write recipe outputs\n", "res_Elasticsearch = dataiku.Dataset(\"bulk_elasticsearch\")\n", "res_Elasticsearch.write_with_schema(res_df)" ] } ], "metadata": { "associatedRecipe": "compute_DORJ2000-2014_from_DBC_prepared_Elasticsearch", "creator": "admin", "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.12" }, "tags": [ "recipe-editor" ] }, "nbformat": 4, "nbformat_minor": 1 }