3 from functools import partial
7 from cwltool.draft2tool import CommandLineTool
8 import cwltool.workflow
9 from cwltool.process import get_feature, scandeps, adjustFiles
10 from cwltool.load_tool import fetch_document
12 import arvados.collection
14 from .arvdocker import arv_docker_get_image
15 from .pathmapper import ArvPathMapper
17 logger = logging.getLogger('arvados.cwl-runner')
20 def __init__(self, runner, tool, job_order, enable_reuse):
21 self.arvrunner = runner
23 self.job_order = job_order
25 self.enable_reuse = enable_reuse
27 def update_pipeline_component(self, record):
30 def upload_docker(self, tool):
31 if isinstance(tool, CommandLineTool):
32 (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
34 arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
35 elif isinstance(tool, cwltool.workflow.Workflow):
37 self.upload_docker(s.embedded_tool)
40 def arvados_job_spec(self, *args, **kwargs):
41 self.upload_docker(self.tool)
45 workflowfiles.add(self.tool.tool["id"])
47 self.name = os.path.basename(self.tool.tool["id"])
49 def visitFiles(files, path):
53 document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
55 return document_loader.fetch(urlparse.urljoin(b, u))
57 sc = scandeps(uri, workflowobj,
58 set(("$import", "run")),
59 set(("$include", "$schemas", "path")),
61 adjustFiles(sc, partial(visitFiles, workflowfiles))
62 adjustFiles(self.job_order, partial(visitFiles, jobfiles))
64 workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
70 jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
73 name=os.path.basename(self.job_order.get("id", "#")),
76 adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
78 if "id" in self.job_order:
79 del self.job_order["id"]
84 def done(self, record):
85 if record["state"] == "Complete":
86 processStatus = "success"
88 processStatus = "permanentFail"
93 outc = arvados.collection.Collection(record["output"])
94 with outc.open("cwl.output.json") as f:
95 outputs = json.load(f)
97 if not path.startswith("keep:"):
98 return "keep:%s/%s" % (record["output"], path)
99 adjustFiles(outputs, keepify)
100 except Exception as e:
101 logger.error("While getting final output object: %s", e)
102 self.arvrunner.output_callback(outputs, processStatus)
104 del self.arvrunner.jobs[record["uuid"]]