/* * ADIOS is freely available under the terms of the BSD license described * in the COPYING file in the top level directory of this source distribution. * * Copyright (c) 2008 - 2009. UT-BATTELLE, LLC. All rights reserved. */ #include "config.h" #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include "adios_types.h" #include "adios_internals.h" #include "adios_transport_hooks.h" #include "adios_bp_v1.h" #include "bp_utils.h" #include "adios_transforms_common.h" // NCSU ALACRITY-ADIOS #include "adios_transforms_read.h" // NCSU ALACRITY-ADIOS #if HAVE_PTHREAD # include "pthread.h" #endif #define DIVIDER "========================================================\n" // User arguments int verbose=0; // 1: print summary, 2: print indexes 3: print working info int removeZNTB=0; // 1: remove non-transformed,zero blocks from a transformed variable int nthreads=1; // Number of threads to use (main counts as 1 thread) char * filename; // process 'filename'.dir/'filename'.NNN subfiles and // generate metadata file 'filename' int nsubfiles=0; // number of subfiles to process struct option options[] = { {"help", no_argument, NULL, 'h'}, {"verbose", no_argument, NULL, 'v'}, {"nsubfiles", required_argument, NULL, 'n'}, {"zero-blocks", no_argument, NULL, 'z'}, #if HAVE_PTHREAD {"nthreads", required_argument, NULL, 't'}, #endif {NULL, 0, NULL, 0} }; #if HAVE_PTHREAD static const char *optstring = "hvzn:t:"; #else static const char *optstring = "hvzn:"; #endif // help function void display_help() { printf ("usage: bpmeta [OPTIONS] -n \n" "\nbpmeta processes .dir/. subfiles and\n" "generates a metadata file .\n" "\nIt is used to generate the missing metadata file after using\n" "the MPI_AGGREGATE output method with 'have_metada_file=0' option.\n" "\n" " --nsubfiles | -n The number of subfiles to process in\n" " .dir\n" #if HAVE_PTHREAD " --nthreads | -t Parallel reading with threads.\n" " The main thread is counted in.\n" #endif "\n" "Help options\n" " --help | -h Print this help.\n" " --verbose | -v Print log about what this program is doing.\n" " Use multiple -v to increase logging level.\n" "Other options\n" " --zero-blocks | -z Remove all zero sized data blocks from index\n" "\n" "Typical use: bpmeta -t 16 -n 1024 mydata.bp\n" ); } /* Global variables among threads */ struct adios_bp_buffer_struct_v1 ** b = 0; /* sub-index structure variables */ struct adios_index_struct_v1 ** subindex; int process_subfiles (int tid, int startidx, int endidx); int write_index (struct adios_index_struct_v1 * index, char * fname); int get_nsubfiles (char *filename); void print_pg_index ( int tid, struct adios_index_process_group_struct_v1 * pg_root); void print_variable_index (int tid, struct adios_index_var_struct_v1 * vars_root); void print_attribute_index (int tid, struct adios_index_attribute_struct_v1 * attrs_root); void remove_zero_blocks (int tid, struct adios_index_var_struct_v1 ** vars_root, int idx); #if HAVE_PTHREAD struct thread_args { int tid; int startidx; int endidx; }; void * thread_main (void *arg) { struct thread_args *targ = (struct thread_args *) arg; process_subfiles (targ->tid, targ->startidx, targ->endidx); pthread_exit(NULL); return NULL; // just to avoid compiler warning } #endif int main (int argc, char ** argv) { long int tmp; int c; while ((c = getopt_long(argc, argv, optstring, options, NULL)) != -1) { switch (c) { case 'n': case 't': errno = 0; tmp = strtol(optarg, (char **)NULL, 0); if (errno) { fprintf(stderr, "Error: could not convert -%c value: %s\n", c, optarg); return 1; } if (c == 'n') nsubfiles=tmp; else nthreads=tmp; break; case 'h': display_help(); return 0; break; case 'v': verbose++; break; case 'z': removeZNTB=1; break; default: printf("Unrecognized argument: %s\n", optarg); break; } } /* Check if we have a file defined */ if (optind >= argc) { printf ("Missing file name\n"); display_help(); return 1; } filename = strdup(argv[optind++]); if (nsubfiles < 1) nsubfiles = get_nsubfiles (filename); if (nsubfiles < 1) { printf ("Cannot determine the number of subfiles. To avoid this problem, " "provide the number of subfiles manually with the -n option.\n"); return -1; } if (nthreads < 1) nthreads = 1; if (nthreads > nsubfiles) { printf ("Warning: asked for processing %d subfiles using %d threads. " "We will utilize only %d threads.\n", nsubfiles, nthreads, nsubfiles); nthreads = nsubfiles; } if (verbose>1) printf ("Create metadata file %s from %d subfiles using %d threads\n", filename, nsubfiles, nthreads); /* Initialize global variables */ b = malloc (nsubfiles * sizeof (struct adios_bp_buffer_struct_v1*)); subindex = malloc (nthreads * sizeof (struct adios_index_struct_v1*)); /* Split the processing work among T threads */ int tid; #if HAVE_PTHREAD pthread_t *thread = (pthread_t *) malloc (nthreads * sizeof(pthread_t)); struct thread_args *targs = (struct thread_args*) malloc (nthreads * sizeof(struct thread_args)); int K = nsubfiles/nthreads; // base number of files to be processed by one thread int L = nsubfiles%nthreads; // this many threads processes one more subfile int startidx, endidx; int rc; //printf ("K=%d L=%d\n", K, L); endidx = -1; for (tid=0; tid1) printf ("Thread %d: Joined thread.\n", tid); } } free (targs); free (thread); #else /* non-threaded version */ nthreads = 1; process_subfiles (0, 0, nsubfiles-1); #endif /* Merge the T indexes into the global output index */ struct adios_index_struct_v1 * globalindex; globalindex = adios_alloc_index_v1(1); for (tid=0; tidpg_root, subindex[tid]->vars_root, subindex[tid]->attrs_root, 1); } write_index (globalindex, filename); /* Clean-up */ adios_clear_index_v1 (globalindex); /*... already cleaned-up by globalindex clearing for (tid=0; tid2) { printf ("buffer=%p size=%" PRId64 " offset=%" PRId64 "\n", buffer, buffer_size, buffer_offset); } adios_write_version_flag_v1 (&buffer, &buffer_size, &buffer_offset, flag); if (verbose>2) { printf ("buffer=%p size=%" PRId64 " offset=%" PRId64 "\n", buffer, buffer_size, buffer_offset); } f = open (fname, O_CREAT | O_WRONLY | O_TRUNC, 0644); if (f == -1) { fprintf (stderr, "Failed to create metadata file %s: %s\n", fname, strerror(errno)); return -1; } uint64_t bytes_written = 0; size_t to_write; const size_t MAX_WRITE_SIZE = 0x7ffff000; // = 2,147,479,552 if (verbose) { printf ("Total size of metadata to write = %" PRIu64 " bytes\n", buffer_offset); } while (bytes_written < buffer_offset) { if (buffer_offset - bytes_written > MAX_WRITE_SIZE) { to_write = MAX_WRITE_SIZE; } else { to_write = (size_t) (buffer_offset - bytes_written); } if (verbose>2) { printf ("total to write=%" PRIu64 " write now =%ld written so far=%" PRId64 "\n", buffer_offset, to_write, bytes_written); } ssize_t wrote = write (f, buffer+bytes_written, to_write); bytes_written += wrote; if (verbose>2) { printf ("wrote=%ld bytes to file\n", wrote); } if (wrote == -1) { fprintf (stderr, "Failed to write metadata to file %s: %s\n", fname, strerror(errno)); break; } else if (wrote != to_write) { fprintf (stderr, "Failed to write (a portion of) metadata of %" PRId64 " bytes to file %s. " "Only wrote %ld bytes\n", buffer_offset, fname, wrote); } } if (bytes_written != (ssize_t) buffer_offset) { fprintf (stderr, "Failed to write total metadata of %" PRId64 " bytes to file %s. " "Only wrote %lld bytes\n", buffer_offset, fname, (long long)bytes_written); } close(f); return 0; } int process_subfiles (int tid, int startidx, int endidx) { char fn[256]; uint32_t version = 0; int idx; int rc = 0; subindex[tid] = adios_alloc_index_v1(1); for (idx=startidx; idx<=endidx; idx++) { b[idx] = malloc (sizeof (struct adios_bp_buffer_struct_v1)); adios_buffer_struct_init (b[idx]); snprintf (fn, 256, "%s.dir/%s.%d", filename, filename, idx); rc = adios_posix_open_read_internal (fn, "", b[idx]); if (!rc) { fprintf (stderr, "bpmeta: file not found: %s\n", fn); return -1; } adios_posix_read_version (b[idx]); adios_parse_version (b[idx], &version); version = version & ADIOS_VERSION_NUM_MASK; if (verbose) { //printf (DIVIDER); printf ("Thread %d: Metadata of %s:\n", tid, fn); printf ("Thread %d: BP format version: %d\n", tid, version); } if (version < 2) { fprintf (stderr, "bpmeta: This version of bpmeta can only work with BP format version 2 and up. " "Use an older bpmeta from adios 1.6 to work with this file.\n"); adios_posix_close_internal (b[idx]); return -1; } struct adios_index_process_group_struct_v1 * new_pg_root = 0; struct adios_index_var_struct_v1 * new_vars_root = 0; struct adios_index_attribute_struct_v1 * new_attrs_root = 0; adios_posix_read_index_offsets (b[idx]); adios_parse_index_offsets_v1 (b[idx]); /* printf ("End of process groups = %" PRIu64 "\n", b->end_of_pgs); printf ("Process Groups Index Offset = %" PRIu64 "\n", b->pg_index_offset); printf ("Process Groups Index Size = %" PRIu64 "\n", b->pg_size); printf ("Variable Index Offset = %" PRIu64 "\n", b->vars_index_offset); printf ("Variable Index Size = %" PRIu64 "\n", b->vars_size); printf ("Attribute Index Offset = %" PRIu64 "\n", b->attrs_index_offset); printf ("Attribute Index Size = %" PRIu64 "\n", b->attrs_size); */ adios_posix_read_process_group_index (b[idx]); adios_parse_process_group_index_v1 (b[idx], &new_pg_root, NULL); print_pg_index (tid, new_pg_root); adios_posix_read_vars_index (b[idx]); adios_parse_vars_index_v1 (b[idx], &new_vars_root, NULL, NULL);\ if (removeZNTB) { remove_zero_blocks(tid, &new_vars_root, idx); } print_variable_index (tid, new_vars_root); if (idx == 0) { // only read attributes from the very first file. we don't merge attributes any more adios_posix_read_attributes_index (b[idx]); adios_parse_attributes_index_v1 (b[idx], &new_attrs_root); print_attribute_index (tid, new_attrs_root); } adios_merge_index_v1 (subindex[tid], new_pg_root, new_vars_root, new_attrs_root, 1); adios_posix_close_internal (b[idx]); adios_shared_buffer_free (b[idx]); } if (verbose>1) { //printf (DIVIDER); printf ("Thread %d: End of reading all subfiles\n", tid); } return 0; } int get_nsubfiles (char *filename) { char pattern[256]; glob_t g; int err,ret; snprintf (pattern, 256, "%s.dir/%s.*", filename, filename); err = glob (pattern, GLOB_ERR | GLOB_NOSORT, NULL, &g); if (!err) { ret = g.gl_pathc; } else { switch (err) { case GLOB_NOMATCH: printf ("ERROR: No matching file found for %s\n", pattern); break; case GLOB_NOSPACE: printf ("ERROR: Not enough memory for running glob for pattern %s\n", pattern); break; case GLOB_ABORTED: printf ("ERROR: glob was aborted by a reading error for pattern %s\n", pattern); printf ("errno = %d: %s\n", errno, strerror(errno)); break; } ret = 0; } return ret; } void print_pg_index (int tid, struct adios_index_process_group_struct_v1 * pg_root) { unsigned int npg=0; if (verbose>1) { //printf (DIVIDER); printf ("Thread %d: Process Groups Index:\n", tid); } while (pg_root) { if (verbose>1) { printf ("Thread %d: Group: %s\n", tid, pg_root->group_name); printf ("Thread %d: \tProcess ID: %d\n", tid, pg_root->process_id); printf ("Thread %d: \tTime Name: %s\n", tid, pg_root->time_index_name); printf ("Thread %d: \tTime: %d\n", tid, pg_root->time_index); printf ("Thread %d: \tOffset in File: %" PRIu64 "\n", tid, pg_root->offset_in_file); } pg_root = pg_root->next; npg++; } if (verbose==1) { printf ("Thread %d: Number of process groups: %u\n", tid, npg); } } void print_variable_index (int tid, struct adios_index_var_struct_v1 * vars_root) { unsigned int nvars=0; if (verbose>1) { //printf (DIVIDER); printf ("Thread %d: Variable Index:\n", tid); } while (vars_root) { if (verbose>1) { if (!strcmp (vars_root->var_path, "/")) { printf ("Thread %d: Var (Group) [ID]: /%s (%s) [%d]\n", tid, vars_root->var_name,vars_root->group_name, vars_root->id ); } else { printf ("Thread %d: Var (Group) [ID]: %s/%s (%s) [%d]\n", tid, vars_root->var_path, vars_root->var_name, vars_root->group_name, vars_root->id ); } const char * typestr = adios_type_to_string_int (vars_root->type); printf ("Thread %d: \tDatatype: %s\n", tid, typestr); printf ("Thread %d: \tVars Characteristics: %" PRIu64 "\n", tid, vars_root->characteristics_count ); uint64_t i; for (i = 0; i < vars_root->characteristics_count; i++) { printf ("Thread %d: \tOffset(%" PRIu64 ")", tid, vars_root->characteristics [i].offset); printf ("\tPayload Offset(%" PRIu64 ")", vars_root->characteristics [i].payload_offset); printf ("\tFile Index(%d)", vars_root->characteristics [i].file_index); printf ("\tTime Index(%d)", vars_root->characteristics [i].time_index); /* NCSU - Print min, max */ if (vars_root->type == adios_complex || vars_root->type == adios_double_complex) { uint8_t type; if (vars_root->type == adios_complex) type = adios_double; else type = adios_long_double; if (vars_root->characteristics [i].stats && vars_root->characteristics [i].stats[0][adios_statistic_min].data) { printf ("\tMin(%s)", bp_value_to_string (type ,vars_root->characteristics [i].stats[0][adios_statistic_min].data ) ); } if (vars_root->characteristics [i].stats && vars_root->characteristics [i].stats[0][adios_statistic_max].data) { printf ("\tMax(%s)", bp_value_to_string (type ,vars_root->characteristics [i].stats[0][adios_statistic_max].data ) ); } } else { if (vars_root->characteristics [i].stats && vars_root->characteristics [i].stats[0][adios_statistic_min].data) { printf ("\tMin(%s)", bp_value_to_string (vars_root->type ,vars_root->characteristics [i].stats[0][adios_statistic_min].data ) ); } if (vars_root->characteristics [i].stats && vars_root->characteristics [i].stats[0][adios_statistic_max].data) { printf ("\tMax(%s)", bp_value_to_string (vars_root->type ,vars_root->characteristics [i].stats[0][adios_statistic_max].data ) ); } } //*/ if (vars_root->characteristics [i].value) { if (vars_root->type != adios_string) printf ("\tValue(%s)", bp_value_to_string (vars_root->type, vars_root->characteristics [i].value)); } if (vars_root->characteristics [i].dims.count != 0) { int j; printf ("\tDims (l:g:o): ("); for (j = 0; j < vars_root->characteristics [i].dims.count; j++) { if (j != 0) printf (","); if ( vars_root->characteristics [i].dims.dims [j * 3 + 1] != 0 ) { printf ("%" PRIu64 ":%" PRIu64 ":%" PRIu64 ,vars_root->characteristics [i].dims.dims [j * 3 + 0] ,vars_root->characteristics [i].dims.dims [j * 3 + 1] ,vars_root->characteristics [i].dims.dims [j * 3 + 2] ); } else { printf ("%" PRIu64 ,vars_root->characteristics [i].dims.dims [j * 3 + 0] ); } } printf (")"); } // NCSU ALACRITY-ADIOS - Print transform info if (vars_root->characteristics[i].transform.transform_type != adios_transform_none) { struct adios_index_characteristic_transform_struct *transform = &vars_root->characteristics[i].transform; struct adios_index_characteristic_dims_struct_v1 *dims = &transform->pre_transform_dimensions; int j; printf ("\tTransform type: %s (ID = %hhu)", adios_transform_plugin_desc(transform->transform_type), transform->transform_type); printf ("\tPre-transform datatype: %s", adios_type_to_string_int(transform->pre_transform_type)); printf ("\tPre-transform dims (l:g:o): ("); for (j = 0; j < dims->count; j++) { if (j != 0) printf (","); if ( dims->dims [j * 3 + 1] != 0 ) { printf ("%" PRIu64 ":%" PRIu64 ":%" PRIu64 ,dims->dims [j * 3 + 0] ,dims->dims [j * 3 + 1] ,dims->dims [j * 3 + 2] ); } else { printf ("%" PRIu64 ,dims->dims [j * 3 + 0] ); } } printf (")"); } printf ("\n"); } } vars_root = vars_root->next; nvars++; } if (verbose==1) { printf ("Thread %d: Number of variables: %u\n", tid, nvars); } } int is_zero_block(struct adios_index_characteristic_struct_v1 * characteristic) { int retval = 0; if (characteristic->dims.count > 0) { int j; retval = 1; // assume zero block and negate if not for (j = 0; j < characteristic->dims.count; j++) { if (characteristic->dims.dims [j * 3 + 0] != 0) { retval = 0; } } } return retval; } /* Remove zero size data blocks from array variables */ void remove_zero_blocks (int tid, struct adios_index_var_struct_v1 ** vars_root, int idx) { unsigned int nvars=0; if (verbose>1) { //printf (DIVIDER); printf ("Thread %d: Look for non-transformed zero blocks in variables with transformation:\n", tid); } struct adios_index_var_struct_v1 * var_prev = NULL; struct adios_index_var_struct_v1 * var = *vars_root; while (var) { int nZB=0; // ZNTB = Zero and Non-Transformed Block, which we want to remove int first_good_block=-1; int is_transformed; /* * First, check if the variable has zero blocks and also has transformed blocks */ uint64_t i; for (i = 0; i < var->characteristics_count; i++) { if (is_zero_block(&var->characteristics[i])) { ++nZB; } if (var->characteristics[i].transform.transform_type != adios_transform_none) { is_transformed = 1; } } if (nZB > 0) { char name[256]; if (!var->var_path || strlen(var->var_path) == 0) { sprintf (name, "%s", var->var_name); } else { sprintf (name, "%s/%s", var->var_path, var->var_name); } printf ("Thread %d: *** Variable %s has %d ZERO BLOCKS among %" PRIu64 " blocks in subfile %d ***\n", tid, name, nZB, var->characteristics_count, idx); } /* * Remove the zero blocks that are not-transformed from the lists * if there is any transformed blocks */ if (nZB == var->characteristics_count) { // All blocks in this list is zero, remove the whole variable printf ("Thread %d: Remove entire variable from subfile index %d\n", tid, idx); if (var_prev == NULL) { // vars_root need to be changed here vars_root = &(var->next); } else { var_prev->next = var->next; } // This variable index is not freed so its a leak } else if (nZB > 0) { printf ("Thread %d: Remove zero, non-transformed blocks from subfile index %d\n", tid, idx); struct adios_index_characteristic_struct_v1 * old_ch = var->characteristics; struct adios_index_characteristic_struct_v1 * new_ch = (struct adios_index_characteristic_struct_v1 *) malloc ((var->characteristics_count - nZB) * sizeof(struct adios_index_characteristic_struct_v1)); uint64_t oldidx; uint64_t newidx = 0; for (oldidx = 0; oldidx < var->characteristics_count; oldidx++) { if (!is_zero_block(&old_ch[oldidx])) { // Retain this block in index memcpy(&new_ch[newidx], &old_ch[oldidx], sizeof(struct adios_index_characteristic_struct_v1)); new_ch[newidx].value = old_ch[oldidx].value; new_ch[newidx].stats = old_ch[oldidx].stats; new_ch[newidx].transform.transform_metadata = old_ch[oldidx].transform.transform_metadata; ++newidx; } } var->characteristics = new_ch; var->characteristics_count = newidx; const char * typestr = adios_type_to_string_int (var->type); printf ("Thread %d: %" PRIu64 " blocks kept in index. ", tid, newidx); if (is_transformed) { var->type = adios_byte; printf ("Type changed from %s to byte", typestr); } printf("\n"); } var_prev = var; var = var->next; nvars++; } } void print_attribute_index (int tid, struct adios_index_attribute_struct_v1 * attrs_root) { unsigned int nattrs=0; if (verbose>1) { //printf (DIVIDER); printf ("Thread %d: Attribute Index:\n", tid); } while (attrs_root) { if (verbose>1) { if (!strcmp (attrs_root->attr_path, "/")) { printf ("Thread %d: Attribute (Group) [ID]: /%s (%s) [%d]\n", tid, attrs_root->attr_name, attrs_root->group_name, attrs_root->id ); } else { printf ("Thread %d: Attribute (Group) [ID]: %s/%s (%s) [%d]\n", tid, attrs_root->attr_path, attrs_root->attr_name, attrs_root->group_name, attrs_root->id ); } printf ("Thread %d: \tDatatype: %s\n", tid, adios_type_to_string_int (attrs_root->type)); printf ("Thread %d: \tAttribute Characteristics: %" PRIu64 "\n", tid, attrs_root->characteristics_count ); uint64_t i; for (i = 0; i < attrs_root->characteristics_count; i++) { printf ("Thread %d: \t\tOffset(%" PRIu64 ")", tid, attrs_root->characteristics [i].offset); printf ("\t\tPayload Offset(%" PRIu64 ")", attrs_root->characteristics [i].payload_offset); printf ("\t\tFile Index(%d)", attrs_root->characteristics [i].file_index); printf ("\t\tTime Index(%d)", attrs_root->characteristics [i].time_index); /* NCSU - Print min, max */ if (attrs_root->type == adios_complex || attrs_root->type == adios_double_complex) { uint8_t type; if (attrs_root->type == adios_complex) type = adios_double; else type = adios_long_double; if (attrs_root->characteristics [i].stats && attrs_root->characteristics [i].stats[0][adios_statistic_min].data) { printf ("\tMin(%s)", bp_value_to_string (type ,attrs_root->characteristics [i].stats[0][adios_statistic_min].data ) ); } if (attrs_root->characteristics [i].stats && attrs_root->characteristics [i].stats[0][adios_statistic_max].data) { printf ("\tMax(%s)", bp_value_to_string (type ,attrs_root->characteristics [i].stats[0][adios_statistic_max].data ) ); } } else { if (attrs_root->characteristics [i].stats && attrs_root->characteristics [i].stats[0][adios_statistic_min].data) { printf ("\tMin(%s)", bp_value_to_string (attrs_root->type ,attrs_root->characteristics [i].stats[0][adios_statistic_min].data ) ); } if (attrs_root->characteristics [i].stats && attrs_root->characteristics [i].stats[0][adios_statistic_max].data) { printf ("\tMax(%s)", bp_value_to_string (attrs_root->type ,attrs_root->characteristics [i].stats[0][adios_statistic_max].data ) ); } } if (attrs_root->characteristics [i].value) { printf ("\t\tValue(%s)", bp_value_to_string (attrs_root->type ,attrs_root->characteristics [i].value ) ); } if (attrs_root->characteristics [i].var_id) { printf ("\t\tVar(%u)", attrs_root->characteristics [i].var_id); } if (attrs_root->characteristics [i].dims.count != 0) { int j; printf ("\t\tDims (l:g:o): ("); for (j = 0; j < attrs_root->characteristics [i].dims.count; j++) { if (j != 0) printf (","); if ( attrs_root->characteristics [i].dims.dims [j * 3 + 1] != 0 ) { printf ("%" PRIu64 ":%" PRIu64 ":%" PRIu64 ,attrs_root->characteristics [i].dims.dims [j * 3 + 0] ,attrs_root->characteristics [i].dims.dims [j * 3 + 1] ,attrs_root->characteristics [i].dims.dims [j * 3 + 2] ); } else { printf ("%" PRIu64 ,attrs_root->characteristics [i].dims.dims [j * 3 + 0] ); } } printf (")"); } printf ("\n"); } } attrs_root = attrs_root->next; nattrs++; } if (verbose==1) { printf ("Thread %d: Number of attributes: %u\n", tid, nattrs); } }