# make_EAR.py
# by Diego De Panis
# ERGA Sequencing and Assembly Committee
EAR_version = "v24.09.10"
import sys
import argparse
import logging
import re
import yaml
import os
import pytz
import requests
import glob
import math
from datetime import datetime
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.lib.units import cm
def make_report(yaml_file):
logging.basicConfig(filename='EAR.log', level=logging.INFO)
# Read the content from EAR.yaml file
with open(yaml_file, "r") as file:
yaml_data = yaml.safe_load(file)
# FUNCTIONS ###################################################################################
def format_number(value):
    """Format a numeric value with thousands separators for report tables.

    Integral values drop the decimal part (1234567 -> '1,234,567');
    non-integral floats keep their fraction. Anything that cannot be
    converted to a float (e.g. 'NA', '', None) is returned unchanged so
    placeholder cells pass through intact.
    """
    try:
        value_float = float(value)
    except (ValueError, TypeError):
        # Not numeric (TypeError covers None, which a bare ValueError
        # handler would let escape and crash the report build).
        return value
    if value_float.is_integer():
        # format as an integer if no decimal part
        return f'{int(value_float):,}'
    # format as a float
    return f'{value_float:,}'
# extract gfastats values
def extract_gfastats_values(content, keys):
    """Return the value following '<key>: ' for each key, in key order.

    Raises IndexError if any key is absent from *content* (first match
    wins when a key appears more than once).
    """
    values = []
    for key in keys:
        matches = re.findall(f"{key}: (.+)", content)
        values.append(matches[0])
    return values
keys = [
"Total scaffold length",
"GC content %",
"# gaps in scaffolds",
"Total gap length in scaffolds",
"# scaffolds",
"Scaffold N50",
"Scaffold L50",
"Scaffold L90",
"# contigs",
"Contig N50",
"Contig L50",
"Contig L90",
]
display_names = keys.copy()
display_names[display_names.index("Total scaffold length")] = "Total bp"
total_length_index = keys.index("Total scaffold length")
display_names[display_names.index("GC content %")] = "GC %"
display_names[display_names.index("Total gap length in scaffolds")] = "Total gap bp"
display_names[display_names.index("# scaffolds")] = "Scaffolds"
display_names[display_names.index("# contigs")] = "Contigs"
gaps_index = keys.index("# gaps in scaffolds")
exclusion_list = ["# gaps in scaffolds"]
# extract Total bp from gfastats report
def extract_total_bp_from_gfastats(gfastats_path):
    """Read a gfastats report and return 'Total scaffold length'
    re-formatted with thousands separators (e.g. '1,234,567')."""
    with open(gfastats_path, "r") as handle:
        report = handle.read()
    raw_value = re.search(r"Total scaffold length: (.+)", report).group(1)
    return f"{int(raw_value.replace(',', '')):,}"
# compute EBP quality metric
def compute_ebp_metric(haplotype, gfastats_path, qv_value):
    """Build the EBP quality metric string for one haplotype.

    The metric is <log10 Contig N50>.<log10 Scaffold N50>.Q<floor(QV)>,
    with N50s taken from the gfastats report at *gfastats_path*.
    """
    with open(gfastats_path, "r") as handle:
        report = handle.read()
    contig_n50, scaffold_n50 = extract_gfastats_values(report, ["Contig N50", "Scaffold N50"])
    contig_n50_log = math.floor(math.log10(int(contig_n50.replace(',', ''))))
    scaffold_n50_log = math.floor(math.log10(int(scaffold_n50.replace(',', ''))))
    return f"Obtained EBP quality metric for {haplotype}: {contig_n50_log}.{scaffold_n50_log}.Q{math.floor(float(qv_value))}"
# extract qv values
def get_qv_value(dir_path, order):
    """Return the QV (4th tab column) for the haplotype at row *order*
    from the merqury *.qv file in *dir_path*, or '' if unavailable.

    A file is only trusted when it has more rows than *order* and is
    either a single-assembly report (one line) or a combined report
    whose third row is labelled 'Both'.
    """
    try:
        for qv_path in glob.glob(f"{dir_path}/*.qv"):
            with open(qv_path, 'r') as handle:
                rows = handle.readlines()
            if len(rows) > order and (len(rows) == 1 or rows[2].split('\t')[0].strip() == "Both"):
                return rows[order].split('\t')[3]
    except Exception as exc:
        logging.error(f"Error reading {dir_path}: {str(exc)}")
    return ''
# extract Kmer completeness values
def get_completeness_value(dir_path, order):
    """Return the k-mer completeness value (5th tab column, stripped) for
    the haplotype at row *order* from the merqury *completeness.stats
    file in *dir_path*, or '' if unavailable."""
    try:
        for stats_path in glob.glob(f"{dir_path}/*completeness.stats"):
            with open(stats_path, 'r') as handle:
                rows = handle.readlines()
            if len(rows) > order:
                return rows[order].split('\t')[4].strip()
    except Exception as exc:
        logging.warning(f"Error reading {dir_path}: {str(exc)}")
    return ''
# Getting kmer plots for curated asm
def get_png_files(dir_path):
    """Collect up to four merqury *.ln.png plot paths from *dir_path*,
    padding with None so callers can always unpack four entries."""
    png_files = glob.glob(f"{dir_path}/*.ln.png")
    if len(png_files) < 4:
        logging.warning(f"Warning: Less than 4 png files found in {dir_path}. If this is diploid, some images may be missing.")
        # Pad the shortfall with None placeholders.
        png_files += [None] * (4 - len(png_files))
    return png_files[:4]
# get unique part in file names
def find_unique_parts(file1, file2):
    """Return the dot-separated name parts unique to each filename.

    Each result is the space-joined list of parts present in one name
    but not the other (e.g. 'x.hap1.png' vs 'x.hap2.png' -> ('hap1', 'hap2')).
    """
    parts_a = file1.split('.')
    parts_b = file2.split('.')
    only_in_a = ' '.join(part for part in parts_a if part not in parts_b)
    only_in_b = ' '.join(part for part in parts_b if part not in parts_a)
    return only_in_a, only_in_b
# extract BUSCO values
def extract_busco_values(file_path):
    """Parse the S/D/F/M percentages from a BUSCO short-summary file.

    Returns a 4-tuple of percentage strings, or ('', '', '', '') when the
    file is missing or the results line cannot be parsed.
    """
    try:
        with open(file_path, 'r') as handle:
            summary = handle.read()
        results_line = re.findall(r"C:.*n:\d+", summary)[0]
        return tuple(
            re.findall(rf"{label}:(\d+\.\d+%)", results_line)[0]
            for label in ("S", "D", "F", "M")
        )
    except Exception as exc:
        logging.warning(f"Error reading {file_path}: {str(exc)}")
        return '', '', '', ''
# extract BUSCO info
def extract_busco_info(file_path):
    """Return (busco_version, lineage_info) parsed from a BUSCO summary.

    lineage_info is a (name, count, busco_count) tuple or None. Newer
    summaries report 'number of genomes' while older ones say
    'number of species'; both phrasings are tried in that order.
    """
    busco_version = None
    lineage_info = None
    try:
        with open(file_path, 'r') as handle:
            summary = handle.read()
        version_match = re.search(r"# BUSCO version is: ([\d.]+)", summary)
        if version_match:
            busco_version = version_match.group(1)
        for counter_word in ("genomes", "species"):
            pattern = (r"The lineage dataset is: (.*?) \(Creation date:.*?, "
                       rf"number of {counter_word}: (\d+), number of BUSCOs: (\d+)\)")
            lineage_match = re.search(pattern, summary)
            if lineage_match:
                lineage_info = lineage_match.groups()
                break
    except Exception as exc:
        logging.warning(f"Error reading {file_path}: {str(exc)}")
    return busco_version, lineage_info
# Function to check and generate warning messages
def generate_warning_paragraphs(expected, observed, trait):
    """Compare an expected vs observed genome trait and return a list of
    warning Paragraphs (empty when the values agree within tolerance).

    Any conversion error (e.g. 'NA' values) is logged and yields no
    warning rather than aborting the report.
    """
    paragraphs = []
    try:
        if trait == "Haploid size (bp)":
            expected_bp = int(expected.replace(',', ''))
            observed_bp = int(observed.replace(',', ''))
            # Tolerate up to 20% deviation from the expected genome size.
            if abs(expected_bp - observed_bp) / expected_bp > 0.20:
                paragraphs.append(Paragraph(
                    f". Observed {trait} has >20% difference with Expected",
                    styles["midiStyle"]))
        elif trait in ("Haploid Number", "Ploidy"):
            # Both values must parse as integers for the comparison.
            if int(expected) != int(observed):
                paragraphs.append(Paragraph(
                    f". Observed {trait} is different from Expected",
                    styles["midiStyle"]))
        elif trait == "Sample Sex":
            # Case-insensitive, whitespace-trimmed comparison.
            if expected.strip().lower() != observed.strip().lower():
                paragraphs.append(Paragraph(
                    ". Observed sex is different from Sample sex",
                    styles["midiStyle"]))
    except Exception as exc:
        logging.warning(f"Error in generating warning for {trait}: {str(exc)}")
    return paragraphs
# Generate warnings for curated haplotypes (qv, kcomp, busco)
def generate_curated_warnings(haplotype, qv_value, completeness_value, busco_scores):
    """Flag curated-assembly metrics that fall outside EBP-recommended
    thresholds (QV < 40, k-mer completeness < 90, BUSCO single < 90%,
    BUSCO duplicated > 5%). Returns a list of warning Paragraphs."""
    paragraphs = []
    try:
        # All conversions happen up front, so a malformed value aborts
        # before any warning is appended (and is logged below).
        checks = (
            (float(qv_value) < 40,
             f". QV value is less than 40 for {haplotype}"),
            (float(completeness_value) < 90,
             f". Kmer completeness value is less than 90 for {haplotype}"),
            (float(busco_scores[0].rstrip('%')) < 90,
             f". BUSCO single copy value is less than 90% for {haplotype}"),
            (float(busco_scores[1].rstrip('%')) > 5,
             f". BUSCO duplicated value is more than 5% for {haplotype}"),
        )
        for flagged, message in checks:
            if flagged:
                paragraphs.append(Paragraph(message, styles["midiStyle"]))
    except Exception as exc:
        logging.warning(f"Error in generating warnings for {haplotype}: {str(exc)}")
    return paragraphs
# Generate warnings for curated haplotypes (loss, gaps, 90inChrom)
def generate_assembly_warnings(asm_data, gaps_per_gbp_data, obs_haploid_num):
    """Per-haplotype warnings on curation length loss (>3%), gap density
    (>1000 gaps/Gbp) and chromosome assignment (Scaffold L90 greater than
    the observed haploid number). Returns a list of warning Paragraphs.

    Missing stages/files for a haplotype are logged and skipped instead
    of aborting the whole report.
    """
    warnings = []
    for haplotype in asm_stages:
        try:
            pre_curation_bp = extract_total_bp_from_gfastats(asm_data['Pre-curation'][haplotype]['gfastats--nstar-report_txt'])
            curated_bp = extract_total_bp_from_gfastats(asm_data['Curated'][haplotype]['gfastats--nstar-report_txt'])
            scaffold_l90 = float(gfastats_data[('Curated', haplotype)][display_names.index('Scaffold L90')].replace(',', ''))
        except (KeyError, OSError, ValueError, AttributeError) as exc:
            # e.g. no 'Pre-curation' stage in the YAML, or a missing report file.
            logging.warning(f"Skipping assembly warnings for {haplotype}: {str(exc)}")
            continue
        # Check for assembly length loss > 3%
        pre_bp = float(pre_curation_bp.replace(',', ''))
        cur_bp = float(curated_bp.replace(',', ''))
        if pre_bp and (pre_bp - cur_bp) / pre_bp * 100 > 3:
            warnings.append(Paragraph(f". Assembly length loss > 3% for {haplotype}", styles["midiStyle"]))
        # Check for more than 1000 gaps/Gbp. The stored value may be ''
        # when gfastats parsing failed, so guard the comparison ('' > 1000
        # is a TypeError in Python 3).
        gaps_gbp = gaps_per_gbp_data.get(('Curated', haplotype), 0)
        if isinstance(gaps_gbp, (int, float)) and gaps_gbp > 1000:
            warnings.append(Paragraph(f". More than 1000 gaps/Gbp for {haplotype}", styles["midiStyle"]))
        # Check if Scaffold L90 value is more than Observed Haploid number
        # (obs_haploid_num defaults to "NA" when absent from NOTES).
        try:
            observed_haploid = float(obs_haploid_num)
        except (TypeError, ValueError):
            observed_haploid = None
        if observed_haploid is not None and scaffold_l90 > observed_haploid:
            warnings.append(Paragraph(f". Not 90% of assembly in chromosomes for {haplotype}", styles["midiStyle"]))
    return warnings
# Parse pipeline and generate "tree"
def generate_pipeline_tree(pipeline_data):
    """Render a PIPELINES yaml mapping as a small indented text tree.

    Each tool maps to a 'version/param1/param2...' string. The lines are
    joined with explicit <br/> tags so reportlab's Paragraph renders one
    node per line (plain whitespace would collapse the tree onto a single
    line). Non-dict input yields an error message string.
    """
    if not isinstance(pipeline_data, dict):
        return "Invalid pipeline data format"
    indent = " " * 2  # Adjust indent spacing
    tree_lines = []
    for tool, version_param in pipeline_data.items():
        # Tool line
        tree_lines.append(f"- {tool}")
        # Convert version_param to string and split into version + params
        parts = str(version_param).split('/')
        version = parts[0]
        params = [p for p in parts[1:] if p]  # drop empty segments
        # Version line
        tree_lines.append(f"{indent*2}|_ ver: {version}")
        # Param line(s)
        if params:
            tree_lines.extend(f"{indent*2}|_ key param: {param}" for param in params)
        else:
            tree_lines.append(f"{indent*2}|_ key param: NA")
    # Join lines with an HTML break for the Paragraph.
    return "<br/>".join(tree_lines)
# Reading SAMPLE INFORMATION section from yaml ################################################
# Check for required fields
required_fields = ["ToLID", "Species", "Sex", "Submitter", "Affiliation", "Tags"]
missing_fields = [field for field in required_fields if field not in yaml_data or not yaml_data[field]]
if missing_fields:
logging.error(f"# GENERAL INFORMATION section in the yaml file is missing or empty for the following information: {', '.join(missing_fields)}")
sys.exit(1)
# Check that "Species" field is a string
if not isinstance(yaml_data["Species"], str):
logging.error(f"# GENERAL INFORMATION section in the yaml file contains incorrect data type for 'Species'. Expected 'str' but got '{type(yaml_data['Species']).__name__}'.")
sys.exit(1)
## Get data for Header, ToLID table and submitter
tol_id = yaml_data["ToLID"]
species = yaml_data["Species"]
sex = yaml_data["Sex"]
submitter = yaml_data["Submitter"]
affiliation = yaml_data["Affiliation"]
tags = yaml_data["Tags"]
# Check if tag is valid
valid_tags = ["ERGA-BGE", "ERGA-Pilot", "ERGA-Community", "ERGA-testing"]
if tags not in valid_tags:
tags += "[INVALID TAG]"
logging.warning(f"# SAMPLE INFORMATION section in the yaml file contains an invalid tag. Valid tags are ERGA-BGE, ERGA-Pilot and ERGA-Community.")
# Get data from GoaT based on species name
# urllib.parse.quote to handle special characters and spaces in the species name
species_name = requests.utils.quote(species)
# Get stuff from GoaT
goat_response = requests.get(f'https://goat.genomehubs.org/api/v2/search?query=tax_name%28{species_name}%29&result=taxon')
goat_data = goat_response.json() # convert json to dict
taxon_number = goat_data['results'][0]['result']['taxon_id']
goat_results = goat_data['results']
class_name = 'NA'
order_name = 'NA'
haploid_number = 'NA'
haploid_source = 'NA'
ploidy = 'NA'
ploidy_source = 'NA'
for result in goat_results:
lineage = result['result']['lineage']
for node in lineage:
if node['taxon_rank'] == 'class':
class_name = node['scientific_name']
if node['taxon_rank'] == 'order':
order_name = node['scientific_name']
goat_second_response = requests.get(f'https://goat.genomehubs.org/api/v2/record?recordId={taxon_number}&result=taxon&taxonomy=ncbi')
goat_second_data = goat_second_response.json()
ploidy_info = goat_second_data['records'][0]['record']['attributes']['ploidy']
ploidy = ploidy_info['value']
ploidy_source = ploidy_info['aggregation_source']
haploid_info = goat_second_data['records'][0]['record']['attributes']['haploid_number']
haploid_number = haploid_info['value']
haploid_source = haploid_info['aggregation_source']
sp_data = [
["TxID", "ToLID", "Species", "Class", "Order"],
[taxon_number, tol_id, species, class_name, order_name]
]
# Transpose the data
transposed_sp_data = list(map(list, zip(*sp_data)))
# Reading SEQUENCING DATA section from yaml ###################################################
# get DATA section from yaml
data_list = yaml_data.get('DATA', [])
# Prepare headers
headers = ['Data']
data_values = ['Coverage']
# Extract data from YAML and format it for the table
for item in data_list:
for technology, coverage in item.items():
headers.append(technology)
data_values.append('NA' if not coverage else coverage)
# Create a list of lists for the table
table_data = [headers, data_values]
# Extract pipeline data
asm_pipeline_data = yaml_data.get('PIPELINES', {}).get('Assembly', {})
curation_pipeline_data = yaml_data.get('PIPELINES', {}).get('Curation', {})
# Extract pipeline data from 'Curated' category
asm_pipeline_tree = generate_pipeline_tree(asm_pipeline_data)
curation_pipeline_tree = generate_pipeline_tree(curation_pipeline_data)
# Reading GENOME PROFILING DATA section from yaml #############################################
profiling_data = yaml_data.get('PROFILING')
# Check if profiling_data is available
if not profiling_data:
logging.error('Error: No profiling data found in the YAML file.')
sys.exit(1)
# Check for GenomeScope data (mandatory)
genomescope_data = profiling_data.get('GenomeScope')
if not genomescope_data:
logging.error("Error: GenomeScope data is missing in the YAML file. This is mandatory.")
sys.exit(1)
genomescope_summary = genomescope_data.get('genomescope_summary_txt')
if not genomescope_summary:
logging.error("Error: GenomeScope summary file path is missing in the YAML file.")
sys.exit(1)
# Read the content of the GenomeScope summary file
try:
with open(genomescope_summary, "r") as f:
summary_txt = f.read()
# Extract values from summary.txt
genome_haploid_length_match = re.search(r"Genome Haploid Length\s+([\d,]+|NA)\s*bp", summary_txt)
proposed_ploidy = re.search(r"p = (\d+)", summary_txt).group(1)
if genome_haploid_length_match:
genome_haploid_length = genome_haploid_length_match.group(1)
if genome_haploid_length == "NA":
logging.warning("Genome Haploid Length is NA. Using max value if available.")
max_length_match = re.search(r"Genome Haploid Length\s+(?:NA|[\d,]+)\s*bp\s+([\d,]+)\s*bp", summary_txt)
if max_length_match:
genome_haploid_length = max_length_match.group(1)
else:
logging.error("Unable to find a valid Genome Haploid Length.")
sys.exit(1)
else:
logging.error("Unable to find Genome Haploid Length in the summary file.")
sys.exit(1)
except Exception as e:
logging.error(f"Error reading GenomeScope summary file: {str(e)}")
sys.exit(1)
# Check for Smudgeplot data (optional)
smudgeplot_data = profiling_data.get('Smudgeplot')
if smudgeplot_data:
smudgeplot_summary = smudgeplot_data.get('smudgeplot_verbose_summary_txt')
if smudgeplot_summary:
try:
with open(smudgeplot_summary, "r") as f:
smud_summary_txt = f.readlines()
for line in smud_summary_txt:
if line.startswith("* Proposed ploidy"):
proposed_ploidy = line.split(":")[1].strip()
break
except Exception as e:
logging.warning(f"Error reading Smudgeplot summary file: {str(e)}. Using GenomeScope ploidy.")
else:
logging.warning("Smudgeplot summary file path is missing. Using GenomeScope ploidy.")
else:
logging.info("Smudgeplot data not provided. Using GenomeScope ploidy.")
# Reading ASSEMBLY DATA section from yaml #####################################################
asm_data = yaml_data.get('ASSEMBLIES', {})
# make a list from the assemblies available in asm_data
asm_stages = []
for asm_stage, stage_properties in asm_data.items():
for haplotypes in stage_properties.keys():
if haplotypes not in asm_stages:
asm_stages.append(haplotypes)
# get gfastats-based data
gfastats_data = {}
for asm_stage, stage_properties in asm_data.items():
for haplotypes, haplotype_properties in stage_properties.items():
if isinstance(haplotype_properties, dict):
if 'gfastats--nstar-report_txt' in haplotype_properties:
file_path = haplotype_properties['gfastats--nstar-report_txt']
with open(file_path, 'r') as file:
content = file.read()
gfastats_data[(asm_stage, haplotypes)] = extract_gfastats_values(content, keys)
gaps_per_gbp_data = {}
for (asm_stage, haplotypes), values in gfastats_data.items():
try:
gaps = float(values[gaps_index])
total_length = float(values[total_length_index])
gaps_per_gbp = round((gaps / total_length * 1_000_000_000), 2)
gaps_per_gbp_data[(asm_stage, haplotypes)] = gaps_per_gbp
except (ValueError, ZeroDivisionError):
gaps_per_gbp_data[(asm_stage, haplotypes)] = ''
# Define the contigging table (column names)
asm_table_data = [["Metrics"] + [f'{asm_stage} \n {haplotypes}' for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]]]
# Fill the table with the gfastats data
for i in range(len(display_names)):
metric = display_names[i]
if metric not in exclusion_list:
asm_table_data.append([metric] + [format_number(gfastats_data.get((asm_stage, haplotypes), [''])[i]) if (asm_stage, haplotypes) in gfastats_data else '' for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]])
# Add the gaps/gbp in between
asm_table_data.insert(gaps_index + 1, ['Gaps/Gbp'] + [format_number(gaps_per_gbp_data.get((asm_stage, haplotypes), '')) for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]])
# get QV, Kmer completeness and BUSCO data
qv_data = {}
completeness_data = {}
busco_data = {metric: {} for metric in ['BUSCO sing.', 'BUSCO dupl.', 'BUSCO frag.', 'BUSCO miss.']}
for asm_stage, stage_properties in asm_data.items():
asm_stage_elements = list(stage_properties.keys())
for i, haplotypes in enumerate(asm_stage_elements):
haplotype_properties = stage_properties[haplotypes]
if isinstance(haplotype_properties, dict):
if 'merqury_folder' in haplotype_properties:
qv_data[(asm_stage, haplotypes)] = get_qv_value(haplotype_properties['merqury_folder'], i)
completeness_data[(asm_stage, haplotypes)] = get_completeness_value(haplotype_properties['merqury_folder'], i)
if 'busco_short_summary_txt' in haplotype_properties:
s_value, d_value, f_value, m_value = extract_busco_values(haplotype_properties['busco_short_summary_txt'])
busco_data['BUSCO sing.'].update({(asm_stage, haplotypes): s_value})
busco_data['BUSCO dupl.'].update({(asm_stage, haplotypes): d_value})
busco_data['BUSCO frag.'].update({(asm_stage, haplotypes): f_value})
busco_data['BUSCO miss.'].update({(asm_stage, haplotypes): m_value})
# Fill the table with the QV data
asm_table_data.append(['QV'] + [qv_data.get((asm_stage, haplotypes), '') for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]])
# Fill the table with the Kmer completeness data
asm_table_data.append(['Kmer compl.'] + [completeness_data.get((asm_stage, haplotypes), '') for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]])
# Fill the table with the BUSCO data
for metric in ['BUSCO sing.', 'BUSCO dupl.', 'BUSCO frag.', 'BUSCO miss.']:
asm_table_data.append([metric] + [busco_data[metric].get((asm_stage, haplotypes), '') for asm_stage in asm_data for haplotypes in asm_stages if haplotypes in asm_data[asm_stage]])
# Reading CURATION NOTES section from yaml ####################################################
obs_haploid_num = yaml_data.get("NOTES", {}).get("Obs_Haploid_num", "NA")
obs_sex = yaml_data.get("NOTES", {}).get("Obs_Sex", "NA")
interventions_per_gb = yaml_data.get("NOTES", {}).get("Interventions_per_Gb", "NA")
contamination_notes = yaml_data.get("NOTES", {}).get("Contamination_notes", "NA")
other_notes = yaml_data.get("NOTES", {}).get("Other_notes", "NA")
# Extract Total bp for each haplotype and find the maximum
curated_assemblies = yaml_data.get('ASSEMBLIES', {}).get('Curated', {})
total_bp_values = []
for haplotype, properties in curated_assemblies.items():
if 'gfastats--nstar-report_txt' in properties:
total_bp = extract_total_bp_from_gfastats(properties['gfastats--nstar-report_txt'])
total_bp_values.append(total_bp)
max_total_bp = max(total_bp_values, default='NA')
# Create table data
genome_traits_table_data = [
["Genome Traits", "Expected", "Observed"],
["Haploid size (bp)", genome_haploid_length, f"{max_total_bp}"],
["Haploid Number", f"{haploid_number} (source: {haploid_source})", obs_haploid_num],
["Ploidy", f"{ploidy} (source: {ploidy_source})", proposed_ploidy],
["Sample Sex", sex, obs_sex]
]
# Get curator notes. The quotes around the note values are written as
# &quot; entities because this string is rendered through a reportlab
# Paragraph, which parses XML-style markup. (The previous bare-brace
# form was a syntax error.)
curator_notes_text = (
    f". Interventions/Gb: {interventions_per_gb} "
    f". Contamination notes: &quot;{contamination_notes}&quot; "
    f". Other observations: &quot;{other_notes}&quot;"
)
# PDF CONSTRUCTION ############################################################################
# Set up the PDF file
pdf_filename = f"{tol_id}_EAR.pdf"
margin = 0.5 * 72 # 0.5 inch in points (normal margin is 1 inch)
pdf = SimpleDocTemplate(pdf_filename,
pagesize=A4,
leftMargin=margin,
rightMargin=margin,
topMargin=margin,
bottomMargin=margin)
elements = []
# Set all the styles
styles = getSampleStyleSheet()
styles.add(ParagraphStyle(name='TitleStyle', fontName='Courier', fontSize=20))
styles.add(ParagraphStyle(name='subTitleStyle', fontName='Courier', fontSize=16))
styles.add(ParagraphStyle(name='normalStyle', fontName='Courier', fontSize=12))
styles.add(ParagraphStyle(name='midiStyle', fontName='Courier', fontSize=10))
#styles.add(ParagraphStyle(name='LinkStyle', fontName='Courier', fontSize=10, textColor='blue', underline=True))
styles.add(ParagraphStyle(name='treeStyle', fontName='Courier', fontSize=10, leftIndent=12))
styles.add(ParagraphStyle(name='miniStyle', fontName='Courier', fontSize=8))
styles.add(ParagraphStyle(name='FileNameStyle', fontName='Courier', fontSize=6))
# PDF SECTION 1 -------------------------------------------------------------------------------
# Add the title
title = Paragraph("ERGA Assembly Report", styles['TitleStyle'])
elements.append(title)
# Spacer
elements.append(Spacer(1, 12))
# Add version
ver_paragraph = Paragraph(EAR_version, styles['normalStyle'])
elements.append(ver_paragraph)
# Spacer
elements.append(Spacer(1, 12))
# Add tags
tags_paragraph = Paragraph(f"Tags: {tags}", styles['normalStyle'])
elements.append(tags_paragraph)
# Spacer
elements.append(Spacer(1, 24))
# Create the SPECIES DATA table with the transposed data
sp_data_table = Table(transposed_sp_data)
# Style the table
sp_data_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (0, -1), '#e7e7e7'), # Grey background for column 1
("BACKGROUND", (1, 0), (1, -1), colors.white), # White background for column 2
("ALIGN", (0, 0), (-1, -1), "CENTER"),
('FONTNAME', (0, 0), (0, 0), 'Courier'), # Regular font for row1, col1
('FONTNAME', (1, 0), (1, 0), 'Courier'),
('FONTNAME', (0, 1), (-1, -1), 'Courier'), # Regular font for the rest of the table
('FONTNAME', (1, 1), (1, 1), 'Courier-Bold'), # Bold font for row1, col2
("FONTSIZE", (0, 0), (-1, -1), 14),
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
("GRID", (0, 0), (-1, -1), 0.5, colors.black)
]))
# Add SPECIES DATA table
elements.append(sp_data_table)
# Spacer
elements.append(Spacer(1, 32))
# Create the GENOME TRAITS table
genome_traits_table = Table(genome_traits_table_data)
# Style the table
genome_traits_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), '#e7e7e7'),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, -1), 'Courier'),
('FONTSIZE', (0, 0), (-1, -1), 12),
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
("GRID", (0, 0), (-1, -1), 0.5, colors.black)
]))
# Add GENOME TRAITS table
elements.append(genome_traits_table)
# Spacer
elements.append(Spacer(1, 28))
# Add EBP METRICS SECTION subtitle
subtitle = Paragraph("EBP metrics summary and curation notes", styles['subTitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 24))
# Iterate over haplotypes in the Curated category to get data for EBP metrics
curated_assemblies = yaml_data.get('ASSEMBLIES', {}).get('Curated', {})
haplotype_names = list(curated_assemblies.keys())
for haplotype in haplotype_names:
properties = curated_assemblies[haplotype]
if 'gfastats--nstar-report_txt' in properties and 'merqury_folder' in properties:
gfastats_path = properties['gfastats--nstar-report_txt']
order = haplotype_names.index(haplotype) # Determine the order based on the position of the haplotype in the list
qv_value = get_qv_value(properties['merqury_folder'], order)
ebp_quality_metric = compute_ebp_metric(haplotype, gfastats_path, qv_value)
EBP_metric_paragraph = Paragraph(ebp_quality_metric, styles["midiStyle"])
# Add the EBP quality metric paragraph to elements
elements.append(EBP_metric_paragraph)
# Spacer
elements.append(Spacer(1, 8))
# Add sentence
Textline = Paragraph("The following metrics were automatically flagged as below EBP recommended standards or different from expected:", styles['midiStyle'])
elements.append(Textline)
# Spacer
elements.append(Spacer(1, 4))
# Apply checks and add warning paragraphs to elements
elements += generate_warning_paragraphs(genome_haploid_length, max_total_bp, "Haploid size (bp)")
elements += generate_warning_paragraphs(haploid_number, obs_haploid_num, "Haploid Number")
elements += generate_warning_paragraphs(proposed_ploidy, ploidy, "Ploidy")
elements += generate_warning_paragraphs(sex, obs_sex, "Sample Sex")
# Spacer
elements.append(Spacer(1, 4))
# Iterate over haplotypes in the Curated category and apply checks
for haplotype in haplotype_names:
properties = curated_assemblies[haplotype]
if isinstance(properties, dict) and 'merqury_folder' in properties and 'busco_short_summary_txt' in properties:
order = haplotype_names.index(haplotype)
qv_value = get_qv_value(properties['merqury_folder'], order)
completeness_value = get_completeness_value(properties['merqury_folder'], order)
busco_scores = extract_busco_values(properties['busco_short_summary_txt'])
warnings = generate_curated_warnings(haplotype, qv_value, completeness_value, busco_scores)
elements += warnings
assembly_warnings = generate_assembly_warnings(asm_data, gaps_per_gbp_data, obs_haploid_num)
elements.extend(assembly_warnings)
# Spacer
elements.append(Spacer(1, 24))
# Add small subtitle for Curator notes
subtitle = Paragraph("Curator notes", styles['normalStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 8))
# Curator notes
curator_notes_paragraph = Paragraph(curator_notes_text, styles["midiStyle"])
elements.append(curator_notes_paragraph)
# Page break
elements.append(PageBreak())
# PDF SECTION 2 -------------------------------------------------------------------------------
# Add quality metrics section subtitle
subtitle = Paragraph("Quality metrics table", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 48))
# create QUALITY METRICS table
asm_table = Table(asm_table_data)
# Style the table
asm_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), '#e7e7e7'), # grey background for the header
('ALIGN', (0, 0), (-1, -1), 'CENTER'), # center alignment
('FONTNAME', (0, 0), (-1, -1), 'Courier'), # bold font for the header
('FONTSIZE', (0, 0), (-1, -1), 11), # font size
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
("GRID", (0, 0), (-1, -1), 0.5, colors.black)
]))
# Add QUALITY METRICS table
elements.append(asm_table)
# Spacer
elements.append(Spacer(1, 5))
# Store BUSCO version and lineage information from each file in list
busco_info_list = []
for asm_stages, stage_properties in asm_data.items():
for i, haplotype_properties in stage_properties.items():
if isinstance(haplotype_properties, dict):
if 'busco_short_summary_txt' in haplotype_properties:
busco_version, lineage_info = extract_busco_info(haplotype_properties['busco_short_summary_txt'])
if busco_version and lineage_info:
busco_info_list.append((busco_version, lineage_info))
# Checking if all elements in the list are identical
if all(info == busco_info_list[0] for info in busco_info_list):
busco_version, (lineage_name, num_genomes, num_buscos) = busco_info_list[0]
elements.append(Paragraph(f"BUSCO {busco_version} Lineage: {lineage_name} (genomes:{num_genomes}, BUSCOs:{num_buscos})", styles['miniStyle']))
else:
elements.append(Paragraph("Warning: BUSCO versions or lineage datasets are not the same across results", styles['miniStyle']))
logging.warning(f"WARNING!!! BUSCO versions or lineage datasets are not the same across results")
# Page break
elements.append(PageBreak())
# PDF SECTION 3 -------------------------------------------------------------------------------
# Add hic maps section subtitle
subtitle = Paragraph("HiC contact map of curated assembly", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 36))
# Initialize counter
tool_count = 0
# Add title and images for each step
for asm_stages, stage_properties in asm_data.items():
if asm_stages == 'Curated':
tool_elements = list(stage_properties.keys())
images_with_names = []
for haplotype in tool_elements:
haplotype_properties = stage_properties[haplotype]
# Check if there is an image and/or a link
png_file = haplotype_properties.get('hic_FullMap_png', '')
link = haplotype_properties.get('hic_FullMap_link', '')
# Prepare paragraphs for the image and link
if png_file:
# Create image object
img = Image(png_file, width=11 * cm, height=11 * cm)
images_with_names.append([img])
else:
# Add paragraph for missing image
missing_png_paragraph = Paragraph(f"{haplotype} HiC PNG is missing!", styles["midiStyle"])
images_with_names.append([missing_png_paragraph])
# Add paragraph for the link
if link:
link_html = f'{haplotype} [LINK]'
else:
link_html = f'{haplotype} File link is missing!'
link_paragraph = Paragraph(link_html, styles["midiStyle"])
images_with_names.append([link_paragraph])
# Append a spacer only if the next element is an image
if len(tool_elements) > 1 and tool_elements.index(haplotype) < len(tool_elements) - 1:
images_with_names.append([Spacer(1, 12)])
# Add images and names to the elements in pairs
for i in range(0, len(images_with_names), 4): # Process two images (and their names) at a time
elements_to_add = images_with_names[i:i+4]
# Create table for the images and names
table = Table(elements_to_add)
table.hAlign = 'CENTER'
elements.append(table)
# Add a page break conditionally
next_elements_start = i + 4
if next_elements_start < len(images_with_names):
if len(images_with_names[next_elements_start]) > 0 and isinstance(images_with_names[next_elements_start][0], Image):
elements.append(PageBreak())
tool_count += 1
elements.append(PageBreak())
    # PDF SECTION 4 -------------------------------------------------------------------------------
    # K-mer spectra plots (Merqury output) for the curated assembly only.
    # Add kmer spectra section subtitle
    subtitle = Paragraph("K-mer spectra of curated assembly", styles['TitleStyle'])
    elements.append(subtitle)
    # Spacer
    elements.append(Spacer(1, 48))
    # Running count of PNGs placed since the last page break (drives 4-per-page layout)
    counter = 0
    # Haplotypes may share a merqury_folder; render each folder only once
    processed_folders = set()
    for asm_stages, stage_properties in asm_data.items():
        # Check if the stage is 'Curated'
        if asm_stages == 'Curated':
            stage_elements = list(stage_properties.keys())
            for haplotype in stage_elements:
                haplotype_properties = stage_properties[haplotype]
                if isinstance(haplotype_properties, dict) and 'merqury_folder' in haplotype_properties:
                    merqury_folder = haplotype_properties['merqury_folder']
                    # Check if folder has already been processed
                    if merqury_folder not in processed_folders:
                        processed_folders.add(merqury_folder)
                        # NOTE(review): get_png_files (defined elsewhere) apparently may
                        # return None entries — they are filtered before rendering but are
                        # still included in `counter += len(png_files)` below; confirm intended.
                        png_files = get_png_files(haplotype_properties['merqury_folder'])
                        # Filter out only .spectra-cn.ln.png files and find the shortest one, also skip None values
                        spectra_cn_files = [f for f in png_files if f and f.endswith("spectra-cn.ln.png")]
                        # Heuristic: the shortest basename is taken to be the combined
                        # (diploid) assembly spectra — TODO confirm against Merqury naming
                        shortest_spectra_cn_file = min(spectra_cn_files, key=lambda f: len(os.path.basename(f)), default=None)
                        # Handle cases based on the number of spectra-cn.ln.png files
                        if len(spectra_cn_files) == 3:
                            # For 3 .spectra-cn.ln.png files: the two non-shortest are the
                            # per-haplotype plots; derive their distinguishing name parts
                            similar_files = [f for f in spectra_cn_files if f != shortest_spectra_cn_file]
                            if similar_files:
                                unique_name1, unique_name2 = find_unique_parts(similar_files[0], similar_files[1])
                        else:
                            # For 2 .spectra-cn.ln.png files (or fewer) no per-haplotype labels are needed
                            unique_name1 = unique_name2 = None
                        # Create image objects and add filename below each image
                        images = []
                        for png_file in png_files:
                            if png_file:
                                image = Image(png_file, width=8.4 * cm, height=7 * cm)
                                filename = os.path.basename(png_file)
                                # Caption depends on which Merqury plot type this file is
                                if filename.endswith("spectra-asm.ln.png"):
                                    text = "Distribution of k-mer counts coloured by their presence in reads/assemblies"
                                elif filename.endswith("spectra-cn.ln.png"):
                                    if len(spectra_cn_files) == 3:
                                        # For 3 spectra-cn files use particular text
                                        if png_file == shortest_spectra_cn_file:
                                            text = "Distribution of k-mer counts per copy numbers found in asm (dipl.)"
                                        else:
                                            text = f"Distribution of k-mer counts per copy numbers found in {unique_name1 if png_file == similar_files[0] else unique_name2} (hapl.)"
                                    else:
                                        # For 2 spectra-cn files use same text
                                        text = "Distribution of k-mer counts per copy numbers found in asm"
                                else:
                                    # Fallback caption: just the file name
                                    text = filename
                                images.append([image, Paragraph(text, styles["midiStyle"])])
                        # Filter None values (defensive; the loop above already skips falsy paths)
                        images = [img for img in images if img[0] is not None]
                        # get number of rows and columns for the table
                        num_rows = (len(images) + 1) // 2  # +1 to handle odd numbers of images
                        num_columns = 2
                        # Create the table with dynamic size; empty list pads the last cell on odd counts
                        image_table_data = [[images[i * num_columns + j] if i * num_columns + j < len(images) else [] for j in range(num_columns)] for i in range(num_rows)]
                        image_table = Table(image_table_data)
                        # Style the "table"
                        table_style = TableStyle([
                            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                            ('BOTTOMPADDING', (0, 0), (-1, -1), 20),  # 20 here is a spacer between rows
                        ])
                        # Set the style
                        image_table.setStyle(table_style)
                        # Add image table to elements
                        elements.append(image_table)
                        # Increase counter by the number of PNGs added
                        counter += len(png_files)
                        # If counter is a multiple of 4, insert a page break and reset counter
                        if counter % 4 == 0:
                            elements.append(PageBreak())
                            counter = 0
                        # Add spacer
                        elements.append(Spacer(1, 12))
    # If we have processed all haps and the last page does not contain exactly 4 images, insert a page break
    if counter % 4 != 0:
        elements.append(PageBreak())
# PDF SECTION 5 -------------------------------------------------------------------------------
# Add contamination section subtitle
subtitle = Paragraph("Post-curation contamination screening", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 36))
# Initialize counter
tool_count = 0
# Add title and images for each step
for asm_stages, stage_properties in asm_data.items():
if asm_stages == 'Curated': # Check if the current stage is 'Curated'
tool_elements = list(stage_properties.keys())
for haplotype in tool_elements:
haplotype_properties = stage_properties[haplotype]
if isinstance(haplotype_properties, dict) and 'blobplot_cont_png' in haplotype_properties:
# Get image path
png_file = haplotype_properties['blobplot_cont_png']
# If png_file is not empty, display it
if png_file:
# Create image object
img = Image(png_file, width=20 * cm, height=20 * cm)
elements.append(img)
# Create paragraph for filename with haplotype name
blob_text = f"{haplotype}. Bubble plot circles are scaled by sequence length, positioned by coverage and GC proportion, and coloured by taxonomy. Histograms show total assembly length distribution on each axis."
blob_paragraph = Paragraph(blob_text, styles["midiStyle"])
elements.append(blob_paragraph)
else:
# Add paragraph for missing image
missing_png_paragraph = Paragraph(f"{haplotype} PNG is missing!", styles["midiStyle"])
elements.append(missing_png_paragraph)
# Add a page break after each image and its description
elements.append(PageBreak())
tool_count += 1
# SECTION 6 -----------------------------------------------------------------------------------
# Add data profile section subtitle
subtitle = Paragraph("Data profile", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 24))
# Create the DATA PROFILE table
data_table = Table(table_data)
# Style the table
data_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), '#e7e7e7'), # grey background for the first column
('ALIGN', (0, 0), (-1, -1), 'CENTER'), # center alignment
('FONTNAME', (0, 0), (-1, -1), 'Courier'), # remove bold font
('FONTSIZE', (0, 0), (-1, -1), 12), # font size for the header
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
("GRID", (0, 0), (-1, -1), 0.5, colors.black)
]))
# Add DATA PROFILE table
elements.append(data_table)
# Spacer
elements.append(Spacer(1, 32))
# Add assembly pipeline section subtitle
subtitle = Paragraph("Assembly pipeline", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 24))
# Add ASM PIPELINE tree
elements.append(Paragraph(asm_pipeline_tree, styles['treeStyle']))
# Spacer
elements.append(Spacer(1, 32))
# Add curation pipeline section subtitle
subtitle = Paragraph("Curation pipeline", styles['TitleStyle'])
elements.append(subtitle)
# Spacer
elements.append(Spacer(1, 24))
# Add CURATION PIPELINE tree
elements.append(Paragraph(curation_pipeline_tree, styles['treeStyle']))
# Spacer
elements.append(Spacer(1, 48))
# Add submitter, affiliation
submitter_paragraph_style = ParagraphStyle(name='SubmitterStyle', fontName='Courier', fontSize=10)
elements.append(Paragraph(f"Submitter: {submitter}", submitter_paragraph_style))
elements.append(Paragraph(f"Affiliation: {affiliation}", submitter_paragraph_style))
# Spacer
elements.append(Spacer(1, 8))
# Add the date and time (CET) of the document creation
cet = pytz.timezone("CET")
current_datetime = datetime.now(cet)
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S %Z")
elements.append(Paragraph(f"Date and time: {formatted_datetime}", submitter_paragraph_style))
# Build the PDF ###############################################################################
pdf.build(elements)
if __name__ == "__main__":
    # CLI entry point: single positional argument, the EAR YAML file path.
    cli_parser = argparse.ArgumentParser(description='Create an ERGA Assembly Report (EAR) from a YAML file. Visit https://github.com/ERGA-consortium/EARs for more information')
    cli_parser.add_argument('yaml_file', type=str, help='Path to the YAML file')
    cli_args = cli_parser.parse_args()
    make_report(cli_args.yaml_file)