8654: Remove print statements.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 import argparse
4 import arvados
5 import arvados.events
6 import arvados.commands.keepdocker
7 import arvados.commands.run
8 import arvados.collection
9 import arvados.util
10 import cwltool.draft2tool
11 import cwltool.workflow
12 import cwltool.main
13 from cwltool.process import shortname
14 from cwltool.errors import WorkflowException
15 import threading
16 import cwltool.docker
17 import fnmatch
18 import logging
19 import re
20 import os
21 import sys
22 import functools
23 import json
24
25 from cwltool.process import get_feature, adjustFiles, scandeps
26 from arvados.api import OrderedJsonModel
27
# Logger used throughout this module; INFO and above are emitted by default.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Portable data hash of the collection expected to hold the crunchrunner
# binary, plus the URLs of the binary and of a CA certificate bundle.
crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
_crunchrunner_download_base = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/"
crunchrunner_download = _crunchrunner_download_base + "crunchrunner"
certs_download = _crunchrunner_download_base + "ca-certificates.crt"

# Patterns matching the job log lines on which crunchrunner reports the
# concrete value substituted for $(task.tmpdir), $(task.outdir) and
# $(task.keep).  Group 1 captures the reported value.
_crunchrunner_log_prefix = r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: "
tmpdirre = re.compile(_crunchrunner_log_prefix + r"\$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(_crunchrunner_log_prefix + r"\$\(task\.outdir\)=(.*)")
keepre = re.compile(_crunchrunner_log_prefix + r"\$\(task\.keep\)=(.*)")
38
39
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Make sure the Docker image named by a requirement is listed in Arvados.

    When the requirement has no "dockerImageId" but has "dockerPull", the
    latter is copied into "dockerImageId" (the dict is modified in place).
    The id is split on ":" into an image name and an optional tag, and
    Arvados is asked for matching images.  If none are found, the image is
    obtained through cwltool and uploaded with arv-keepdocker into
    ``project_uuid``.

    Returns the requirement's "dockerImageId" entry.
    """
    if "dockerPull" in dockerRequirement and "dockerImageId" not in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    name_and_tag = dockerRequirement["dockerImageId"].split(":")
    image_name = name_and_tag[0]
    image_tag = None
    if len(name_and_tag) > 1:
        image_tag = name_and_tag[1]

    # NOTE(review): the positional 3 is presumably a retry count; confirm
    # against the keepdocker API.
    known_images = arvados.commands.keepdocker.list_images_in_arv(
        api_client, 3, image_name=image_name, image_tag=image_tag)

    if not known_images:
        # The return value is not needed here; the call is made for its
        # effect of obtaining the image before it is uploaded.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        keepdocker_args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            keepdocker_args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(keepdocker_args[1:]))
        arvados.commands.keepdocker.main(keepdocker_args)

    # Re-read the entry instead of reusing a cached value in case the
    # requirement dict was updated while obtaining the image.
    return dockerRequirement["dockerImageId"]
61
62
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Filesystem access layer that also understands ``keep:`` references.

    Paths whose first segment is ``keep:<locator>`` are served from the
    corresponding Arvados collection; every other path is handled as a
    local path relative to ``basedir``.
    """

    def __init__(self, basedir):
        # Cache of CollectionReader objects keyed by collection locator.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split ``path`` into a collection and a path inside it.

        Returns ``(CollectionReader, "rest/of/path")`` when ``path`` starts
        with ``keep:<locator>``, otherwise ``(None, path)``.  Readers are
        created on first use and reused afterwards.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob segments against the items of a collection.

        ``patternsegments`` is the glob pattern split on "/"; ``parent`` is
        the path prefix prepended to every match.  Returns a list of matched
        paths, or an empty list when there is nothing left to match or
        ``collection`` is not a collection-like object (i.e. it is a file).
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift past the
            # "./".  This is done once, before iterating: doing it inside the
            # loop repeats the same sub-match for every entry of the
            # collection and yields duplicate results.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Return the paths matching ``pattern``.

        ``keep:`` patterns are matched against the collection contents and
        returned as ``keep:<locator>/...`` paths.  Other patterns are
        delegated to the base class, in line with open() and exists().
        """
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep reference: there is no collection to ask for a
            # manifest locator, so fall back to the local filesystem.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        """Open ``fn`` from its collection, or from the local filesystem."""
        collection, rest = self.get_collection(fn)
        # Compare against None rather than relying on truthiness, so that a
        # collection object that evaluates as false (presumably an empty
        # one) is still treated as a keep reference.
        if collection is not None:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        """Report whether ``fn`` exists in its collection or on local disk."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
118
class ArvadosJob(object):
    """Runs one command line tool invocation as an Arvados job.

    Besides ``arvrunner`` and ``running``, the methods read attributes that
    are not assigned in this class (``command_line``, ``generatefiles``,
    ``environment``, ``stdin``, ``stdout``, ``pathmapper``, ``builder``,
    ``name``, ``output_callback``, ``collect_outputs``); presumably cwltool
    fills them in before run() is called -- TODO confirm.
    """

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Build the crunchrunner task description and submit the job.

        Recognised ``kwargs``: ``use_container`` (Docker is skipped only
        when it is exactly False) and ``enable_reuse`` (default True, passed
        as ``find_or_create``).  Errors raised while creating the job or
        updating the pipeline are logged and reported through
        ``output_callback({}, "permanentFail")`` instead of propagating.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Assemble the generated files in a new collection that serves
            # as the task's virtual working directory.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # A dict refers to an existing file in Keep: copy it.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Anything else is written out as the file's content.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            # Register the job so that incoming events can be routed to it.
            self.arvrunner.jobs[response["uuid"]] = self

            self.update_pipeline_component(response)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # The returned job may already be in a final state (for example
            # when an existing job was found); finish it right away.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Store ``record`` as this job's pipeline component and save it."""
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a job record that reached a final state.

        Updates the pipeline (best effort), collects the outputs when the
        record has an output, reports the result through
        ``output_callback`` and finally removes the job from the runner's
        table of pending jobs.
        """
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Best effort: a failed pipeline update must not prevent the
            # result from being reported, but it should not go unnoticed.
            logger.warning("Could not update pipeline component for job %s",
                           self.name,
                           exc_info=getattr(self.arvrunner, "debug", False))

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    outdir = None
                    keepdir = None
                    # Close the log reader once it has been scanned.
                    with logc.open(logc.keys()[0]) as log:
                        # It turns out if the job fails and restarts it can
                        # come up on a different compute node, so we have to
                        # read the log to the end to be sure instead of
                        # stopping at the first match.
                        for l in log.readlines():
                            g = outdirre.match(l)
                            if g:
                                outdir = g.group(1)
                            g = keepre.match(l)
                            if g:
                                keepdir = g.group(1)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
250
251
class RunnerJob(object):
    """Submits a job that runs cwl-runner itself on the cluster."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        """No-op: this job type does not record pipeline components."""
        pass

    def upload_docker(self, tool):
        """Upload the Docker images needed by ``tool``.

        Command line tools are handled directly; workflows are walked
        recursively through the embedded tool of each step.
        """
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Upload the workflow's dependencies and submit the runner job.

        Files referenced by the tool document and by the job order are
        collected and mapped through ArvPathMapper, the job order is
        rewritten to the mapped paths, and a "cwl-runner" job is created
        with the job order as its script parameters.  ``kwargs`` are passed
        on to the path mappers.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Record the path and return it unchanged, so adjustFiles() is
            # used purely as a traversal.
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Point every file of the job order at its mapped location.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "8654-arv-jobs-cwl-runner",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        # Register the job so that incoming events can be routed to it.
        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Report the final result of the runner job.

        Reads ``cwl.output.json`` from the job's output collection and hands
        its content to the runner's ``output_callback``.  If the output
        cannot be read (for instance because the job failed and produced
        none), the error is logged and the callback is still invoked, with
        ``None`` outputs and status "permanentFail".  The job is always
        removed from the runner's table of pending jobs.
        """
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                # Without this the exception would skip output_callback and
                # the runner would never learn the job's final status.
                logger.error("While getting final output object: %s", e)
                processStatus = "permanentFail"
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
346
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Path mapper that resolves input files to locations in Keep.

    Starts from the runner's record of already uploaded files, maps
    ``keep:`` references directly, and uploads local files that are not in
    Keep yet.
    """

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()

        keep_reference = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
        conformance_test = kwargs.get("conformance_test")

        # (source, absolute path, stat result) of every file to upload.
        pending = []
        for src in referenced_files:
            if isinstance(src, basestring) and keep_reference.match(src):
                # Already in Keep: map it without the "keep:" prefix.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if src in self._pathmap:
                continue

            ab = cwltool.pathmapper.abspath(src, basedir)
            st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
            if conformance_test:
                # Conformance tests keep using the local absolute path.
                self._pathmap[src] = (src, ab)
            elif isinstance(st, arvados.commands.run.UploadFile):
                pending.append((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (ab, st.fn)
            else:
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if pending:
            arvados.commands.run.uploadfiles([entry[2] for entry in pending],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        # Record the uploads both with the runner, so later mappers can
        # reuse them, and in this mapper.
        for src, ab, st in pending:
            mapped = (ab, st.fn)
            arvrunner.add_uploaded(src, mapped)
            self._pathmap[src] = mapped

        self.keepdir = None

    def reversemap(self, target):
        """Translate a mapped path back into a ``(path, keep reference)`` pair.

        ``keep:`` targets map to themselves, targets below ``keepdir`` are
        rewritten as ``keep:`` references, and everything else is left to
        the base class.
        """
        if target.startswith("keep:"):
            return (target, target)
        if self.keepdir and target.startswith(self.keepdir):
            # Skip the keepdir prefix and the separator that follows it.
            relative = target[len(self.keepdir)+1:]
            return (target, "keep:" + relative)
        return super(ArvPathMapper, self).reversemap(target)
392
393
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Command line tool whose jobs and path mapping go through Arvados."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Return a new ArvadosJob bound to this tool's runner."""
        job = ArvadosJob(self.arvrunner)
        return job

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        """Return an ArvPathMapper producing ``$(task.keep)`` based paths."""
        collection_pattern = "$(task.keep)/%s"
        file_pattern = "$(task.keep)/%s/%s"
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             collection_pattern,
                             file_pattern,
                             **kwargs)
407
408
class ArvCwlRunner(object):
    """Executes a CWL tool or workflow by submitting Arvados jobs.

    Jobs register themselves in ``self.jobs`` (keyed by job uuid).  Job
    state changes arrive through on_message(), which is passed as the
    callback to the event subscription; ``self.cond`` (built on
    ``self.lock``) serialises those callbacks with the main loop in
    arvExecutor().
    """

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4
        # Both are (re)assigned by arvExecutor(); the defaults keep the
        # object usable before it has been called.
        self.pipeline = None
        self.debug = False

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Create ArvadosCommandTool for command line tools, else defer to cwltool."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the overall result and update the pipeline state, if any."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            pipeline_state = "Complete"
        else:
            logger.warning("Overall job status is %s", processStatus)
            pipeline_state = "Failed"

        if self.pipeline:
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": pipeline_state}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        """Handle an event: track job state changes for registered jobs.

        Only "update" events for jobs present in ``self.jobs`` are
        considered.  A job entering "Running" is flagged and its pipeline
        component refreshed; a job entering a final state is finished via
        its done() method and the main loop is notified.
        """
        if "object_uuid" not in event:
            return
        uuid = event["object_uuid"]
        # Cheap filter without the lock; the job is looked up again under
        # the lock below because it may be removed in the meantime.
        if uuid not in self.jobs or event["event_type"] != "update":
            return

        attributes = event["properties"]["new_attributes"]
        state = attributes["state"]

        if state == "Running":
            with self.lock:
                j = self.jobs.get(uuid)
                if j is not None and j.running is False:
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.running = True
                    j.update_pipeline_component(attributes)
        elif state in ("Complete", "Failed", "Cancelled"):
            with self.cond:
                j = self.jobs.get(uuid)
                if j is None:
                    # The job was finished (and unregistered) by another
                    # code path while this callback waited for the lock.
                    return
                logger.info("Job %s (%s) is %s", j.name, uuid, state)
                j.done(attributes)
                self.cond.notify()

    def get_uploaded(self):
        """Return a copy of the mapping of files uploaded so far."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Remember that ``src`` has been uploaded and maps to ``pair``."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Run ``tool`` with ``job_order`` on Arvados and return its output.

        With ``args.submit`` a single runner job is submitted (and, unless
        ``args.wait`` is set, the method returns None right after the
        submission); otherwise a pipeline instance is created and the
        individual jobs are submitted from this process.  Errors during
        execution are logged, the pipeline (if any) is marked "Failed", and
        whatever final output was recorded (possibly None) is returned.
        """
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError as e:
            # The crunchrunner collection cannot be fetched: download the
            # binary and the certificate bundle and save them as a new
            # collection.
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                runnerjob.run()
                return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)

        if args.submit:
            jobiter = iter((runnerjob,))
        else:
            components = {}
            if "cwl_runner_job" in kwargs:
                components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": components,
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

        try:
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can
            # update job state and process output callbacks.
            with self.cond:
                try:
                    for runnable in jobiter:
                        if runnable:
                            runnable.run(**kwargs)
                        elif self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                    while self.jobs:
                        self.cond.wait(1)

                    if self.final_output is None:
                        raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
                except:
                    # Deliberately broad (includes KeyboardInterrupt): the
                    # pipeline must be marked as failed whatever went wrong.
                    if sys.exc_info()[0] is KeyboardInterrupt:
                        logger.error("Interrupted, marking pipeline as failed")
                    else:
                        logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                    if self.pipeline:
                        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        finally:
            # Close the subscription on failure as well as on success, and
            # only after the lock has been released so a callback waiting
            # for the lock cannot be stuck while the connection shuts down.
            events.close()

        return self.final_output
571
572
def main(args, stdout, stderr, api_client=None):
    """Command line entry point of the Arvados CWL runner.

    Extends cwltool's argument parser with the Arvados specific options,
    creates the runner and hands control to ``cwltool.main.main`` with the
    runner's executor and tool factory.  ``--leave-outputs`` is always
    passed to cwltool.

    Returns 1 if the API client or the runner cannot be created, otherwise
    the value returned by ``cwltool.main.main``.
    """
    # Build a new argument list instead of inserting into the caller's one,
    # so the list passed in is left untouched.
    args = ["--leave-outputs"] + list(args)

    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.",
                        default=False)
    parser.add_argument("--wait", action="store_true", help="Wait for completion after submitting cwl-runner job.",
                        default=False)

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser)