{ "cells": [ { "cell_type": "code", "execution_count": 17, "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# Generate triples parameter\n", "\n", "#kgtk_path takes in the directory which contains the kgtk subgraph\n", "kgtk_path = '/Users/amandeep/Documents/kypher/wikidata_os_v5'\n", "kgtk_file_name = 'all_and_qualifiers.sorted.tsv.gz'\n", "triple_filename= 'all.ttl'\n", "triple_generation_log = 'triple_generation_log.txt'\n", "properties_file_path = f'{kgtk_path}/all.metadata.property.datatypes.tsv.gz'\n", "\n", "# Load triples to blazegraph\n", "wikibase_ui_port = '10001'\n", "wikibase_sparql_port = '10002'\n", "wikibase_proxy_port = '10003'\n", "wikibase_qs_port = '10005'\n", "wikibase_volume = '.'\n", "docker_name = 'blazegraphpipeline'\n", "create_new = True\n", "stop_docker = \"No\"\n", "blazegraph_image = 'wikibase/wdqs:0.3.10'\n", "ttl_path = ''\n", "query_service_name = 'ISI SPARQL Query Service'\n", "\n", "#Parameterize whether you want to run just the generate_wikidata_triples part or loading to blazegraph part\n", "gen_triples = True\n", "load_triples = True\n", "\n", "#Create new image\n", "create_image = True\n", "image_tag = 'blazegraph_image'\n", "dockerfile_path = './0.3.10/'" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "import os\n", "import re\n", "import subprocess\n", "import gzip\n", "import subprocess\n", "import socket\n", "import sys\n", "import shutil\n", "import time\n", "import glob\n", "import json\n", "from IPython.display import display, Markdown, HTML\n", "from pathlib import Path\n", "\n", "wikibase_volume = f'{kgtk_path}/docker_volume'\n", "\n", "Path(wikibase_volume).mkdir(parents=True, exist_ok=True)\n", "\n", "input_file_path = f'{kgtk_path}/{kgtk_file_name}'" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "'''\n", "Utility class to print stuff in Bold.\n", "'''\n", "class color:\n", " PURPLE = '\\033[95m'\n", " CYAN = '\\033[96m'\n", " DARKCYAN = '\\033[36m'\n", " BLUE = '\\033[94m'\n", " GREEN = '\\033[92m'\n", " YELLOW = '\\033[93m'\n", " RED = '\\033[91m'\n", " BOLD = '\\033[1m'\n", " UNDERLINE = '\\033[4m'\n", " END = '\\033[0m'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate Wikidata triples" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1m------------Head of the KGTK edge file-------------\u001b[0m\n", "\n", "id\tnode1\tlabel\tnode2\n", "P10-P1628-32b85d-7927ece6-0\tP10\tP1628\t\"http://www.w3.org/2006/vcard/ns#Video\"\n", "P10-P1628-acf60d-b8950832-0\tP10\tP1628\t\"https://schema.org/video\"\n", "P10-P1629-Q34508-bcc39400-0\tP10\tP1629\tQ34508\n", "P10-P1659-P1651-c4068028-0\tP10\tP1659\tP1651\n", "P10-P1659-P18-5e4b9c4f-0\tP10\tP1659\tP18\n", "P10-P1659-P4238-d21d1ac0-0\tP10\tP1659\tP4238\n", "P10-P1659-P51-86aca4c5-0\tP10\tP1659\tP51\n", "P10-P1855-Q15075950-7eff6d65-0\tP10\tP1855\tQ15075950\n", "P10-P1855-Q15075950-7eff6d65-0-P10-54b214-0\tP10-P1855-Q15075950-7eff6d65-0\tP10\t\"Smoorverliefd 12 september.webm\"\n", "P10-P1855-Q15075950-7eff6d65-0-P3831-Q622550-0\tP10-P1855-Q15075950-7eff6d65-0\tP3831\tQ622550\n", "P10-P1855-Q69063653-c8cdb04c-0\tP10\tP1855\tQ69063653\n", "P10-P1855-Q69063653-c8cdb04c-0-P10-6fb08f-0\tP10-P1855-Q69063653-c8cdb04c-0\tP10\t\"Couch Commander.webm\"\n", "P10-P1855-Q7378-555592a4-0\tP10\tP1855\tQ7378\n", "P10-P1855-Q7378-555592a4-0-P10-8a982d-0\tP10-P1855-Q7378-555592a4-0\tP10\t\"Elephants Dream (2006).webm\"\n", "P10-P2302-Q21502404-d012aef4-0\tP10\tP2302\tQ21502404\n", "P10-P2302-Q21502404-d012aef4-0-P1793-f4c2ed-0\tP10-P2302-Q21502404-d012aef4-0\tP1793\t\"(?i).+\\\\\\\\.(webm\\\\|ogv\\\\|ogg\\\\|gif)\"\n", "P10-P2302-Q21502404-d012aef4-0-P2316-Q21502408-0\tP10-P2302-Q21502404-d012aef4-0\tP2316\tQ21502408\n", "P10-P2302-Q21502404-d012aef4-0-P2916-cb0917-0\tP10-P2302-Q21502404-d012aef4-0\tP2916\t'filename with extension: webm, ogg, ogv, or gif (case insensitive)'@en\n", "P10-P2302-Q21510851-5224fe0b-0\tP10\tP2302\tQ21510851\n", "gzcat: error writing to output: Broken pipe\n", "gzcat: /Users/amandeep/Documents/kypher/wikidata_os_v5/all_and_qualifiers.sorted.tsv.gz: uncompress failed\n", "gzip: can't stat: triple_output_save_path (triple_output_save_path): No such file or directory\n", "\n", "\u001b[1mThe triple file is generated and saved at:\u001b[0m /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz\n", "\n", "\u001b[1m------------Head of the triple file-------------\u001b[0m\n", "\n", "gzcat: can't stat: /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz (/Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz.gz): No such file or directory\n" ] } ], "source": [ "##generate_wikidata_triples\n", "#Run only generate triples\n", "'''\n", "1. This cell will run only if you just want to generate triples aligned to wikidata schema. \n", "It will first concatenate the KGTK edge files and then will use the KGTK generate_wikidata_triples \n", "command to generate triples for the concatenated file.\n", "\n", "2. The generate_wikidata_triples takes in the properities file path as a parameter. \n", "The properties file should have the data_type mentioned for each of the property used in the KGTK edge file.\n", "\n", "3. The generated triple file is then gzipped.\n", "'''\n", "\n", "if gen_triples:\n", " print(color.BOLD + '------------Head of the KGTK edge file-------------' + color.END)\n", " \n", " print()\n", " \n", " !gzcat $input_file_path | head -n 20\n", "\n", " triple_output_save_path = os.path.join(kgtk_path,triple_filename) # Name of the output triple file\n", " log_save_path = os.path.join(kgtk_path,triple_generation_log) # Name of the log file\n", " \n", " # generate the triples\n", " !kgtk generate_wikidata_triples -i $input_file_path \\\n", " -ap alias -lp label -dp description \\\n", " -pf $properties_file_path \\\n", " -n 1000 \\\n", " --debug \\\n", " -gt yes -gz yes -w yes \\\n", " -log $log_save_path > $triple_output_save_path\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "\u001b[1mThe triple file is generated and saved at:\u001b[0m /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz\n", "\n", "\u001b[1m------------Head of the triple file-------------\u001b[0m\n", "\n", "@prefix wikibase: .\n", "@prefix wd: .\n", "@prefix wdt: .\n", "@prefix wdtn: .\n", "@prefix wdno: .\n", "@prefix wds: .\n", "@prefix wdv: .\n", "@prefix wdref: .\n", "@prefix p: .\n", "@prefix pr: .\n", "@prefix prv: .\n", "@prefix prn: .\n", "@prefix ps: .\n", "@prefix psv: .\n", "@prefix psn: .\n", "@prefix pq: .\n", "@prefix pqv: .\n", "@prefix pqn: .\n", "@prefix prov: .\n", "@prefix skos: .\n", "gzcat: error writing to output: Broken pipe\n", "gzcat: /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz: uncompress failed\n" ] } ], "source": [ "# gzip the triple file\n", "!gzip -f $triple_output_save_path\n", "\n", "print()\n", "\n", "print(color.BOLD + 'The triple file is generated and saved at:' + color.END,end = ' ')\n", "print(triple_output_save_path + '.gz') \n", "triple_path = triple_output_save_path + '.gz'\n", "\n", "print()\n", "\n", "print(color.BOLD + '------------Head of the triple file-------------' + color.END)\n", "\n", "print()\n", "\n", "!gzcat -cd $triple_path | head -n 20" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Triples" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if create_image:\n", " subprocess.call(['docker','build','-t','{}'.format(image_tag),'{}'.format(dockerfile_path)])\n", " blazegraph_image = image_tag" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "# Exception Functions\n", "class PortInUseError(BaseException):\n", " \"\"\"\n", " Exception class for generating error if the passed ports are already in use.\n", " \n", " \"\"\"\n", " def __init__(self,value):\n", " self.value = value\n", "\n", "\n", "class DockerNameInUse(BaseException):\n", " \"\"\"\n", " Exception class for generating error if the passed Docker Name is already in use.\n", " \n", " \"\"\"\n", " def __init__(self,value):\n", " self.value = value\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "class BlazegraphLoad():\n", " '''\n", " The class is used to create a new or use an existing wikibase-docker instance to load \n", " a given gzipped ttl file to a blazegraph triple store.\n", " '''\n", " def __init__(self,ttl_path,wikibase_ui_port,wikibase_sparql,wikibase_proxy,wikibase_qs,wikibase_volume,\n", " create_new,docker_name,stop_docker,blazegraph_image,query_service_name):\n", " '''\n", " Initializing the class variables and Setting the environment variables \n", " that will be used by the docker-compose.pipeline.yml file.\n", " \n", " '''\n", " self.ttl_path = ttl_path\n", " self.wikibase_ui_port = str(wikibase_ui_port)\n", " self.wikibase_sparql = str(wikibase_sparql)\n", " self.wikibase_proxy = str(wikibase_proxy)\n", " self.wikibase_qs = str(wikibase_qs)\n", " self.wikibase_volume = wikibase_volume\n", " self.create_new = create_new\n", " self.docker_name = docker_name\n", " self.stop_docker = stop_docker\n", " self.blazegraph_image = blazegraph_image\n", " self.query_service_name = query_service_name\n", " os.environ['WIKIBASE_UI'] = self.wikibase_ui_port\n", " os.environ['WIKIBASE_SPARQL'] = self.wikibase_sparql\n", " os.environ['WIKIBASE_PROXY'] = self.wikibase_proxy\n", " os.environ['WIKIBASE_QS'] = self.wikibase_qs\n", " os.environ['WIKIBASE_VOLUME'] = self.wikibase_volume\n", " os.environ['BLAZEGRAPH_IMAGE'] = self.blazegraph_image\n", " os.environ['QUERY_SERVICE_NAME'] = self.query_service_name\n", "\n", " @staticmethod\n", " def check_availability():\n", " '''\n", " 1. The function checks whether the passed ports are available or not. If anyone of the passed port\n", " is not available, then it will generate an error.\n", " 2. The functions also checks if the passed docker name is available or not. If the docker name is \n", " already in use it will generate an error.\n", " '''\n", "\n", " wikibase_ui = os.getenv('WIKIBASE_UI')\n", " wikibase_sparql = os.getenv('WIKIBASE_SPARQL')\n", " wikibase_proxy = os.getenv('WIKIBASE_PROXY')\n", " wikibase_qs = os.getenv('WIKIBASE_QS')\n", " with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n", " wikibase_ui_usage = s.connect_ex(('localhost', int(wikibase_ui))) == 0\n", " wikibase_sparql_usage = s.connect_ex(('localhost', int(wikibase_sparql))) == 0\n", " wikibase_proxy_usage = s.connect_ex(('localhost', int(wikibase_proxy))) == 0\n", " wikibase_qs_usage = s.connect_ex(('localhost', int(wikibase_qs))) == 0\n", " docker_name_availability = subprocess.Popen(['docker', 'ps', '--filter', 'name={}'.format(docker_name)],\n", " stdin=subprocess.PIPE, stdout=subprocess.PIPE)\n", " try:\n", " if create_new:\n", " if wikibase_ui_usage:\n", " raise PortInUseError('Wikibase UI Port is in use')\n", " if wikibase_sparql_usage:\n", " raise PortInUseError('Wikibase Sparql Port is in use')\n", " if wikibase_proxy_usage:\n", " raise PortInUseError('Wikibase Proxy Port is in use')\n", " if wikibase_qs_usage:\n", " raise PortInUseError('Wikibase QS Port is in use')\n", " if len(docker_name_availability.communicate()[0]) > 126:\n", " raise DockerNameInUse('Try changing docker container name')\n", " print(docker_name_availability)\n", " except PortInUseError as Argument:\n", " raise ('Error Message:', Argument)\n", " sys.exit(1)\n", "\n", " except DockerNameInUse as Argument:\n", " raise ('Error Message:', Argument)\n", " sys.exit(1)\n", " return True\n", "\n", " @staticmethod\n", " def load_data():\n", " '''\n", " The function is used to load a gzipped ttl file to the Blazegraph triple store.\n", " '''\n", " l_data = subprocess.Popen(\n", " ['docker', 'exec', '{}_wdqs_1'.format(docker_name), '/wdqs/loadData.sh', '-n', 'wdq', '-d',\n", " '/instancestore/wikibase/mungeOut'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)\n", " print(l_data.communicate()[0])\n", "\n", " def driver_fn(self):\n", " '''\n", " This is the main driver function which first checks if the user wants to create a new docker instance.\n", " \n", " '''\n", " if self.create_new:\n", " all_parameters = self.check_availability() # checks the availability of the ports and the docker name\n", " if all_parameters:\n", " # creates a new docker container\n", " create_docker = subprocess.Popen(\n", " ['docker-compose', '-f', 'docker-compose.pipeline.yml', '-p', docker_name, 'up', '-d'],\n", " stdin=subprocess.PIPE, stdout=subprocess.PIPE)\n", " create_docker.communicate()\n", "\n", " if self.stop_docker == 'Yes' or self.stop_docker == 'yes':\n", " docker_stop = subprocess.Popen(\n", " ['docker-compose', '-f', 'docker-compose.pipeline.yml', '-p', docker_name, 'down', '-v'],\n", " stdin=subprocess.PIPE, stdout=subprocess.PIPE)\n", " docker_stop.communicate()\n", " sys.exit(1)\n", "\n", " # checks if the directory which needs to be mounted on the docker container is present. \n", " # If not recursively creates a directory. Also moves the triple file that needs to be loaded to this directory\n", " if os.path.isdir(os.getenv('WIKIBASE_VOLUME') + '/mungeOut'):\n", " shutil.copy(ttl_path, os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))\n", " else:\n", " os.makedirs(os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut'))\n", " shutil.copy(ttl_path, os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))\n", "\n", " time.sleep(40) # Wait time to let the docker containers start before the loading function is called\n", "\n", " self.load_data()\n", " os.remove(os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1m------------Log output of loading the triple file to Blazegraph-------------\u001b[0m\n", "\n", "\n", "b'Processing wikidump-000000001.ttl.gz\\nblazegraph™ by SYSTAP</title\\n></head\\n><body<p>totalElapsed=24274ms, elapsed=24030ms, connFlush=0ms, batchResolve=0, whereClause=0ms, deleteClause=0ms, insertClause=0ms</p\\n><hr><p>COMMIT: totalElapsed=57155ms, commitTime=1611257027095, mutationCount=2007140</p\\n></html\\n>File wikidump-000000002.ttl.gz not found, terminating\\n'\n" ] } ], "source": [ "# Run only load triples\n", "'''\n", "1. This cell is used to load a given triple file to blazegraph triple store.\n", "\n", "2. It will run only if the parameter only_load_triples is set to True\n", "'''\n", "if (gen_triples and load_triples) or load_triples:\n", " if gen_triples:\n", " ttl_path = triple_path\n", " print(color.BOLD + '------------Log output of loading the triple file to Blazegraph-------------' + color.END)\n", " print()\n", " loader_obj = BlazegraphLoad(ttl_path,wikibase_ui_port,wikibase_sparql_port,wikibase_proxy_port,wikibase_qs_port,\n", " wikibase_volume,create_new,docker_name,stop_docker,blazegraph_image,query_service_name)\n", " loader_obj.driver_fn()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<a href=\"http://localhost:10002\">Sparql Endpoint</a>" ], "text/plain": [ "<IPython.core.display.HTML object>" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate a link to SPARQL ENDPOINT only if triples are loaded to Blazegraph\n", "\n", "if load_triples:\n", " s = \"\"\"<a href=\"http://localhost:{}\">Sparql Endpoint</a>\"\"\".format(wikibase_sparql_port)\n", " display(HTML(s))\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "kgtk-env", "language": "python", "name": "kgtk-env" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.9" } }, "nbformat": 4, "nbformat_minor": 4 }