8857: Add --ignore-docker-for-reuse option to assist workflow development.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

import argparse
import fnmatch
import logging
import os
import re
import sys
import threading

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import arvados.util
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow

from arvados.api import OrderedJsonModel
from cwltool.errors import WorkflowException
from cwltool.process import shortname, get_feature

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

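# Match the log lines crunchrunner prints to report the actual
# $(task.tmpdir), $(task.outdir) and $(task.keep) paths used by a job,
# so ArvadosJob.done() can recover them from the job log afterwards.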
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
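    """Ensure the Docker image named by dockerRequirement is available in Keep.

    If it is not already stored in Arvados, pull it locally with cwltool
    and upload it to the given project with arv-keepdocker.  Returns the
    requested image id (name or name:tag).
    """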
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
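    """Implement the cwltool FsAccess interface on top of Arvados collections.

    Paths of the form "keep:<locator>/..." are resolved by fetching the
    corresponding collection from Keep; any other path falls back to the
    local filesystem.
    """
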
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift
            # past the "./"
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: path; glob on the local filesystem instead.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
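    """Submit and monitor one CommandLineTool invocation as an Arvados job.

    Attributes such as command_line, generatefiles, environment, stdin,
    stdout, builder and name are filled in by cwltool before run() is
    called.
    """
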
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
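        """Translate the tool invocation into a crunchrunner job and submit it.

        Builds the script_parameters and runtime_constraints for the
        crunchrunner script, then creates the job via the Arvados API,
        reusing a matching past job when enable_reuse permits.
        """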
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
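        """Record the finished job and collect its outputs.

        Updates the pipeline component, scrapes the job log for the task
        directories, registers the output collection in the pipeline's
        project, and reports the outputs back to cwltool.
        """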
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Failing to update the pipeline component should not prevent
            # output collection below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will differ between runs, and we need to
                        # know about the final run that actually produced
                        # output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # Check whether a collection with the same owner, name
                    # and content already exists.
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get the output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create a new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
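    """Map CWL input file paths to Arvados Keep references.

    Paths already in Keep are rewritten to $(task.keep) references; local
    files are uploaded to Keep (once per run, tracked through
    arvrunner.get_uploaded/add_uploaded) and mapped the same way.
    """
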
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s",
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
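    """CommandLineTool subclass that runs jobs on Arvados (via ArvadosJob)
    and maps paths through Keep (via ArvPathMapper)."""
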
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


class ArvCwlRunner(object):
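    """Execute a CWL workflow as an Arvados pipeline instance.

    Submits one Arvados job per tool invocation and tracks job state
    through websocket events until the workflow completes.
    """
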
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
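        """Handle a job state-change event from the websocket stream."""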
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
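        """Run the workflow to completion and return its final output.

        Creates a pipeline instance to record progress, iterates over
        runnable jobs from cwltool, and waits on self.cond until all
        submitted jobs have finished.
        """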
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = args.debug
        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output


def main(args, stdout, stderr, api_client=None):
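    """Set up the arvados-cwl-runner command line and hand off to cwltool."""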
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default): if an identical job has already run, use its output instead of running a new job.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse: always submit new jobs.")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    try:
        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)