8442: Adding --submit support with --crunch2. General refactoring into more/smaller...
[arvados.git] / sdk/cwl/arvados_cwl/runner.py
import os
import urlparse
import json
import logging
from functools import partial

import arvados.collection

from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, adjustFiles
from cwltool.load_tool import fetch_document

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper

logger = logging.getLogger('arvados.cwl-runner')

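# Base class for submitting a workflow to run on Arvados: it uploads the
# Docker images and the workflow/job-order files the run depends on, and it
# reports the final output once the submitted runner job finishes.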
class Runner(object):
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

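    # Walk the tool and, for workflows, every embedded step tool, making sure
    # each referenced Docker image is available in Arvados before submission.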
    def upload_docker(self, tool):
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

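    # Gather everything the run depends on: scan the workflow document for
    # $import/$include/run/path references with cwltool's scandeps, collect
    # the files named in the job order, and stage both sets through
    # ArvPathMapper so the submitted job sees staged locations instead of
    # local paths.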
    def arvados_job_spec(self, *args, **kwargs):
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        # Scan the workflow document itself for files it imports, includes
        # or runs.
        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        def loadref(b, u):
            return document_loader.fetch(urlparse.urljoin(b, u))

        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        # Stage the workflow files and the job order files separately.
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite the job order so it refers to the staged locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

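    # Completion callback: read cwl.output.json from the runner's output
    # collection, rewrite its paths as keep: references, and pass the result
    # to the output callback.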
    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(path):
                    # Paths in cwl.output.json are relative to the output
                    # collection unless they are already keep: references.
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    else:
                        return path
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
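
# Illustrative sketch only, not part of this module: a concrete submitter is
# expected to build on this base class roughly as follows.  The class name
# ExampleRunner and its run() method are hypothetical.
#
#     class ExampleRunner(Runner):
#         def run(self, **kwargs):
#             # Upload dependencies; the returned mapper gives the staged
#             # location of the workflow document.
#             workflowmapper = self.arvados_job_spec(**kwargs)
#             # ...create and submit the actual runner job here, then record
#             # it in self.arvrunner.jobs so done() is called with the final
#             # job record...
#             self.running = True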