{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Carrega pacotes necessários"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "#import pip\n",
    "#pip.main(['install', 'elasticsearch'])\n",
    "# -*- coding: utf-8 -*-\n",
    "import dataiku\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "from time import time\n",
    "import os\n",
    "import sys\n",
    "from dataiku import pandasutils as pdu\n",
    "from elasticsearch import Elasticsearch, helpers\n",
    "import json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Conexão ao ES"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "es = Elasticsearch('http://user:pwd@server_ip:port/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "es.info()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Criação de índice"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Exclui índice, se ele existir\n",
    "indice= \"sim_dss\"\n",
    "doc_type=\"sim-type\"\n",
    "try :\n",
    "    es.indices.delete(index=indice)\n",
    "except :\n",
    "    pass"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Definição de tipo para documentos\n",
    "sim_type = {\n",
    "                \"mappings\":{\n",
    "                    'sim-type': {\n",
    "                        'properties': {\n",
    "                            'NUMERODO':{'type': 'keyword'},\n",
    "                            'TIPOOBITO':{'type': 'keyword'},\n",
    "                            'def_tipo_obito':{'type': 'keyword'},\n",
    "                            'DTOBITO':{'type': 'keyword'},\n",
    "                            'data_obito':{'type': 'keyword'},\n",
    "                            'ano_obito':{'type': 'keyword'},\n",
    "                            'dia_semana_obito':{'type': 'keyword'},\n",
    "                            'NATURAL':{'type': 'keyword'},\n",
    "                            'DTNASC':{'type': 'keyword'},\n",
    "                            'data_nasc':{'type': 'date'},\n",
    "                            'idade_obito_calculado':{'type': 'float'},\n",
    "                            'ano_nasc':{'type': 'integer'},\n",
    "                            'dia_semana_nasc':{'type': 'keyword'},\n",
    "                            'IDADE':{'type': 'keyword'},\n",
    "                            'idade_obito':{'type': 'float'},\n",
    "                            'SEXO':{'type': 'keyword'},\n",
    "                            'def_sexo':{'type': 'keyword'},\n",
    "                            'RACACOR':{'type': 'keyword'},\n",
    "                            'def_raca_cor':{'type': 'keyword'},\n",
    "\n",
    "                            'ESTCIV':{'type': 'keyword'},\n",
    "                            'def_est_civil':{'type': 'keyword'},\n",
    "\n",
    "                            'ESC':{'type': 'keyword'},\n",
    "                            'def_escol':{'type': 'keyword'},\n",
    "\n",
    "                            'OCUP':{'type': 'keyword'},\n",
    "                            'CODBAIRES':{'type': 'keyword'},\n",
    "                            'CODMUNRES':{'type': 'keyword'},\n",
    "                            'LOCOCOR':{'type': 'keyword'},\n",
    "                            'def_loc_ocor':{'type': 'keyword'},\n",
    "\n",
    "                            'CODMUNOCOR':{'type': 'keyword'},\n",
    "                            'IDADEMAE':{'type': 'keyword'},\n",
    "                            'ESCMAE':{'type': 'keyword'},\n",
    "                            'def_escol_mae':{'type': 'keyword'},\n",
    "\n",
    "                            'OCUPMAE':{'type': 'keyword'},\n",
    "                            'QTDFILVIVO':{'type': 'keyword'},\n",
    "                            'QTDFILMORT':{'type': 'keyword'},\n",
    "                            'GRAVIDEZ':{'type': 'keyword'},\n",
    "                            'def_gravidez':{'type': 'keyword'},\n",
    "\n",
    "                            'GESTACAO':{'type': 'keyword'},\n",
    "                            'def_gestacao':{'type': 'keyword'},\n",
    "\n",
    "                            'PARTO':{'type': 'keyword'},\n",
    "                            'def_parto':{'type': 'keyword'},\n",
    "\n",
    "                            'OBITOPARTO':{'type': 'keyword'},\n",
    "                            'def_obito_parto':{'type': 'keyword'},\n",
    "\n",
    "                            'PESO':{'type': 'keyword'},\n",
    "                            'OBITOGRAV':{'type': 'keyword'},\n",
    "                            'def_obito_grav':{'type': 'keyword'},\n",
    "\n",
    "                            'OBITOPUERP':{'type': 'keyword'},\n",
    "                            'def_obito_puerp':{'type': 'keyword'},\n",
    "\n",
    "                            'ASSISTMED':{'type': 'keyword'},\n",
    "                            'def_assist_med':{'type': 'keyword'},\n",
    "\n",
    "                            'EXAME':{'type': 'keyword'},\n",
    "                            'def_exame':{'type': 'keyword'},\n",
    "\n",
    "                            'CIRURGIA':{'type': 'keyword'},\n",
    "                            'def_cirurgia':{'type': 'keyword'},\n",
    "\n",
    "                            'NECROPSIA':{'type': 'keyword'},\n",
    "                            'def_necropsia':{'type': 'keyword'},\n",
    "\n",
    "                            'CAUSABAS':{'type': 'keyword'},\n",
    "                            'LINHAA':{'type': 'keyword'},\n",
    "                            'LINHAB':{'type': 'keyword'},\n",
    "                            'LINHAC':{'type': 'keyword'},\n",
    "                            'LINHAD':{'type': 'keyword'},\n",
    "                            'LINHAII':{'type': 'keyword'},\n",
    "                            'def_circ_obito':{'type': 'keyword'},\n",
    "\n",
    "                            'def_acid_trab':{'type': 'keyword'},\n",
    "\n",
    "                            'def_fonte':{'type': 'keyword'},\n",
    "\n",
    "                            'CODESTAB':{'type': 'keyword'},\n",
    "                            'ATESTANTE':{'type': 'keyword'},\n",
    "                            'UFINFORM':{'type': 'keyword'},\n",
    "                            'HORAOBITO':{'type': 'keyword'},\n",
    "                            'CODBAIOCOR':{'type': 'keyword'},\n",
    "                            'NUMERODN':{'type': 'keyword'},\n",
    "                            'TPASSINA':{'type': 'keyword'},\n",
    "                            'DTATESTADO':{'type': 'keyword'},\n",
    "                            'TPPOS':{'type': 'keyword'},\n",
    "                            'DTINVESTIG':{'type': 'keyword'},\n",
    "                            'CAUSABAS_O':{'type': 'keyword'},\n",
    "                            'DTCADASTRO':{'type': 'keyword'},\n",
    "\n",
    "                            'FONTEINV':{'type': 'keyword'},\n",
    "                            'DTRECEBIM':{'type': 'keyword'},\n",
    "\n",
    "                            'CODINST':{'type': 'keyword'},\n",
    "                            'CB_PRE':{'type': 'keyword'},\n",
    "\n",
    "                            'MORTEPARTO':{'type': 'keyword'},\n",
    "                            'TPOBITOCOR':{'type': 'keyword'},\n",
    "                            'ORIGEM':{'type': 'keyword'},\n",
    "                            'DTCADINF':{'type': 'keyword'},\n",
    "                            'DTCADINV':{'type': 'keyword'},\n",
    "                            'NUMERODV':{'type': 'keyword'},\n",
    "                            'NUMSUS':{'type': 'keyword'},\n",
    "                            'COMUNSVOIM':{'type': 'keyword'},\n",
    "                            'DTRECORIG':{'type': 'keyword'},\n",
    "                            'DTRECORIGA':{'type': 'keyword'},\n",
    "                            'CAUSAMAT':{'type': 'keyword'},\n",
    "                            'ESC2010':{'type': 'keyword'},\n",
    "                            'ESCMAE2010':{'type': 'keyword'},\n",
    "                            'STDOEPIDEM':{'type': 'keyword'},\n",
    "                            'STDONOVA':{'type': 'keyword'},\n",
    "                            'SEMAGESTAC':{'type': 'keyword'},\n",
    "                            'TPMORTEOCO':{'type': 'keyword'},\n",
    "                            'DIFDATA':{'type': 'keyword'},\n",
    "                            'DTCONCASO':{'type': 'keyword'},\n",
    "                            'NUDIASOBIN':{'type': 'keyword'},\n",
    "                            'SERIESCFAL':{'type': 'keyword'},\n",
    "                            'SERIESCMAE':{'type': 'keyword'},\n",
    "                            'CODMUNCART':{'type': 'keyword'},\n",
    "                            'CODCART':{'type': 'keyword'},\n",
    "                            'NUMREGCART':{'type': 'keyword'},\n",
    "                            'DTREGCART':{'type': 'keyword'},\n",
    "                            'DTCONINV':{'type': 'keyword'},\n",
    "                            'CODMUNNATU':{'type': 'keyword'},\n",
    "                            'ESTABDESCR':{'type': 'keyword'},\n",
    "                            'CRM':{'type': 'keyword'},\n",
    "                            'NUMEROLOTE':{'type': 'keyword'},\n",
    "                            'STCODIFICA':{'type': 'keyword'},\n",
    "                            'CODIFICADO':{'type': 'keyword'},\n",
    "                            'VERSAOSIST':{'type': 'keyword'},\n",
    "                            'VERSAOSCB':{'type': 'keyword'},\n",
    "                            'ATESTADO':{'type': 'keyword'},\n",
    "                            'ESCMAEAGR1':{'type': 'keyword'},\n",
    "                            'ESCFALAGR1':{'type': 'keyword'},\n",
    "                            'NUDIASOBCO':{'type': 'keyword'},\n",
    "                            'FONTES':{'type': 'keyword'},\n",
    "                            'TPRESGINFO':{'type': 'keyword'},\n",
    "                            'TPNIVELINV':{'type': 'keyword'},\n",
    "                            'NUDIASINF':{'type': 'keyword'},\n",
    "                            'FONTESINF':{'type': 'keyword'},\n",
    "                            'ALTCAUSA':{'type': 'keyword'},\n",
    "\n",
    "                            'res_MUNNOME':{'type': 'keyword'},\n",
    "                            'res_MUNNOMEX':{'type': 'keyword'},\n",
    "                            'res_CAPITAL':{'type': 'keyword'},\n",
    "                            'res_FRONTEIRA':{'type': 'keyword'},\n",
    "                            'res_AMAZONIA':{'type': 'keyword'},\n",
    "                            'res_LATITUDE':{'type': 'float'},\n",
    "                            'res_LONGITUDE':{'type': 'float'},\n",
    "                            'res_ALTITUDE':{'type': 'float'},\n",
    "                            'res_AREA':{'type': 'float'},\n",
    "                            'res_codigo_adotado':{'type': 'keyword'},\n",
    "                            'res_SIGLA_UF':{'type': 'keyword'},\n",
    "                            'res_CODIGO_UF':{'type': 'keyword'},\n",
    "                            'res_NOME_UF':{'type': 'keyword'},\n",
    "\n",
    "                            'res_MSAUDCOD':{'type': 'keyword'},\n",
    "                            'res_RSAUDCOD':{'type': 'keyword'},\n",
    "                            'res_CSAUDCOD':{'type': 'keyword'},\n",
    "\n",
    "                            'ocor_MUNNOME':{'type': 'keyword'},\n",
    "                            'ocor_MUNNOMEX':{'type': 'keyword'},\n",
    "                            'ocor_CAPITAL':{'type': 'keyword'},\n",
    "                            'ocor_FRONTEIRA':{'type': 'keyword'},\n",
    "                            'ocor_AMAZONIA':{'type': 'keyword'},\n",
    "                            'ocor_LATITUDE':{'type': 'float'},\n",
    "                            'ocor_LONGITUDE':{'type': 'float'},\n",
    "                            'ocor_ALTITUDE':{'type': 'float'},\n",
    "                            'ocor_AREA':{'type': 'float'},\n",
    "                            'ocor_codigo_adotado':{'type': 'keyword'},\n",
    "                            'ocor_SIGLA_UF':{'type': 'keyword'},\n",
    "                            'ocor_CODIGO_UF':{'type': 'keyword'},\n",
    "                            'ocor_NOME_UF':{'type': 'keyword'},\n",
    "\n",
    "                            'ocor_MSAUDCOD':{'type': 'keyword'},\n",
    "                            'ocor_RSAUDCOD':{'type': 'keyword'},\n",
    "                            'ocor_CSAUDCOD':{'type': 'keyword'},\n",
    "\n",
    "                            'ocor_coordenadas' : {'type': 'geo_point'},\n",
    "                            'res_coordenadas' : {'type': 'geo_point'},\n",
    "\n",
    "                            'causabas_capitulo':{'type': 'keyword'},\n",
    "                            'causabas_grupo':{'type': 'keyword'},\n",
    "                            'causabas_categoria':{'type': 'keyword'},\n",
    "                            'causabas_subcategoria':{'type': 'keyword'},\n",
    "                        }\n",
    "                    }\n",
    "                }\n",
    "            }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "#cria índice no Elasticsearch\n",
    "es.indices.create(index=indice,body=sim_type)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Carrega dados preparados/transformados do Dataiku"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_prepared = dataiku.Dataset(\"DORES_preparados\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Recupera métricas (record_count) do dataset no Dataiku"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_metric(project_name,dataset_name,metric_ids):\n",
    "    client = dataiku.api_client()\n",
    "    current_project = client.get_project(project_name)\n",
    "    dataset = current_project.get_dataset(dataset_name)\n",
    "    metrics = dataset.compute_metrics(partition='ALL', metric_ids=metric_ids)\n",
    "    metrics = [{'metric':m[\"metricId\"],'value':int(m[\"value\"])} for m in metrics[\"result\"][\"computed\"] if m[\"metricId\"] in metric_ids][0]\n",
    "\n",
    "    return metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "def record_count(project_name,dataset_name):\n",
    "    return get_metric(project_name,dataset_name,['records:COUNT_RECORDS'])['value']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "nrows = record_count('MERGESIM','DORES_preparados')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Gera json para indexação"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "def geraJson(df):\n",
    "    return json.loads(df.T.to_json())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Indexação em lote"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tamanho do chunk\n",
    "chunksize = 10000\n",
    "\n",
    "#número total de chunks a serem indexados\n",
    "nchunks = nrows/chunksize\n",
    "\n",
    "#imprime o número total de documentos a serem indexados\n",
    "print(\"Documentos: %i\\n\"%nrows)\n",
    "\n",
    "res_bulk=[]\n",
    "\n",
    "for chunk,df in enumerate(data_prepared.iter_dataframes(chunksize=chunksize)):\n",
    "\n",
    "    #gera o json do chunk de dados atual (formato pronto para indexação)\n",
    "    data_json = geraJson(df)\n",
    "\n",
    "    #imprime o número do chunk atual e o total de chunks a serem indexados\n",
    "    print(\"Chunk: %i/%i\"%(chunk,nchunks))\n",
    "\n",
    "    #cria lista de ações para indexação de cada documento do chunk atual\n",
    "    lista=[]\n",
    "    for i, item in enumerate(data_json.values()):\n",
    "        data_dict = {\n",
    "            '_op_type': 'index',\n",
    "            '_index': indice,\n",
    "            '_type': doc_type,\n",
    "            '_source': item\n",
    "        }\n",
    "        lista.append(data_dict)\n",
    "\n",
    "    #indexa todos os documentos do chunk atual (bulk indexa em chunks)\n",
    "    res = helpers.bulk(client=es, actions=lista, chunk_size=1000, raise_on_error=False, raise_on_exception = False)\n",
    "    res_bulk.append(res)\n",
    "\n",
    "    print(res)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {},
   "outputs": [],
   "source": [
    "res_df = pd.DataFrame(res_bulk)\n",
    "res_df.columns = ['indexed_chunksize', 'errors']\n",
    "\n",
    "# Write recipe outputs\n",
    "res_Elasticsearch = dataiku.Dataset(\"bulk_elasticsearch\")\n",
    "res_Elasticsearch.write_with_schema(res_df)"
   ]
  }
 ],
 "metadata": {
  "associatedRecipe": "compute_DORJ2000-2014_from_DBC_prepared_Elasticsearch",
  "creator": "admin",
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  },
  "tags": [
   "recipe-editor"
  ]
 },
 "nbformat": 4,
 "nbformat_minor": 1
}