How to specify results files
Rule all
The rule all specifies the result files from the pipeline and is defined last in the Snakemake file:
rule all:
input:
unpack(compile_output_list),
compile_output_list function
The rule calls upon the function compile_output_list defined in common.smk which uses a json-file to define the output files. An example of the function is shown below:
def compile_output_list(wildcards):
output_files = []
types = set([unit.type for unit in units.itertuples()])
for output in output_json:
output_files += set(
[
output.format(sample=sample, type=unit_type, caller=caller)
for sample in get_samples(samples)
for unit_type in get_unit_types(units, sample)
if unit_type in set(output_json[output]["types"]).intersection(types)
for caller in config["bcbio_variation_recall_ensemble"]["callers"]
]
)
return list(set(output_files))
Output files specified in json format
The output_json is specified in the common.smk which look up the actual file in config.yaml:
with open(config["output"]) as output:
output_json = json.load(output)
In our example the output_json file looks like this:
{
"bam_dna/{sample}_{type}.bam": {"name": "_result_bam", "file": "alignment/samtools_merge_bam/{sample}_{type}.bam", "types": ["T", "N"]},
"bam_dna/{sample}_{type}.bam.bai": {"name": "_result_bai", "file": null, "types": ["T", "N"]},
"results/dna/vcf/{sample}_{type}.annotated.vcf.gz": {"name": "_result_annotated_vcf", "file": "annotation/background_annotation/{sample}_{type}.background_annotation.vcf.gz", "types": ["T", "N"]},
"results/rna/qc/multiqc_RNA.html": {"name": "_result_multiqc_rna_html", "file": "qc/multiqc/multiqc_RNA.html", "types": ["R"]}
}
Generate copy file rules
It is recommended to use a copy rule for each output file. In this way, result files will be copied individually and not as a bulk which improves performance for reruns. These rules can be generated programmatically in common.smk for example by the following function:
def generate_copy_code(workflow, output_json):
code = ""
for result, values in output_json.items():
if values["file"] is not None:
input_file = values["file"]
output_file = result
rule_name = values["name"]
mem_mb = config.get("_copy", {}).get("mem_mb", config["default_resources"]["mem_mb"])
mem_per_cpu = config.get("_copy", {}).get("mem_mb", config["default_resources"]["mem_mb"])
partition = config.get("_copy", {}).get("partition", config["default_resources"]["partition"])
threads = config.get("_copy", {}).get("threads", config["default_resources"]["threads"])
time = config.get("_copy", {}).get("time", config["default_resources"]["time"])
copy_container = config.get("_copy", {}).get("container", config["default_container"])
result_file = os.path.basename(output_file)
code += f'@workflow.rule(name="{rule_name}")\n'
code += f'@workflow.input("{input_file}")\n'
code += f'@workflow.output("{output_file}")\n'
code += f'@workflow.log("logs/{rule_name}_{result_file}.log")\n'
code += f'@workflow.container("{copy_container}")\n'
code += f'@workflow.conda("../env/copy_result.yaml")\n'
code += f'@workflow.resources(time = "{time}", threads = {threads}, mem_mb = {mem_mb}, mem_per_cpu = {mem_per_cpu}, partition = "{partition}")\n'
code += '@workflow.shellcmd("cp {input} {output}")\n\n'
code += "@workflow.run\n"
code += (
f"def __rule_{rule_name}(input, output, params, wildcards, threads, resources, log, version, rule, "
"conda_env, container_img, singularity_args, use_singularity, env_modules, bench_record, jobid, is_shell, "
"bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, conda_base_path, basedir, runtime_sourcecache_path, "
"__is_snakemake_rule_func=True):\n"
'\tshell ( "(cp {input[0]} {output[0]}) &> {log}" , bench_record=bench_record, bench_iteration=bench_iteration)\n\n'
)
exec(compile(code, "result_to_copy", "exec"), workflow.globals)
generate_copy_code(workflow, output_json)