6264: Fix bin/cwl-runner to import correct module.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
import argparse
import fnmatch
import logging
import os
import re
import threading

import arvados
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.workflow
from cwltool.process import get_feature
18
19 logger = logging.getLogger('arvados.cwl-runner')
20 logger.setLevel(logging.INFO)
21
def arv_docker_get_image(api_client, dockerRequirement, pull_image):
    """Ensure the Docker image named by a CWL DockerRequirement is in Keep.

    If the image is not already stored in Arvados, pull it to the local
    Docker daemon (via cwltool) and upload it with arv-keepdocker.

    :param api_client: Arvados API client used to search for stored images.
    :param dockerRequirement: CWL DockerRequirement dict.  "dockerImageId"
        is filled in from "dockerPull" when absent (mutates the dict).
    :param pull_image: passed through to cwltool's docker support; when
        true the image may be pulled from a registry.
    :returns: the image id string ("name" or "name:tag").
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name:tag" into its components.  NOTE(review): this also splits
    # a registry host with a port ("host:5000/img"); assumed not used here —
    # confirm if such image references need to be supported.
    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Not in Keep yet: make sure the image exists locally (the return
        # value is not needed), then upload it with arv-keepdocker.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]
42
class CollectionFsAccess(cwltool.draft2tool.StdFsAccess):
    """File access routines that understand Keep collection paths.

    Paths whose first component is a Keep locator are resolved inside the
    corresponding Arvados collection; any other path falls back to the
    local-filesystem behavior inherited from StdFsAccess.
    """

    def __init__(self, basedir):
        self.collections = {}  # cache: locator -> CollectionReader
        self.basedir = basedir

    def get_collection(self, path):
        """Split a path into (collection, remainder).

        :returns: (CollectionReader, path-within-collection) when the first
            path component is a Keep locator, otherwise (None, path).
        """
        p = path.split("/")
        if arvados.util.keep_locator_pattern.match(p[0]):
            if p[0] not in self.collections:
                self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
            return (self.collections[p[0]], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        # Recursively match glob pattern segments against collection contents.
        ret = []
        for i in collection:
            if fnmatch.fnmatch(i, patternsegments[0]):
                cur = os.path.join(parent, i)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[i], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection:
            patternsegments = rest.split("/")
            return self._match(collection, patternsegments, collection.manifest_locator())
        else:
            # Bug fix: non-Keep patterns previously fell through and crashed
            # on collection.manifest_locator() (collection is None).  Use the
            # standard glob instead, consistent with open() and exists().
            return super(CollectionFsAccess, self).glob(pattern)

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
86
class ArvadosJob(object):
    """Submit and track one CWL command line tool as an Arvados job.

    Instances are created by ArvadosCommandTool.makeJobRunner(); cwltool's
    job machinery fills in attributes such as command_line, generatefiles,
    environment, stdin, stdout, pathmapper, collect_outputs and
    output_callback before calling run().
    """

    def __init__(self, runner):
        self.arvrunner = runner   # owning ArvCwlRunner
        self.running = False      # set True on the first "Running" event

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Create the Arvados job and register it with the runner."""
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding literal
            # file contents and files copied from other collections.
            vwd = arvados.collection.Collection()
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Strip the leading "/keep/" (6 chars) to get the
                    # locator-relative path.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][6:])
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            script_parameters["task.vwd"] = vwd.portable_data_hash()

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            for k,v in self.environment.items():
                script_parameters["task.env"][k] = v

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
            runtime_constraints["arvados_sdk_version"] = "master"

        response = self.arvrunner.api.jobs().create(body={
            "script": "run-command",
            "repository": "arvados",
            "script_version": "master",
            "script_parameters": script_parameters,
            "runtime_constraints": runtime_constraints
        }, find_or_create=kwargs.get("enable_reuse", True)).execute()

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Job %s is %s", response["uuid"], response["state"])

        # Job reuse may hand back an already-finished job; report it now
        # instead of waiting for a websocket event that will never come.
        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Handle a finished job record: collect outputs, notify cwltool."""
        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            # Default to no outputs so the callback always gets a dict even
            # when output collection fails below.
            outputs = {}
            try:
                outputs = self.collect_outputs(record["output"])
            except Exception as e:
                logger.warning(str(e))
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Always deregister so the runner's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
158
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map CWL input file paths to Keep references, uploading local files.

    Builds self._pathmap as {source path: (resolved path, path inside the
    job's /keep mount)}, batching any local files into a single upload.
    """

    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = {}
        pending_uploads = []

        # Matches "portable-data-hash/path/in/collection".
        keep_ref = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')

        for srcpath in referenced_files:
            if isinstance(srcpath, basestring) and keep_ref.match(srcpath):
                # Already a Keep reference; just prefix the mount point.
                self._pathmap[srcpath] = (srcpath, "/keep/%s" % srcpath)
                continue

            abspath = srcpath if os.path.isabs(srcpath) else os.path.join(basedir, srcpath)
            st = arvados.commands.run.statfile("", abspath)
            if kwargs.get("conformance_test"):
                # Conformance tests run locally; no upload needed.
                self._pathmap[srcpath] = (srcpath, abspath)
            elif isinstance(st, arvados.commands.run.UploadFile):
                pending_uploads.append((srcpath, abspath, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[srcpath] = (abspath, st.fn)
            else:
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if pending_uploads:
            # One batched upload; statfile() fills in each st.fn with the
            # resulting Keep reference.
            arvados.commands.run.uploadfiles([u[2] for u in pending_uploads], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)

        for srcpath, abspath, st in pending_uploads:
            self._pathmap[srcpath] = (abspath, st.fn)
186
187
188
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool specialization whose jobs run on Arvados."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        # Every invocation gets its own job wrapper tied to the runner.
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        # Resolve input files through Keep (uploading local ones).
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
199
200
class ArvCwlRunner(object):
    """Top-level executor that turns a CWL workflow into Arvados jobs.

    In-flight jobs are tracked in self.jobs (job uuid -> ArvadosJob).  The
    submission loop in arvExecutor() blocks on self.cond until on_message()
    (driven by websocket events) marks a job finished and notifies it.
    """

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory: CommandLineTools run on Arvados, the rest default."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output and log overall status."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
        else:
            logger.warning("Overall job status is %s", processStatus)
        self.final_output = out

    def on_message(self, event):
        """Websocket event handler: track state changes of our jobs."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    logger.info("Job %s is Running", event["object_uuid"])
                    with self.lock:
                        self.jobs[event["object_uuid"]].running = True
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
                    try:
                        # done() removes the job from self.jobs and fires the
                        # output callback; notify the waiting executor loop.
                        self.cond.acquire()
                        self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def arvExecutor(self, t, job_order, input_basedir, args, **kwargs):
        """Run tool/workflow `t`, blocking until all jobs finish.

        :returns: the final output object.
        :raises cwltool.workflow.WorkflowException: if the workflow stalls
            or finishes without producing a result.
        """
        # Use the stored API client (was a fresh arvados.api('v1') before,
        # which ignored the client this runner was constructed with).
        events = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        if kwargs.get("conformance_test"):
            # NOTE(review): the events subscription above is never closed on
            # this path; harmless at process exit but worth tidying.
            return cwltool.main.single_job_executor(t, job_order, input_basedir, args, **kwargs)
        else:
            jobiter = t.job(job_order,
                            input_basedir,
                            self.output_callback,
                            **kwargs)

            for r in jobiter:
                if r:
                    with self.lock:
                        r.run(**kwargs)
                else:
                    # Nothing runnable right now: wait for a job to finish
                    # (which may unblock more of the workflow).
                    if self.jobs:
                        try:
                            self.cond.acquire()
                            self.cond.wait()
                        finally:
                            self.cond.release()
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain remaining jobs before reporting the final output.
            while self.jobs:
                try:
                    self.cond.acquire()
                    self.cond.wait()
                finally:
                    self.cond.release()

            events.close()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output
282
283
def main(args, stdout, stderr, api_client=None):
    """Command line entry point for the Arvados CWL runner.

    :param args: command line argument list (without the program name);
        mutated by appending "--leave-outputs".
    :param stdout: unused; accepted for entry-point signature
        compatibility.  NOTE(review): consider forwarding to
        cwltool.main.main if it accepts output streams.
    :param stderr: unused; see stdout.
    :param api_client: Arvados API client to use; a default 'v1' client is
        created when None.  (Previously this parameter was silently
        ignored and a fresh client was always created.)
    :returns: cwltool.main.main's exit status.
    """
    if api_client is None:
        api_client = arvados.api('v1')
    runner = ArvCwlRunner(api_client=api_client)

    # Keep job outputs in place instead of cleaning them up.
    args.append("--leave-outputs")

    parser = cwltool.main.arg_parser()
    parser.add_argument("--enable-reuse", action="store_true",
                        default=False, dest="enable_reuse",
                        help="")
    parser.add_argument("--disable-reuse", action="store_false",
                        default=False, dest="enable_reuse",
                        help="")

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)