for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
+def upload_instance(arvrunner, name, tool, job_order):
+ upload_docker(arvrunner, tool)
+
+ workflowmapper = upload_dependencies(arvrunner,
+ name,
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ True)
+
+ jobmapper = upload_dependencies(arvrunner,
+ os.path.basename(job_order.get("id", "#")),
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ adjustDirObjs(job_order, trim_listing)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ return workflowmapper
+
class Runner(object):
- def __init__(self, runner, tool, job_order, enable_reuse):
+ def __init__(self, runner, tool, job_order, enable_reuse, output_name):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
self.uuid = None
+ self.final_output = None
+ self.output_name = output_name
def update_pipeline_component(self, record):
pass
def arvados_job_spec(self, *args, **kwargs):
- upload_docker(self.arvrunner, self.tool)
-
self.name = os.path.basename(self.tool.tool["id"])
-
- workflowmapper = upload_dependencies(self.arvrunner,
- self.name,
- self.tool.doc_loader,
- self.tool.tool,
- self.tool.tool["id"],
- True)
-
- jobmapper = upload_dependencies(self.arvrunner,
- os.path.basename(self.job_order.get("id", "#")),
- self.tool.doc_loader,
- self.job_order,
- self.job_order.get("id", "#"),
- False)
-
- adjustDirObjs(self.job_order, trim_listing)
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- return workflowmapper
-
+ return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
def done(self, record):
if record["state"] == "Complete":
outputs = None
try:
try:
- outc = arvados.collection.Collection(record["output"])
+ self.final_output = record["output"]
+ outc = arvados.collection.CollectionReader(self.final_output,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
def keepify(fileobj):