{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# OpenStreetMap XML to JSON\n",
    "\n",
    "Streams an OSM XML extract, reshapes each `node` and `way` element into a dictionary (normalising abbreviated street types found in `addr:street` tags) and writes one JSON document per element to an output file.\n",
    "\n",
    "The data folder defaults to the original local drive path; set the `OSM_DATA_DIR` environment variable before starting the kernel to point the notebook at another copy of `sample.osm`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!/usr/bin/env python\n",
    "# -*- coding: utf-8 -*-\n",
    "import codecs\n",
    "import json\n",
    "import os\n",
    "import pprint\n",
    "import re\n",
    "\n",
    "try:\n",
    "    import xml.etree.cElementTree as ET  # C implementation on Python 2\n",
    "except ImportError:  # the cElementTree alias was removed in Python 3.9\n",
    "    import xml.etree.ElementTree as ET"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Folder holding the OSM extract; override it with the OSM_DATA_DIR\n",
    "# environment variable.\n",
    "DATA_DIR = os.environ.get(\n",
    "    \"OSM_DATA_DIR\",\n",
    "    \"/media/vagner/Seagate Expansion Drive/500Gb/\"\n",
    "    \"DiskExternoVagner/Cursos/CientistaDados/\"\n",
    "    \"Modulo_4/DW-OSM-RMSP\")\n",
    "\n",
    "# The input is kept as a path (not an open handle) so that no file\n",
    "# descriptor is left open at notebook level.\n",
    "osm_file = os.path.join(DATA_DIR, \"sample.osm\")\n",
    "file_out = os.path.join(DATA_DIR, \"sample222.json\")\n",
    "\n",
    "# Attributes that are grouped under the \"created\" key of each document.\n",
    "CREATED = [\"version\", \"changeset\", \"timestamp\", \"user\", \"uid\"]\n",
    "\n",
    "# Tag keys containing any of these characters are skipped.\n",
    "problemchars = re.compile(r'[=\\+/&<>;\\'\"\\?%#$@\\,\\. \\t\\r\\n]')\n",
    "\n",
    "# Tag keys with two or more colons (e.g. a:b:c) are skipped as well.\n",
    "nested_key_re = re.compile(r'\\w+:\\w+:\\w+')\n",
    "\n",
    "# Street-type prefixes and the spelling they are replaced with.\n",
    "# Literals with non-ASCII characters carry the u prefix: on Python 2 a\n",
    "# byte-string key such as \"Complexo vi\\xc3\\xa1rio\" cannot be compared with a\n",
    "# unicode street name and triggered a UnicodeWarning.\n",
    "mapping = {\"Av\": \"Avenida\",\n",
    "           \"Av.\": \"Avenida\",\n",
    "           \"R\": \"Rua\",\n",
    "           \"R.\": \"Rua\",\n",
    "           \"rua\": \"Rua\",\n",
    "           \"r.\": \"Rua\",\n",
    "           \"r\": \"Rua\",\n",
    "           \"Pr.\": u\"Praça\",\n",
    "           \"PR.\": u\"Praça\",\n",
    "           u\"Complexo viário\": u\"Complexo Viário\",\n",
    "           \"Complexo viario\": u\"Complexo Viário\"\n",
    "           }"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Street-name normalisation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def update_name(name, mapping):\n",
    "    \"\"\"Replace a known street-type prefix at the start of `name`.\n",
    "\n",
    "    Args:\n",
    "        name: street name (value of an `addr:street` tag).\n",
    "        mapping: dict of prefix -> replacement text.\n",
    "\n",
    "    Returns:\n",
    "        `name` with its leading prefix replaced when that prefix is a key of\n",
    "        `mapping` and is followed by whitespace or by the end of the string;\n",
    "        otherwise `name` unchanged (including when it is empty or None).\n",
    "    \"\"\"\n",
    "    if not name:\n",
    "        return name\n",
    "    # Longest keys first, so that multi-word keys such as \"Complexo viario\"\n",
    "    # are tried before any shorter key.\n",
    "    for prefix in sorted(mapping, key=len, reverse=True):\n",
    "        if not name.startswith(prefix):\n",
    "            continue\n",
    "        rest = name[len(prefix):]\n",
    "        # Only whole words count: \"R Augusta\" matches \"R\", \"Rio Branco\" does not.\n",
    "        if not rest or rest[0].isspace():\n",
    "            return mapping[prefix] + rest\n",
    "    return name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sanity checks for the normalisation rules.\n",
    "assert update_name(\"Av. Paulista\", mapping) == \"Avenida Paulista\"\n",
    "assert update_name(\"r Augusta\", mapping) == \"Rua Augusta\"\n",
    "assert update_name(\"Complexo viario Jacu\", mapping) == u\"Complexo Viário Jacu\"\n",
    "assert update_name(\"Rio Branco\", mapping) == \"Rio Branco\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reshaping elements"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def shape_element(element):\n",
    "    \"\"\"Convert one OSM `node` or `way` element into a plain dictionary.\n",
    "\n",
    "    Args:\n",
    "        element: ElementTree element produced by the parser.\n",
    "\n",
    "    Returns:\n",
    "        None when the element is neither a `node` nor a `way`; otherwise a\n",
    "        dict holding \"type\" (the tag name), \"created\" (attributes listed in\n",
    "        CREATED), \"pos\" ([lat, lon] as floats, when both are present),\n",
    "        \"address\" (`addr:*` tags, when any), \"node_refs\" (refs of `nd`\n",
    "        children, when any) and every other attribute or tag under its own key.\n",
    "    \"\"\"\n",
    "    if element.tag not in (\"node\", \"way\"):\n",
    "        return None\n",
    "\n",
    "    node = {}\n",
    "    created = {}\n",
    "    for key, value in element.attrib.items():\n",
    "        if key in CREATED:\n",
    "            created[key] = value\n",
    "        elif key in (\"lat\", \"lon\"):\n",
    "            # Coordinates are recognised by attribute name (not by value) and\n",
    "            # combined into \"pos\" below.\n",
    "            continue\n",
    "        else:\n",
    "            node[key] = value\n",
    "\n",
    "    lat, lon = element.get(\"lat\"), element.get(\"lon\")\n",
    "    if lat is not None and lon is not None:\n",
    "        node[\"pos\"] = [float(lat), float(lon)]\n",
    "    node[\"type\"] = element.tag\n",
    "    node[\"created\"] = created\n",
    "\n",
    "    address = {}\n",
    "    node_refs = []\n",
    "    for child in element:\n",
    "        if child.tag == \"nd\":\n",
    "            node_refs.append(child.get(\"ref\"))\n",
    "            continue\n",
    "        if child.tag != \"tag\":\n",
    "            continue\n",
    "        key, value = child.get(\"k\"), child.get(\"v\")\n",
    "        if key is None:\n",
    "            # A tag without a key cannot be stored.\n",
    "            continue\n",
    "        if problemchars.search(key) or nested_key_re.search(key):\n",
    "            continue\n",
    "        if key.startswith(\"addr:\"):\n",
    "            field = key[5:]\n",
    "            if field == \"street\":\n",
    "                value = update_name(value, mapping)\n",
    "            address[field] = value\n",
    "        else:\n",
    "            # NOTE: a tag whose key equals a field set above (for example\n",
    "            # \"type\") overwrites that field.\n",
    "            node[key] = value\n",
    "\n",
    "    if address:\n",
    "        node[\"address\"] = address\n",
    "    if node_refs:\n",
    "        node[\"node_refs\"] = node_refs\n",
    "    return node"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Conversion"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_map(file_in, pretty=False, out_path=None):\n",
    "    \"\"\"Shape every node/way of an OSM XML file and write them out as JSON.\n",
    "\n",
    "    Args:\n",
    "        file_in: path or file object accepted by `ET.iterparse`.\n",
    "        pretty: when True each document is indented by two spaces.\n",
    "        out_path: destination file; defaults to the notebook-level `file_out`.\n",
    "\n",
    "    Returns:\n",
    "        List of the shaped dictionaries, in the order they were parsed.\n",
    "    \"\"\"\n",
    "    if out_path is None:\n",
    "        out_path = file_out\n",
    "    print(out_path)\n",
    "    indent = 2 if pretty else None\n",
    "    data = []\n",
    "    with codecs.open(out_path, \"w\", encoding=\"utf-8\") as fo:\n",
    "        for _, element in ET.iterparse(file_in):\n",
    "            el = shape_element(element)\n",
    "            if el is not None:\n",
    "                data.append(el)\n",
    "                fo.write(json.dumps(el, indent=indent) + \"\\n\")\n",
    "            # Release finished top-level elements so the parsed tree does not\n",
    "            # keep growing. Children (tag/nd) must stay intact until their\n",
    "            # parent has been shaped, so they are not cleared on their own.\n",
    "            if element.tag in (\"node\", \"way\", \"relation\"):\n",
    "                element.clear()\n",
    "    return data\n",
    "\n",
    "\n",
    "def test():\n",
    "    \"\"\"Convert the configured OSM extract and return the shaped documents.\"\"\"\n",
    "    return process_map(osm_file, False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if __name__ == \"__main__\":\n",
    "    data = test()\n",
    "    # Show a small sample only; the full list can be very large.\n",
    "    pprint.pprint(data[:2])"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}