# 5.1 Big Data in Genomics - Clustering

This notebooks demonstated how kMeans clustering can be used to create the reduced (in terms of dimensions) representation of genonomic variants. 

In this notebook we use a small sample of the data from '1000 Genomes Project', which provide whole genomes for ~2500 individuals from ~20 populatios in the world. (We use only about 1000 variants from chromosome 22).

The data are available in the `VCF` (Variant Call Format), which essentially describe the positions, at which the genome of each of the individuals differs from the 'reference genome'.

Let's have a quick look at the data first (`data/ALL.chr22.phase3_1000.vcf.bz2`):

In [1]:
%%sh

bunzip2 -c 'data/ALL.chr22.phase3_1000.vcf.bz2' | head -n 255 | tail -n 5

##INFO=<ID=EX_TARGET,Number=0,Type=Flag,Description="indicates whether a variant is within the exon pull down target boundaries">
##INFO=<ID=MULTI_ALLELIC,Number=0,Type=Flag,Description="indicates whether a site is multi-allelic">
#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	HG00096	HG00097	HG00099	HG00100	HG00101	HG00102	HG00103	HG00105	HG00106	HG00107	HG00108	HG00109	HG00110	HG00111	HG00112	HG00113	HG00114	HG00115	HG00116	HG00117	HG00118	HG00119	HG00120	HG00121	HG00122	HG00123	HG00125	HG00126	HG00127	HG00128	HG00129	HG00130	HG00131	HG00132	HG00133	HG00136	HG00137	HG00138	HG00139	HG00140	HG00141	HG00142	HG00143	HG00145	HG00146	HG00148	HG00149	HG00150	HG00151	HG00154	HG00155	HG00157	HG00158	HG00159	HG00160	HG00171	HG00173	HG00174	HG00176	HG00177	HG00178	HG00179	HG00180	HG00181	HG00182	HG00183	HG00185	HG00186	HG00187	HG00188	HG00189	HG00190	HG00231	HG00232	HG00233	HG00234	HG00235	HG00236	HG00237	HG00238	HG00239	HG00240	HG00242	HG00243	HG00244	HG00245	HG00246	HG00250	HG00251	HG00252	HG0


bunzip2: I/O or other error, bailing out.  Possible reason follows.
bunzip2: Broken pipe
	Input file = data/ALL.chr22.phase3_1000.vcf.bz2, output file = (stdout)


We will preprocess the file and load it to a DataFrame

In [2]:
# load the file as text RDD
chr22RDD =  sc.textFile('data/ALL.chr22.phase3_1000.vcf.bz2', 4)

# extract the header line (starting with '#CHROM`)
header = chr22RDD.filter(lambda line: line.startswith("#CHROM")).map(lambda line:line.split()).first()
print(header[0:10])
columnNames = header[0:9]
sampleNames = header[9:]
print(sampleNames[0:10])

[u'#CHROM', u'POS', u'ID', u'REF', u'ALT', u'QUAL', u'FILTER', u'INFO', u'FORMAT', u'HG00096']
[u'HG00096', u'HG00097', u'HG00099', u'HG00100', u'HG00101', u'HG00102', u'HG00103', u'HG00105', u'HG00106', u'HG00107']


In [3]:
from pyspark.ml.linalg import Vectors

# encode the genotypes base on the amount of variation from the base allele
# 0|0 -> 0
# 0|1 or 1|0 -> 1
# 1|1 -> 2
encodeGenotype = lambda genotype: float(sum( allele!='0' for allele in  genotype.split('|')))
encodeGenotypes = lambda genotypes: Vectors.dense(map(encodeGenotype, genotypes))

def splitGenotypeLine(line):
    columns = line.split()
    return columns[0:9] + [ encodeGenotypes(columns[9:]) ]


# create a DataFrame from with the encoded genotypes
df = spark.createDataFrame(chr22RDD.filter(lambda line: not line.startswith("#")).map(splitGenotypeLine), 
                          schema = columnNames + ['encoded_genotypes'])
df.cache()
print("Number of variants: %s" % df.count())

Number of variants: 1000


In [4]:
df.printSchema()

root
 |-- #CHROM: string (nullable = true)
 |-- POS: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- REF: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- QUAL: string (nullable = true)
 |-- FILTER: string (nullable = true)
 |-- INFO: string (nullable = true)
 |-- FORMAT: string (nullable = true)
 |-- encoded_genotypes: vector (nullable = true)



In [5]:
display(df.limit(3))

Unnamed: 0,#CHROM,POS,ID,REF,ALT,QUAL,FILTER,INFO,FORMAT,encoded_genotypes
0,22,16050075,rs587697622,A,G,100,PASS,AC=1;AF=0.000199681;AN=5008;NS=2504;DP=8012;EA...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,22,16050115,rs587755077,G,A,100,PASS,AC=32;AF=0.00638978;AN=5008;NS=2504;DP=11468;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,22,16050213,rs587654921,C,T,100,PASS,AC=38;AF=0.00758786;AN=5008;NS=2504;DP=15092;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [6]:
# train the kMeans model with 10 clusters
from pyspark.ml.clustering import KMeans
kMeans = KMeans(featuresCol='encoded_genotypes', k=10, initMode='random')
kMeansModel = kMeans.fit(df)

In [7]:
# use pandas to transpose the cluster centers
import pandas as pd
pd.set_option('display.max_rows', 5)
clusterCentersPD = pd.DataFrame.from_records(kMeansModel.clusterCenters()).T
clusterCentersPD.insert(0, 'Sample ID', sampleNames)
clusterCentersPD

Unnamed: 0,Sample ID,0,1,2,3,4,5,6,7,8,9
0,HG00096,0.000000,0.000000,0.000000,0.003282,0.0,0.0,0.5625,1.0,0.863636,0.0
1,HG00097,0.000000,0.083333,0.800000,0.006565,0.0,0.0,0.1250,0.0,1.409091,0.0
...,...,...,...,...,...,...,...,...,...,...,...
2502,NA21143,0.000000,0.166667,1.533333,0.006565,0.0,0.0,0.0000,0.0,1.863636,0.0
2503,NA21144,1.222222,0.000000,0.000000,0.002188,0.0,0.0,0.7500,0.0,1.954545,1.0


In [8]:
# create a Spark DataFrame from panads 
clusterCentersDF = spark.createDataFrame(clusterCentersPD)
clusterCentersDF.printSchema()

root
 |-- Sample ID: string (nullable = true)
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)



In [9]:
# save the reduced representation to a csv file
clusterCentersDF.coalesce(1).write.csv(path='output/cluster_centers.csv', mode='overwrite', header=True)