8815: Fix syntax errors.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

import fnmatch
import logging
import os
import re
import sys
import threading

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import arvados.util
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, shortname

from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Patterns for extracting the task's tmpdir, outdir and keep mount paths
# from the crunchrunner log stream.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
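    """Ensure the Docker image named by dockerRequirement is available in Keep.

    If the image is not already stored in Arvados, pull it locally (when
    pull_image is True) and upload it with arv-keepdocker into the given
    project.  Returns the dockerImageId.
    """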
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
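    """Implement the cwltool FsAccess interface on top of Arvados collections.

    Paths of the form "keep:<portable data hash>/..." are resolved through
    CollectionReader; anything else falls through to the local filesystem.
    """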
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
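        """Split a "keep:<pdh>/path" reference into (CollectionReader, path).

        Returns (None, path) when the path is not a keep: reference.
        """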
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
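        """Recursively match glob pattern segments against a collection tree.

        Returns a list of "keep:..." paths for every file or subcollection
        under 'parent' that matches the pattern.
        """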
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift
            # past the "./" (doing this outside the loop below avoids
            # adding the same matches once per directory entry).
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: reference; fall back to a plain filesystem glob.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
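    """Submit and monitor a single Crunch job running a CWL CommandLineTool."""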
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
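        """Submit this step as a crunchrunner job via the Arvados API.

        Builds the script_parameters (command line, virtual working
        directory, environment, stdin/stdout redirection) and runtime
        constraints, creates the job, and records it in the pipeline
        instance.
        """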
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters]},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
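        """Handle a job reaching a final state (Complete/Failed/Cancelled).

        Scans the job log for the tmpdir/outdir/keep paths, registers the
        output as a named collection in the project, collects the CWL
        outputs, and reports the result through output_callback.
        """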
        try:
            self.update_pipeline_component(record)
        except:
            # Updating the pipeline component is best-effort; don't let a
            # failure here mask the output handling below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will differ between runs, and we need to
                        # know about the final run that actually produced
                        # output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
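    """Map input file paths to "$(task.keep)/..." paths on the compute node.

    Files not already in Keep are uploaded first via
    arvados.commands.run.uploadfiles.
    """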
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s",
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
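        """Map a compute-node path back to a "keep:..." reference."""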
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
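    """CommandLineTool that runs as an Arvados Crunch job."""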
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


class ArvCwlRunner(object):
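    """Execute a CWL workflow as an Arvados pipeline instance.

    Submits each runnable step as a Crunch job and tracks job state
    through the Arvados websocket event stream.
    """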
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
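        """Record the workflow's final output and pipeline instance state."""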
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
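        """Handle a websocket event for one of this workflow's jobs."""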
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
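        """Run the workflow: create a pipeline instance and submit jobs.

        Holds self.cond while iterating over runnable jobs; on_message
        (called from the event thread) updates job state and fires output
        callbacks while this thread is in cond.wait().
        """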
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = args.debug
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output


def main(args, stdout, stderr, api_client=None):
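    """Command line entry point: parse arguments and run the workflow."""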
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (the default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse (always run new jobs)")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)