# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. """Columnar data structures and the ProfileProcessor aggregator for BHR. Ported from python_mozetl/mozetl/bhr_collection/bhr_collection.py as part of the bhr_collection migration. The semantics are unchanged — these classes take symbolicated, heuristic-trimmed hang samples and build the columnar output schema (stackTable / funcTable / stringArray / sampleTable / annotationsTable / dates / libs) the frontend consumes. The "(root)" sentinel at index 0 of stackTable / pruneStackCache is intentional: the frontend's stack walker terminates when prefix == 0, so keeping that slot reserved is load-bearing. """ import random import re def to_struct_of_arrays(a): if len(a) == 0: raise Exception("Need at least one item in array for this to work.") result = {k: [e[k] for e in a] for k in a[0].keys()} result["length"] = len(a) return result class UniqueKeyedTable: def __init__(self, get_default_from_key, key_names=()): self.get_default_from_key = get_default_from_key self.key_to_index_map = {} self.key_names = key_names self.items = [] def key_to_index(self, key): if key in self.key_to_index_map: return self.key_to_index_map[key] index = len(self.items) self.items.append(self.get_default_from_key(key)) self.key_to_index_map[key] = index return index def key_to_item(self, key): return self.items[self.key_to_index(key)] def index_to_item(self, index): return self.items[index] def get_items(self): return self.items def inner_struct_of_arrays(self, items): if len(items) == 0: raise Exception("Need at least one item in array for this to work.") result = {} num_keys = len(self.key_names) for i in range(0, num_keys): result[self.key_names[i]] = [x[i] for x in items] result["length"] = len(items) return result def struct_of_arrays(self): return self.inner_struct_of_arrays(self.items) def sorted_struct_of_arrays(self, key): return self.inner_struct_of_arrays(sorted(self.items, key=key)) class GrowToFitList(list): def __setitem__(self, index, value): if index >= len(self): to_grow = index + 1 - len(self) self.extend([None] * to_grow) list.__setitem__(self, index, value) def __getitem__(self, index): if index >= len(self): return None return list.__getitem__(self, index) def get_default_lib(name): return { "name": re.sub(r"\.pdb$", "", name), "offset": 0, "path": "", "debugName": name, "debugPath": name, "arch": "", } def get_default_thread(name, minimal_sample_table): strings_table = UniqueKeyedTable(lambda str: str) libs = UniqueKeyedTable(get_default_lib) func_table = UniqueKeyedTable( lambda key: ( strings_table.key_to_index(key[0]), None if key[1] is None else libs.key_to_index(key[1]), ), ("name", "lib"), ) stack_table = UniqueKeyedTable( lambda key: (key[2], func_table.key_to_index((key[0], key[1]))), ("prefix", "func"), ) annotations_table = UniqueKeyedTable( lambda key: ( key[0], strings_table.key_to_index(key[1]), strings_table.key_to_index(key[2]), ), ("prefix", "name", "value"), ) if minimal_sample_table: sample_table = UniqueKeyedTable( lambda key: ( key[0], strings_table.key_to_index(key[1]), key[2], strings_table.key_to_index(key[3]), ), ("stack", "platform"), ) else: sample_table = UniqueKeyedTable( lambda key: ( key[0], strings_table.key_to_index(key[1]), key[2], strings_table.key_to_index(key[3]), ), ("stack", "runnable", "annotations", "platform"), ) stack_table.key_to_index(("(root)", None, None)) prune_stack_cache = UniqueKeyedTable(lambda key: [0.0]) prune_stack_cache.key_to_index(("(root)", None, None)) return { "name": name, "libs": libs, "funcTable": func_table, "stackTable": stack_table, "annotationsTable": annotations_table, "pruneStackCache": prune_stack_cache, "sampleTable": sample_table, "stringArray": strings_table, "processType": "tab" if name == "Gecko_Child" else "default", "dates": UniqueKeyedTable( lambda date: ({ "date": date, "sampleHangMs": GrowToFitList(), "sampleHangCount": GrowToFitList(), }), ("date", "sampleHangMs", "sampleHangCount"), ), } def reconstruct_stack(string_array, func_table, stack_table, lib_table, stack_index): result = [] while stack_index != 0: func_index = stack_table["func"][stack_index] prefix = stack_table["prefix"][stack_index] func_name = string_array[func_table["name"][func_index]] lib_name = lib_table[func_table["lib"][func_index]]["debugName"] result.append((func_name, lib_name)) stack_index = prefix return result[::-1] def merge_number_dicts(a, b): keys = set(a.keys()).union(set(b.keys())) return {k: a.get(k, 0.0) + b.get(k, 0.0) for k in keys} class ProfileProcessor: def __init__(self, config): self.config = config def default_thread_closure(name): return get_default_thread(name, config["use_minimal_sample_table"]) self.thread_table = UniqueKeyedTable(default_thread_closure) self.usage_hours_by_date = {} def debug_dump(self, dump_str): if self.config["print_debug_info"]: print(dump_str) def ingest_processed_profile(self, profile): for existing_thread in self.thread_table.get_items(): prune_stack_cache = UniqueKeyedTable(lambda key: [0.0]) prune_stack_cache.key_to_index(("(root)", None, None)) existing_thread["pruneStackCache"] = prune_stack_cache sample_size = self.config["post_sample_size"] threads = profile["threads"] for other in threads: other_samples = other["sampleTable"] other_dates = other["dates"] for date in other_dates: build_date = date["date"] for i in range(0, len(date["sampleHangCount"])): stack_index = other_samples["stack"][i] stack = reconstruct_stack( other["stringArray"], other["funcTable"], other["stackTable"], other["libs"], stack_index, ) self.pre_ingest_row(( stack, other["stringArray"][other_samples["runnable"][i]], other["name"], build_date, other_samples["annotations"][i], other["stringArray"][other_samples["platform"][i]], date["sampleHangMs"][i], date["sampleHangCount"][i], )) for date in other_dates: build_date = date["date"] for i in range(0, len(date["sampleHangCount"])): stack_index = other_samples["stack"][i] stack = reconstruct_stack( other["stringArray"], other["funcTable"], other["stackTable"], other["libs"], stack_index, ) if sample_size == 1.0 or random.random() <= sample_size: self.ingest_row(( stack, other["stringArray"][other_samples["runnable"][i]], other["name"], build_date, other_samples["annotations"][i], other["stringArray"][other_samples["platform"][i]], date["sampleHangMs"][i], date["sampleHangCount"][i], )) self.usage_hours_by_date = merge_number_dicts( self.usage_hours_by_date, profile.get("usageHoursByDate", {}) ) def pre_ingest_row(self, row): ( stack, runnable_name, thread_name, build_date, annotations, platform, hang_ms, hang_count, ) = row thread = self.thread_table.key_to_item(thread_name) prune_stack_cache = thread["pruneStackCache"] root_stack = prune_stack_cache.key_to_item(("(root)", None, None)) root_stack[0] += hang_ms last_stack = 0 for func_name, lib_name in stack: last_stack = prune_stack_cache.key_to_index(( func_name, lib_name, last_stack, )) cache_item = prune_stack_cache.index_to_item(last_stack) cache_item[0] += hang_ms def ingest_row(self, row): ( stack, runnable_name, thread_name, build_date, annotations, platform, hang_ms, hang_count, ) = row thread = self.thread_table.key_to_item(thread_name) stack_table = thread["stackTable"] annotations_table = thread["annotationsTable"] sample_table = thread["sampleTable"] dates = thread["dates"] prune_stack_cache = thread["pruneStackCache"] last_annotation = None for name, value in annotations: last_annotation = annotations_table.key_to_index(( last_annotation, name, value, )) last_stack = 0 last_cache_item_index = 0 for func_name, lib_name in stack: cache_item_index = prune_stack_cache.key_to_index(( func_name, lib_name, last_cache_item_index, )) cache_item = prune_stack_cache.index_to_item(cache_item_index) parent_cache_item = prune_stack_cache.index_to_item(last_cache_item_index) if ( cache_item[0] / parent_cache_item[0] > self.config["stack_acceptance_threshold"] ): last_stack = stack_table.key_to_index((func_name, lib_name, last_stack)) last_cache_item_index = cache_item_index else: # Below the acceptance threshold — lump under "(other)" beneath # the parent rather than continuing to expand the tree. last_stack = stack_table.key_to_index(("(other)", lib_name, last_stack)) break if self.config["use_minimal_sample_table"] and thread_name == "Gecko_Child": return sample_index = sample_table.key_to_index(( last_stack, runnable_name, last_annotation, platform, )) date = dates.key_to_item(build_date) if date["sampleHangCount"][sample_index] is None: date["sampleHangCount"][sample_index] = 0.0 date["sampleHangMs"][sample_index] = 0.0 date["sampleHangCount"][sample_index] += hang_count date["sampleHangMs"][sample_index] += hang_ms def ingest(self, data, usage_hours_by_date): print(f"{len(data)} unfiltered samples in data") data = [ x for x in data # x[6] is hang_ms if x[6] > 0.0 ] print(f"{len(data)} filtered samples in data") print("Preprocessing stacks for prune cache...") for row in data: self.pre_ingest_row(row) print("Processing stacks...") for row in data: self.ingest_row(row) self.usage_hours_by_date = merge_number_dicts( self.usage_hours_by_date, usage_hours_by_date ) def process_date(self, date): if self.config["use_minimal_sample_table"]: return { "date": date["date"], "sampleHangCount": date["sampleHangCount"], } return date def process_thread(self, thread): string_array = thread["stringArray"] func_table = thread["funcTable"].struct_of_arrays() stack_table = thread["stackTable"].struct_of_arrays() annotations_table = thread["annotationsTable"].struct_of_arrays() sample_table = thread["sampleTable"].struct_of_arrays() return { "name": thread["name"], "processType": thread["processType"], "libs": thread["libs"].get_items(), "funcTable": func_table, "stackTable": stack_table, "annotationsTable": annotations_table, "sampleTable": sample_table, "stringArray": string_array.get_items(), "dates": [self.process_date(d) for d in thread["dates"].get_items()], } def process_into_split_profile(self): return { "main_payload": { "splitFiles": { t["name"]: [k for k in t.keys() if k != "name"] for t in self.thread_table.get_items() }, "usageHoursByDate": self.usage_hours_by_date, "uuid": self.config["uuid"], "isSplit": True, }, "file_data": [ [ (t["name"] + "_" + k, v) for k, v in self.process_thread(t).iteritems() if k != "name" ] for t in self.thread_table.get_items() ], } def process_into_profile(self): print("Processing into final format...") if self.config["split_threads_in_out_file"]: return [ { "name": t["name"], "threads": [self.process_thread(t)], "usageHoursByDate": self.usage_hours_by_date, "uuid": self.config["uuid"], } for t in self.thread_table.get_items() ] return { "threads": [self.process_thread(t) for t in self.thread_table.get_items()], "usageHoursByDate": self.usage_hours_by_date, "uuid": self.config["uuid"], }