9766: Supports packing workflow, setting defaults, uploading dependent files/docker...
arvados.git: sdk/cwl/arvados_cwl/runner.py
import os
import urlparse
from functools import partial
import logging
import json
import re

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper

logger = logging.getLogger('arvados.cwl-runner')

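# Pattern of file names accepted by cwltool: letters, digits, '.', '_', '+' and '-'.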
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, keepprefix, loadref_run):
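    """Scan the given document for file and directory dependencies, upload
    them to Keep via ArvPathMapper, and rewrite their locations in
    workflowobj to point at the uploaded copies.  Returns the path mapper.
    """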
    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        if joined not in loaded:
            loaded.add(joined)
            return document_loader.fetch(joined)
        else:
            return {}

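    # Follow 'run' references (steps and subworkflows) only when the caller asks for it.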
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

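    # Collect everything the document depends on: $include and $schemas
    # references plus File/Directory 'path' and 'location' entries.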
    sc = scandeps(uri, workflowobj,
                  loadref_fields,
                  set(("$include", "$schemas", "path", "location")),
                  loadref)

    files = []
    def visitFiles(path):
        files.append(path)

    adjustFileObjs(sc, visitFiles)
    adjustDirObjs(sc, visitFiles)

    normalizeFilesDirs(files)

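    # Stage the collected files into Keep and remember where each one landed.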
    mapper = ArvPathMapper(arvrunner, files, "",
                           keepprefix+"%s",
                           keepprefix+"%s/%s",
                           name=name)

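    # Rewrite each File/Directory location in the workflow object to its mapped target.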
    def setloc(p):
        p["location"] = mapper.mapper(p["location"]).target
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


def upload_docker(arvrunner, tool):
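    """Ensure the Docker image named by a tool's DockerRequirement is
    available in Arvados, recursing into the steps of a workflow."""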
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


class Runner(object):
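    """Base class for a process that runs a workflow on Arvados: uploads the
    Docker images and dependencies the workflow needs, and turns the finished
    run's record into a CWL output object for the output callback."""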
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
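        """Upload the Docker images, the workflow document, and the job order
        to Arvados, and return the path mapper for the workflow document."""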
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

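        # Upload the workflow document itself, following $import and 'run' references.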
        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             kwargs.get("keepprefix", ""),
                                             True)

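        # Upload the files referenced by the job order ($import only; no 'run' references to follow).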
        jobmapper = upload_dependencies(self.arvrunner,
                                        os.path.basename(self.job_order.get("id", "#")),
                                        self.tool.doc_loader,
                                        self.job_order,
                                        self.job_order.get("id", "#"),
                                        kwargs.get("keepprefix", ""),
                                        False)

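        # The job order 'id' was only needed to resolve references above; drop it
        # so it is not passed along as an input.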
        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper


    def done(self, record):
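        """Called when the submitted runner reaches a final state: map its exit
        code to a CWL process status and hand the output object to the output
        callback."""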
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
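                # An exit code of 33 signals that the run failed on an unsupported requirement.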
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
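                # A finished run leaves its CWL output object in cwl.output.json
                # inside the output collection.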
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
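                # Turn output paths that are relative to the output collection into keep: references.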
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]