In [17]:
# Notebook parameters.
# Every value in this cell is read by later cells; edit them here rather than
# further down in the notebook.

# --- Triple generation -------------------------------------------------------
# kgtk_path is the directory that contains the KGTK subgraph. The generated
# triple file, the generation log and the docker volume are created under it.
# NOTE(review): this is a machine-specific absolute path; adjust it per environment.
kgtk_path = '/Users/amandeep/Documents/kypher/wikidata_os_v5'
# Gzipped KGTK edge file inside kgtk_path that is converted to triples.
kgtk_file_name = 'all_and_qualifiers.sorted.tsv.gz'
# Name of the generated triple file (before gzipping), created inside kgtk_path.
triple_filename= 'all.ttl'
# Name of the log file passed to `kgtk generate_wikidata_triples -log`.
triple_generation_log = 'triple_generation_log.txt'
# Properties file passed to `kgtk generate_wikidata_triples -pf`.
properties_file_path = f'{kgtk_path}/all.metadata.property.datatypes.tsv.gz'

# --- Load triples to blazegraph ----------------------------------------------
# The four ports are exported by BlazegraphLoad as the environment variables
# WIKIBASE_UI, WIKIBASE_SPARQL, WIKIBASE_PROXY and WIKIBASE_QS.
wikibase_ui_port = '10001'
wikibase_sparql_port = '10002'
wikibase_proxy_port = '10003'
wikibase_qs_port = '10005'
# Placeholder only: the setup cell below overwrites this with f'{kgtk_path}/docker_volume'.
wikibase_volume = '.'
# Used as the docker-compose project name (-p) and as the prefix of the wdqs container name.
docker_name = 'blazegraphpipeline'
# When True, ports and docker name are checked and `docker-compose up -d` is run.
create_new = True
# 'Yes' or 'yes' makes the loader run `docker-compose down -v` and exit instead of loading.
stop_docker = "No"
# Exported as BLAZEGRAPH_IMAGE; replaced by image_tag when create_image is True.
blazegraph_image = 'wikibase/wdqs:0.3.10'
# Gzipped ttl file to load; replaced by the generated triple file when gen_triples is True.
ttl_path = ''
# Exported as QUERY_SERVICE_NAME.
query_service_name = 'ISI SPARQL Query Service'

# Choose which stages run: the generate_wikidata_triples part, the loading to blazegraph part, or both.
gen_triples = True
load_triples = True

# --- Create new image --------------------------------------------------------
# When create_image is True, `docker build -t image_tag dockerfile_path` is run
# and the resulting tag is used as blazegraph_image.
create_image = True
image_tag = 'blazegraph_image'
dockerfile_path = './0.3.10/'

In [18]:
import os
import re
import subprocess
import gzip
import subprocess
import socket
import sys
import shutil
import time
import glob
import json
from IPython.display import display, Markdown, HTML
from pathlib import Path

# Volume directory handed to BlazegraphLoad; it lives under the KGTK directory
# and is created (including missing parents) if it does not exist yet.
wikibase_volume = '{}/docker_volume'.format(kgtk_path)
os.makedirs(wikibase_volume, exist_ok=True)

# Full path of the gzipped KGTK edge file that is converted to triples.
input_file_path = '{}/{}'.format(kgtk_path, kgtk_file_name)

In [19]:
class color:
    '''
    ANSI escape sequences for styling text printed to the terminal/notebook.

    Wrap a string between one of the style codes and ``END`` to apply it, e.g.
    ``print(color.BOLD + 'title' + color.END)``.
    '''
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Resets all styling applied by the codes above.
    END = '\033[0m'

### Generate Wikidata triples

In [7]:
##generate_wikidata_triples
#Run only generate triples
'''
1. This cell will run only if you just want to generate triples aligned to wikidata schema. 
It will first concatenate the KGTK edge files and then will use the KGTK generate_wikidata_triples 
command to generate triples for the concatenated file.

2. The generate_wikidata_triples takes in the properities file path as a parameter. 
The properties file should have the data_type mentioned for each of the property used in the KGTK edge file.

3. The generated triple file is then gzipped.
'''

if gen_triples:
 print(color.BOLD + '------------Head of the KGTK edge file-------------' + color.END)
 
 print()
 
 !gzcat $input_file_path | head -n 20

 triple_output_save_path = os.path.join(kgtk_path,triple_filename) # Name of the output triple file
 log_save_path = os.path.join(kgtk_path,triple_generation_log) # Name of the log file
 
 # generate the triples
 !kgtk generate_wikidata_triples -i $input_file_path \
 -ap alias -lp label -dp description \
 -pf $properties_file_path \
 -n 1000 \
 --debug \
 -gt yes -gz yes -w yes \
 -log $log_save_path > $triple_output_save_path


[1m------------Head of the KGTK edge file-------------[0m

id	node1	label	node2
P10-P1628-32b85d-7927ece6-0	P10	P1628	"http://www.w3.org/2006/vcard/ns#Video"
P10-P1628-acf60d-b8950832-0	P10	P1628	"https://schema.org/video"
P10-P1629-Q34508-bcc39400-0	P10	P1629	Q34508
P10-P1659-P1651-c4068028-0	P10	P1659	P1651
P10-P1659-P18-5e4b9c4f-0	P10	P1659	P18
P10-P1659-P4238-d21d1ac0-0	P10	P1659	P4238
P10-P1659-P51-86aca4c5-0	P10	P1659	P51
P10-P1855-Q15075950-7eff6d65-0	P10	P1855	Q15075950
P10-P1855-Q15075950-7eff6d65-0-P10-54b214-0	P10-P1855-Q15075950-7eff6d65-0	P10	"Smoorverliefd 12 september.webm"
P10-P1855-Q15075950-7eff6d65-0-P3831-Q622550-0	P10-P1855-Q15075950-7eff6d65-0	P3831	Q622550
P10-P1855-Q69063653-c8cdb04c-0	P10	P1855	Q69063653
P10-P1855-Q69063653-c8cdb04c-0-P10-6fb08f-0	P10-P1855-Q69063653-c8cdb04c-0	P10	"Couch Commander.webm"
P10-P1855-Q7378-555592a4-0	P10	P1855	Q7378
P10-P1855-Q7378-555592a4-0-P10-8a982d-0	P10-P1855-Q7378-555592a4-0	P10	"Elephants Dream (2006).webm"
P10-P2302-Q21

In [8]:
# gzip the triple file
!gzip -f $triple_output_save_path

print()

print(color.BOLD + 'The triple file is generated and saved at:' + color.END,end = ' ')
print(triple_output_save_path + '.gz') 
triple_path = triple_output_save_path + '.gz'

print()

print(color.BOLD + '------------Head of the triple file-------------' + color.END)

print()

!gzcat -cd $triple_path | head -n 20


[1mThe triple file is generated and saved at:[0m /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz

[1m------------Head of the triple file-------------[0m

@prefix wikibase: .
@prefix wd: .
@prefix wdt: .
@prefix wdtn: .
@prefix wdno: .
@prefix wds: .
@prefix wdv: .
@prefix wdref: .
@prefix p: .
@prefix pr: .
@prefix prv: .
@prefix prn: .
@prefix ps: .
@prefix psv: .
@prefix psn: .
@prefix pq: .
@prefix pqv: .
@prefix pqn: .
@prefix prov: .
@prefix skos: .
gzcat: error writing to output: Broken pipe
gzcat: /Users/amandeep/Documents/kypher/wikidata_os_v5/all.ttl.gz: uncompress failed


### Load Triples

In [None]:
if create_image:
    # Build the image from `dockerfile_path` and tag it as `image_tag`.
    # check_call raises CalledProcessError when the build fails, so the
    # notebook never continues with an image tag that was not actually built.
    subprocess.check_call(['docker', 'build', '-t', str(image_tag), str(dockerfile_path)])
    # From here on the loader uses the freshly built image.
    blazegraph_image = image_tag

In [21]:
# Exception Functions
class PortInUseError(Exception):
    """
    Raised when one of the requested ports is already in use.

    Derives from ``Exception`` (not ``BaseException``) so that generic
    ``except Exception`` handlers treat it like any other application error.

    Attributes:
        value: the message describing which port is in use.
    """
    def __init__(self, value):
        # Forward the message so str(error) and error.args carry it as well.
        super().__init__(value)
        self.value = value


class DockerNameInUse(Exception):
    """
    Raised when the requested Docker name is already in use.

    Derives from ``Exception`` (not ``BaseException``) so that generic
    ``except Exception`` handlers treat it like any other application error.

    Attributes:
        value: the message describing the problem.
    """
    def __init__(self, value):
        # Forward the message so str(error) and error.args carry it as well.
        super().__init__(value)
        self.value = value


In [22]:
class BlazegraphLoad():
    '''
    The class is used to create a new or use an existing wikibase-docker instance to load
    a given gzipped ttl file to a blazegraph triple store.

    Typical use: build an instance with all settings, then call ``driver_fn()``.
    '''

    # docker-compose file used for both `up` and `down`.
    COMPOSE_FILE = 'docker-compose.pipeline.yml'
    # Name under which the ttl file is staged inside <volume>/mungeOut.
    STAGED_TTL_NAME = 'wikidump-000000001.ttl.gz'
    # Wait time to let the docker containers start before loading is attempted.
    STARTUP_WAIT_SECONDS = 40

    def __init__(self, ttl_path, wikibase_ui_port, wikibase_sparql, wikibase_proxy, wikibase_qs, wikibase_volume,
                 create_new, docker_name, stop_docker, blazegraph_image, query_service_name):
        '''
        Store the settings and export the environment variables
        that will be used by the docker-compose.pipeline.yml file.

        Parameters:
            ttl_path: path of the gzipped ttl file to load.
            wikibase_ui_port, wikibase_sparql, wikibase_proxy, wikibase_qs:
                ports, stored and exported as strings.
            wikibase_volume: directory under which `mungeOut` is created.
            create_new: when truthy, a new docker-compose project is started.
            docker_name: docker-compose project name.
            stop_docker: 'Yes' or 'yes' stops the project instead of loading.
            blazegraph_image: exported as BLAZEGRAPH_IMAGE.
            query_service_name: exported as QUERY_SERVICE_NAME.
        '''
        self.ttl_path = ttl_path
        self.wikibase_ui_port = str(wikibase_ui_port)
        self.wikibase_sparql = str(wikibase_sparql)
        self.wikibase_proxy = str(wikibase_proxy)
        self.wikibase_qs = str(wikibase_qs)
        self.wikibase_volume = wikibase_volume
        self.create_new = create_new
        self.docker_name = docker_name
        self.stop_docker = stop_docker
        self.blazegraph_image = blazegraph_image
        self.query_service_name = query_service_name
        os.environ['WIKIBASE_UI'] = self.wikibase_ui_port
        os.environ['WIKIBASE_SPARQL'] = self.wikibase_sparql
        os.environ['WIKIBASE_PROXY'] = self.wikibase_proxy
        os.environ['WIKIBASE_QS'] = self.wikibase_qs
        os.environ['WIKIBASE_VOLUME'] = self.wikibase_volume
        os.environ['BLAZEGRAPH_IMAGE'] = self.blazegraph_image
        os.environ['QUERY_SERVICE_NAME'] = self.query_service_name

    @staticmethod
    def _notebook_global(name, default=None):
        '''
        Return the module-level variable `name`, or `default` when it is not defined.
        Only used as a fallback for legacy calls that pass no arguments.
        '''
        return globals().get(name, default)

    @staticmethod
    def _port_in_use(port, host='localhost'):
        '''
        Return True when something accepts TCP connections on host:port.
        A fresh socket is used for every probe: a socket that has already
        connected once cannot be reused for another connection attempt.
        '''
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            return probe.connect_ex((host, int(port))) == 0

    @staticmethod
    def check_availability(docker_name=None, create_new=None):
        '''
        1. Checks whether the ports exported in WIKIBASE_UI, WIKIBASE_SPARQL, WIKIBASE_PROXY and
           WIKIBASE_QS are available. If any of them is in use, PortInUseError is raised.
        2. Checks whether a running container matches the docker name. If so, DockerNameInUse is raised.

        Both checks are skipped when `create_new` is falsy.

        Parameters:
            docker_name: name to look for; when omitted, the module-level
                `docker_name` variable is used if it exists.
            create_new: whether a new instance is about to be created; when
                omitted, the module-level `create_new` variable is used
                (defaulting to True).

        Returns:
            True when nothing blocks the creation of the instance.
        '''
        if create_new is None:
            create_new = BlazegraphLoad._notebook_global('create_new', True)
        if not create_new:
            return True

        port_checks = (
            ('WIKIBASE_UI', 'Wikibase UI Port is in use'),
            ('WIKIBASE_SPARQL', 'Wikibase Sparql Port is in use'),
            ('WIKIBASE_PROXY', 'Wikibase Proxy Port is in use'),
            ('WIKIBASE_QS', 'Wikibase QS Port is in use'),
        )
        for env_name, message in port_checks:
            if BlazegraphLoad._port_in_use(os.getenv(env_name)):
                raise PortInUseError(message)

        if docker_name is None:
            docker_name = BlazegraphLoad._notebook_global('docker_name')
        if docker_name is not None:
            # `-q` prints only container ids, so any output at all means a
            # running container matches the name filter.
            listing = subprocess.run(
                ['docker', 'ps', '-q', '--filter', 'name={}'.format(docker_name)],
                stdin=subprocess.DEVNULL, stdout=subprocess.PIPE)
            if listing.stdout.strip():
                raise DockerNameInUse('Try changing docker container name')
        return True

    @staticmethod
    def load_data(docker_name=None):
        '''
        Run /wdqs/loadData.sh inside the `<docker_name>_wdqs_1` container to load the staged
        gzipped ttl file to the Blazegraph triple store, and print the script's output.

        Parameters:
            docker_name: docker-compose project name; when omitted, the
                module-level `docker_name` variable is used.

        Raises:
            ValueError: when no docker name is available.
        '''
        if docker_name is None:
            docker_name = BlazegraphLoad._notebook_global('docker_name')
        if docker_name is None:
            raise ValueError('docker_name is required to locate the wdqs container')
        loader = subprocess.run(
            ['docker', 'exec', '{}_wdqs_1'.format(docker_name), '/wdqs/loadData.sh', '-n', 'wdq', '-d',
             '/instancestore/wikibase/mungeOut'],
            stdin=subprocess.DEVNULL, stdout=subprocess.PIPE)
        # Decode so the log is printed as readable text rather than a bytes repr.
        print(loader.stdout.decode('utf-8', errors='replace'))

    def _compose(self, *args):
        '''Run docker-compose for this project with `args`; raises CalledProcessError on failure.'''
        subprocess.run(
            ['docker-compose', '-f', self.COMPOSE_FILE, '-p', self.docker_name] + list(args),
            stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, check=True)

    def driver_fn(self):
        '''
        Main driver function.

        1. When `create_new` is set, checks ports and docker name and starts the containers.
        2. When `stop_docker` is 'Yes' or 'yes', stops the containers and exits via sys.exit(1)
           without loading anything.
        3. Otherwise copies the ttl file into <wikibase_volume>/mungeOut, waits for the
           containers to start, loads the data and removes the staged copy again.
        '''
        if self.create_new:
            # checks the availability of the ports and the docker name
            if self.check_availability(self.docker_name, self.create_new):
                # creates a new docker container
                self._compose('up', '-d')

        if self.stop_docker == 'Yes' or self.stop_docker == 'yes':
            self._compose('down', '-v')
            # Stops further execution: nothing is left to load into.
            sys.exit(1)

        # Make sure the directory mounted on the docker container exists and
        # stage the triple file in it under the fixed wikidump file name.
        munge_dir = os.path.join(self.wikibase_volume, 'mungeOut')
        os.makedirs(munge_dir, exist_ok=True)
        staged_ttl = os.path.join(munge_dir, self.STAGED_TTL_NAME)
        shutil.copy(self.ttl_path, staged_ttl)

        try:
            # Wait time to let the docker containers start before the loading function is called
            time.sleep(self.STARTUP_WAIT_SECONDS)
            self.load_data(self.docker_name)
        finally:
            # Remove the staged copy even when loading fails.
            os.remove(staged_ttl)

In [24]:
# Load triples into the Blazegraph triple store.
#
# Runs only when `load_triples` is True. When the triples were generated by
# this notebook (`gen_triples` is True) the freshly gzipped file is loaded;
# otherwise the `ttl_path` parameter is used as given.
if load_triples:
    if gen_triples:
        ttl_path = triple_path

    print(color.BOLD + '------------Log output of loading the triple file to Blazegraph-------------' + color.END)
    print()

    loader_obj = BlazegraphLoad(
        ttl_path,
        wikibase_ui_port,
        wikibase_sparql_port,
        wikibase_proxy_port,
        wikibase_qs_port,
        wikibase_volume,
        create_new,
        docker_name,
        stop_docker,
        blazegraph_image,
        query_service_name,
    )
    loader_obj.driver_fn()

[1m------------Log output of loading the triple file to Blazegraph-------------[0m


b'Processing wikidump-000000001.ttl.gz\nblazegraph™ by SYSTAP</title\n></head\n><body<p>totalElapsed=24274ms, elapsed=24030ms, connFlush=0ms, batchResolve=0, whereClause=0ms, deleteClause=0ms, insertClause=0ms</p\n><hr><p>COMMIT: totalElapsed=57155ms, commitTime=1611257027095, mutationCount=2007140</p\n></html\n>File wikidump-000000002.ttl.gz not found, terminating\n'


In [25]:
# Show a clickable link to the SPARQL endpoint, but only when the triples
# were loaded to Blazegraph in this run.

if load_triples:
    s = f'<a href="http://localhost:{wikibase_sparql_port}">Sparql Endpoint</a>'
    display(HTML(s))
 