8442: Fix tests. Update comments. Rename jobs -> processes since it doesn't hold...
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6
7 from cwltool.draft2tool import CommandLineTool
8 import cwltool.workflow
9 from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
10 from cwltool.load_tool import fetch_document
11
12 import arvados.collection
13
14 from .arvdocker import arv_docker_get_image
15 from .pathmapper import ArvPathMapper
16
17 logger = logging.getLogger('arvados.cwl-runner')
18
class Runner(object):
    """Shared logic for submitting a CWL tool/workflow runner to Arvados.

    Holds the tool and job order to run, and provides helpers to push Docker
    images, map the workflow and its input files through ``ArvPathMapper``,
    and handle the final process record.  ``update_pipeline_component`` is a
    no-op here; presumably subclasses override it (confirm against callers).
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the submission parameters.

        :param runner: the arvados-cwl-runner executor.  This class reads its
            ``api``, ``project_uuid``, ``output_callback`` and ``processes``
            attributes.
        :param tool: the loaded cwltool process object to run; its
            ``tool["id"]`` is used as the workflow document URI.
        :param job_order: dict of workflow inputs.  ``arvados_job_spec``
            rewrites it in place.
        :param enable_reuse: reuse flag; stored only, not read by this class.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Hook called with a process record; does nothing in this class."""
        pass

    def upload_docker(self, tool):
        """Push the Docker images required by ``tool``.

        A ``CommandLineTool`` that declares a DockerRequirement has that
        requirement passed to ``arv_docker_get_image`` together with the
        runner's API client and project uuid.  A ``Workflow`` is walked
        recursively through the tools embedded in its steps.  Any other kind
        of tool is ignored.
        """
        if isinstance(tool, CommandLineTool):
            # Only the requirement itself matters; whether it was a hard
            # requirement or a hint is not used here.
            docker_req, _ = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True,
                                     self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for step in tool.steps:
                self.upload_docker(step.embedded_tool)

    def arvados_job_spec(self, *args, **kwargs):
        """Prepare the workflow and its inputs for submission to Arvados.

        Steps:
          1. push Docker images for the tool (``upload_docker``);
          2. collect the workflow document plus every document/file it
             references through ``$import``/``run``/``$include``/
             ``$schemas``/``path``;
          3. collect every file referenced by ``self.job_order``;
          4. build an ``ArvPathMapper`` for each set (which presumably
             uploads them to Keep -- see pathmapper);
          5. rewrite file paths in ``self.job_order`` to their mapped
             locations and remove its ``id`` key.

        Side effects: sets ``self.name`` to the basename of the tool id and
        mutates ``self.job_order`` in place.

        :param args: accepted for interface compatibility; unused.
        :param kwargs: ``keepprefix`` (default ``""``) is prepended to the
            mapped path patterns.  All keyword arguments, including
            ``keepprefix``, are also forwarded to both ``ArvPathMapper``
            constructors.
        :returns: the ``ArvPathMapper`` built for the workflow files.
        """
        self.upload_docker(self.tool)

        tool_id = self.tool.tool["id"]
        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(tool_id)

        self.name = os.path.basename(tool_id)

        def visitFiles(files, path):
            # adjustFiles callback: record the path and leave it unchanged.
            files.add(path)
            return path

        document_loader, workflowobj, uri = fetch_document(tool_id)
        loaded = set()

        def loadref(base, ref):
            # Fetch each referenced document at most once.  Already-seen
            # documents yield {} (presumably so scandeps does not revisit
            # them).
            joined = urlparse.urljoin(base, ref)
            if joined in loaded:
                return {}
            loaded.add(joined)
            return document_loader.fetch(joined)

        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        keepprefix = kwargs.get("keepprefix", "")
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       keepprefix+"%s",
                                       keepprefix+"%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  keepprefix+"%s",
                                  keepprefix+"%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Replace each input path with the target half of its mapping.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        # The job order's own id is not passed along to the submitted process.
        self.job_order.pop("id", None)

        return workflowmapper

    def done(self, record):
        """Report the final status and outputs of the finished process.

        :param record: the process record.  ``state``, ``exit_code``
            (optional), ``output`` and ``uuid`` are read.

        Status mapping:
          * state other than "Complete" -> "permanentFail"
          * Complete with exit code 33 -> "UnsupportedRequirement"
          * Complete with another nonzero exit code -> "permanentFail"
          * Complete with exit code 0 or no exit code -> "success"

        The outputs object is read from ``cwl.output.json`` in the output
        collection, and paths not already starting with ``keep:`` are
        rewritten to ``keep:<output>/<path>``.  Any exception while doing so
        is logged rather than raised; outputs stays ``None`` if the file
        could not be read.  The status and outputs are then passed to
        ``self.arvrunner.output_callback``.  The record's uuid is always
        removed from ``self.arvrunner.processes``, even if the callback
        raises.
        """
        if record["state"] != "Complete":
            processStatus = "permanentFail"
        else:
            exit_code = record.get("exit_code")
            if exit_code is None or exit_code == 0:
                processStatus = "success"
            elif exit_code == 33:
                # 33 is mapped to UnsupportedRequirement (presumably the exit
                # code the runner uses for that condition -- confirm).
                processStatus = "UnsupportedRequirement"
            else:
                processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(path):
                    # Make paths relative to the output collection absolute.
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    return path

                adjustFiles(outputs, keepify)
            except Exception as e:
                # Best effort: the status is still reported without outputs.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]