9570: CWL v1.0 support wip
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6
7 from cwltool.draft2tool import CommandLineTool
8 import cwltool.workflow
9 from cwltool.process import get_feature, scandeps, UnsupportedRequirement
10 from cwltool.load_tool import fetch_document
11 from cwltool.pathmapper import adjustFiles
12
13 import arvados.collection
14
15 from .arvdocker import arv_docker_get_image
16 from .pathmapper import ArvPathMapper
17
# Logger used by this module; registered under the 'arvados.cwl-runner' name.
logger = logging.getLogger('arvados.cwl-runner')


class Runner(object):
    """State and helpers for one CWL tool/workflow run driven by an Arvados runner.

    The instance keeps the owning runner object, the loaded tool, and the job
    order (input object).  It can upload the Docker images the tool refers to,
    build path mappers for the workflow's and the job order's files, and
    translate a finished job record into an output callback on the runner.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the collaborators; no work is done at construction time.

        runner       -- owning runner object.  This class reads its ``api``,
                        ``project_uuid``, ``processes`` and ``output_callback``
                        attributes.
        tool         -- loaded CWL tool or workflow object.
        job_order    -- dict-like input object; modified in place by
                        arvados_job_spec().
        enable_reuse -- stored as-is; not interpreted in this class.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Do nothing.  Presumably a hook for subclasses -- confirm with callers."""
        pass

    def upload_docker(self, tool):
        """Fetch the Docker image(s) referenced by `tool`.

        For a CommandLineTool that has a DockerRequirement feature,
        arv_docker_get_image() is called once.  For a Workflow, every step's
        embedded tool is visited recursively.  Any other kind of object is
        ignored.
        """
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                # NOTE(review): the literal True is presumably a "pull image"
                # flag -- confirm against arv_docker_get_image's signature.
                arv_docker_get_image(self.arvrunner.api, docker_req, True,
                                     self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, *args, **kwargs):
        """Collect the files the run depends on and build their path mappers.

        Side effects:
          * uploads Docker images via upload_docker();
          * sets ``self.name`` to the basename of the tool id;
          * rewrites every file path in ``self.job_order`` to the second
            element of what the job mapper returns for it;
          * removes the "id" key from ``self.job_order`` if present.

        Positional arguments are accepted but unused.  Keyword arguments are
        forwarded unchanged to both ArvPathMapper constructors; "keepprefix"
        (default "") is additionally used to build the mapper patterns.

        Returns the ArvPathMapper built for the workflow's own files.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Record the path and hand it back unchanged, so the adjustFiles()
            # passes that use this visitor only collect paths.
            files.add(path)
            return path

        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        loaded = set()

        def loadref(b, u):
            # Fetch each referenced document at most once; later references to
            # the same URL get an empty document.
            joined = urlparse.urljoin(b, u)
            if joined not in loaded:
                loaded.add(joined)
                return document_loader.fetch(joined)
            else:
                return {}

        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        keepprefix = kwargs.get("keepprefix", "")
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       keepprefix+"%s",
                                       keepprefix+"%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  keepprefix+"%s",
                                  keepprefix+"%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

    def done(self, record):
        """Report a finished job to the runner and forget the process.

        `record` is read for "state", the optional "exit_code", "output" and
        "uuid".

        Status mapping:
          * state "Complete", exit code 33    -> "UnsupportedRequirement"
          * state "Complete", exit code 0     -> "success"
          * state "Complete", other exit code -> "permanentFail"
          * state "Complete", no exit code    -> "success"
          * any other state                   -> "permanentFail"

        The output object is loaded from "cwl.output.json" in the output
        collection, and paths that do not already start with "keep:" are
        prefixed with "keep:<output>/".  If that fails, the error is logged and
        a would-be "success" is downgraded to "permanentFail", so an unreadable
        output object is never reported as a successful run.

        ``self.arvrunner.output_callback(outputs, status)`` is then called, and
        the entry for record["uuid"] is always removed from
        ``self.arvrunner.processes`` afterwards.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(path):
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    else:
                        return path
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
                # Without a readable output object the run cannot count as
                # successful.  Other statuses are left as they are.
                if processStatus == "success":
                    processStatus = "permanentFail"
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]