diff --git a/pipes/WDL/tasks/tasks_terra.wdl b/pipes/WDL/tasks/tasks_terra.wdl
index 9002a6a43..42af6f048 100644
--- a/pipes/WDL/tasks/tasks_terra.wdl
+++ b/pipes/WDL/tasks/tasks_terra.wdl
@@ -382,8 +382,12 @@ task create_or_update_sample_tables {
         Array[String]? raw_reads_unaligned_bams
         Array[String]? cleaned_reads_unaligned_bams
 
-        File? meta_by_filename_json
-        File? meta_by_sample_json
+        File?   meta_by_filename_json
+        File?   meta_by_sample_json
+
+        String  flowcell_table_name           = "flowcell"
+        String  seq_library_table_name        = "library"
+        String  sample_aggregation_table_name = "sample"
 
         String  docker = "quay.io/broadinstitute/viral-core:2.2.4" #skip-global-version-pin
     }
@@ -392,6 +396,18 @@ task create_or_update_sample_tables {
         volatile: true
     }
 
+    parameter_meta {
+        flowcell_table_name: {
+            description: "Name of table containing per-flowcell rows. If overridden, flowcell_table_name must not contain spaces."
+        }
+        seq_library_table_name: {
+            description: "Name of table containing per-library rows. If overridden, seq_library_table_name must not contain spaces."
+        }
+        sample_aggregation_table_name: {
+            description: "Name of table representing samples, where each row has a foreign key link to one or more per-library rows. If overridden, sample_aggregation_table_name must not contain spaces."
+        }
+    }
+
     command <<<
         python3<<CODE
 
@@ -518,20 +534,44 @@ task create_or_update_sample_tables {
                 library_meta_dict = json.load(meta_fp)
         else:
             # API call to get flowcell_data table
-            header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
-            df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
+            header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
+            df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
             library_meta_dict = df_flowcell.meta_by_filename[flowcell_data_id]
 
         # grab the meta_by_filename values to create new sample->library (sample_set->sample) mappings
        sample_to_libraries = {}
        for library_id, data in library_meta_dict.items():
-            sample_id = data['sample']
+            # get the sample ID either from the provided json files or from the live flowcell table
+            # (defaulting to the "sample" key used in the json, falling back to the WDL-specified table name)
+            sample_id = data.get('sample', data['~{sample_aggregation_table_name}'])
             sample_to_libraries.setdefault(sample_id, [])
             if library_id in df_library.index:
                 sample_to_libraries[sample_id].append(library_id)
             else:
                 print (f"missing {library_id} from library table")
 
+        # (very) naive pluralization
+        def pluralize(input_noun_str):
+            if input_noun_str.endswith(('s', 'x', 'z', 'ch', 'sh')):
+                return input_noun_str + 'es'
+            elif input_noun_str.endswith('y'):
+                # if a consonant precedes the 'y', 'y' -> 'ies'
+                if input_noun_str[-2] not in 'aeiou':
+                    return input_noun_str[:-1] + 'ies'
+                else:
+                    return input_noun_str + 's'
+            elif input_noun_str.endswith('f'):
+                return input_noun_str[:-1] + 'ves'
+            elif input_noun_str.endswith('fe'):
+                return input_noun_str[:-2] + 'ves'
+            else:
+                return input_noun_str + 's'
+
+        # naively pluralize the "library" table name for the column in the "sample" table
+        # that lists one or more such entities
+        # (or whatever the inputs specify these tables should be called)
+        pluralized_library_term = pluralize("~{seq_library_table_name}")
+
        # merge in new sample->library mappings with any pre-existing sample->library mappings
        if len(df_sample)>0:
            print(df_sample.index)
@@ -539,7 +579,7 @@ task create_or_update_sample_tables {
                if sample_id in df_sample.index:
                    print (f"sample_set {sample_id} pre-exists in Terra table, merging with new members")
                    #sample_set_to_samples[set_id].extend(df_sample_set.samples[set_id])
-                    already_associated_libraries = [entity["entityName"] for entity in df_sample.libraries[sample_id]]
+                    already_associated_libraries = [entity["entityName"] for entity in df_sample[pluralized_library_term][sample_id]]
                    print(f"already_associated_libraries {already_associated_libraries}")
                    print(f"sample_to_libraries[sample_id] {sample_to_libraries[sample_id]}")
 
@@ -548,12 +588,12 @@ task create_or_update_sample_tables {
                # collapse duplicate sample IDs
                sample_to_libraries[sample_id] = list(set(sample_to_libraries[sample_id]))
 
-        sample_fname = 'sample_membership.tsv'
+        sample_fname = '~{sample_aggregation_table_name}_membership.tsv'
        with open(sample_fname, 'wt') as outf:
-            outf.write('entity:sample_id\tlibraries\n')
+            outf.write(f'entity:~{sample_aggregation_table_name}_id\t{pluralized_library_term}\n')
            for sample_id, libraries in sample_to_libraries.items():
                #for library_id in sorted(libraries):
-                outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
+                outf.write(f'{sample_id}\t{json.dumps([{"entityType":"~{seq_library_table_name}","entityName":library_name} for library_name in libraries])}\n')
 
        # if meta_by_filename_json specified and size>0 bytes
        # parse as json, pass to for loop below
@@ -567,26 +607,26 @@ task create_or_update_sample_tables {
                meta_by_library_all = json.load(meta_fp)
        else:
            # API call to get flowcell_data table
-            header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
-            df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
+            header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
+            df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
            # grab the meta_by_sample values from one row in the flowcell_data table
            meta_by_library_all = df_flowcell.meta_by_sample[flowcell_data_id]
 
        # grab all the library IDs
-        header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
+        header, rows = get_entities_to_table(workspace_project, workspace_name, "~{seq_library_table_name}")
 
        out_rows = []
-        out_header = ['library_id'] + copy_cols
+        out_header = ['~{seq_library_table_name}_id'] + copy_cols
        print(f"out_header {out_header}")
        for row in rows:
-            out_row = {'library_id': row['library_id']}
+            out_row = {'~{seq_library_table_name}_id': row['~{seq_library_table_name}_id']}
            for sample_id,sample_library_metadata in meta_by_library_all.items():
-                if sample_library_metadata["library"] in row['library_id']:
+                if sample_library_metadata.get("library", sample_library_metadata["~{seq_library_table_name}"]) in row.get('library_id', row['~{seq_library_table_name}_id']):
                    for col in copy_cols:
                        out_row[col] = sample_library_metadata.get(col, '')
                    out_rows.append(out_row)
 
-        library_meta_fname = "sample_metadata.tsv"
+        library_meta_fname = "~{seq_library_table_name}_metadata.tsv"
        with open(library_meta_fname, 'wt') as outf:
            outf.write("entity:")
            writer = csv.DictWriter(outf, out_header, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
@@ -614,6 +654,10 @@ task create_or_update_sample_tables {
    output {
        File   stdout_log = stdout()
        File   stderr_log = stderr()
+
+        File   sample_table_tsv  = "~{sample_aggregation_table_name}_membership.tsv"
+        File   library_table_tsv = "~{seq_library_table_name}_metadata.tsv"
"~{seq_library_table_name}_metadata.tsv" + Int max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000) } }