#!/usr/bin/env python
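"""Arvados cwl-runner: run Common Workflow Language workflows as Arvados jobs.

This module maps CWL CommandLineTools onto Arvados "crunchrunner" jobs,
stages input files into Keep, follows job state over the Arvados event
stream, and records progress in a pipeline instance.  The command line
entry point is main(); a hedged usage sketch, assuming the console script
is installed as arvados-cwl-runner and with illustrative file names:

    arvados-cwl-runner --project-uuid <uuid> workflow.cwl job.yml
"""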

import argparse
import arvados
import arvados.events
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.collection
import arvados.util
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
from cwltool.process import shortname
from cwltool.errors import WorkflowException
import threading
import cwltool.docker
import cwltool.pathmapper
import fnmatch
import logging
import re
import os
import sys

# adjustFiles and scandeps are assumed to be provided by cwltool.process in
# this cwltool release; they are used by ArvCwlRunner.submit() below.
from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

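# Portable data hash of a Keep collection holding the crunchrunner binary,
# plus fallback download locations used to rebuild that collection if it is
# not present on the target cluster (see ArvCwlRunner.arvExecutor below).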
crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

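# crunchrunner prints the actual values of $(task.tmpdir), $(task.outdir)
# and $(task.keep) to its log; these patterns recover them so output paths
# can be mapped back to Keep locations (see ArvadosJob.done).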
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
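    """Make sure the Docker image named by dockerRequirement is in Keep.

    If the image is not already registered with Arvados, pull it locally and
    upload it via arvados.commands.keepdocker into project_uuid.  Returns
    the image id (name[:tag]) recorded in dockerRequirement["dockerImageId"].
    """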
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
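    """File system access layer that resolves "keep:" references.

    Paths of the form keep:<portable data hash>/<path> are read through
    arvados.collection.CollectionReader; anything else falls back to the
    local filesystem relative to basedir.
    """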
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
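    """Submit and monitor a single CWL step as an Arvados job running crunchrunner."""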
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
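        """Build crunchrunner script_parameters and create the Arvados job.

        Generated files are staged into an intermediate "virtual working
        directory" collection, Docker and resource requirements become
        runtime_constraints, and the resulting job is registered as a
        component of the runner's pipeline instance.
        """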
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                     "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
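        """Handle a finished job record: collect outputs and report status.

        The crunchrunner log is scanned for the real tmpdir/outdir/keep
        mount paths so that collect_outputs() can translate local paths in
        the job output back into keep: references.
        """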
        try:
            self.update_pipeline_component(record)
        except:
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log.readlines():
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                        # It turns out that if the job fails and restarts, it
                        # can come up on a different compute node, so we have
                        # to read the log to the end to be sure instead of
                        # taking the easy way out.
                        #
                        #if tmpdir and outdir and keepdir:
                        #    break

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
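    """Map workflow input paths to Keep references.

    Files already identified by a keep:<pdh>/<path> URI pass through
    unchanged; local files are uploaded to Keep via
    arvados.commands.run.uploadfiles and mapped to $(task.keep)-relative
    paths for crunchrunner.
    """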
    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
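    """CommandLineTool subclass that runs each step as an ArvadosJob."""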
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)


class ArvCwlRunner(object):
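    """Execute a CWL workflow on Arvados.

    arvExecutor() iterates over runnable jobs produced by cwltool, submits
    each one as an Arvados job, and relies on the Arvados event stream
    (arvados.events.subscribe feeding on_message) to drive the workflow
    forward as jobs change state.
    """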
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def upload_docker(self, tool):
        pass

    def submit(self, tool, job_order, input_basedir, args, **kwargs):
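        """Submit the whole workflow to run as a cwl-runner job on Arvados.

        Input files referenced by the tool and the job order are uploaded
        to Keep, then a single "cwl-runner" job is created to run this
        workflow on the cluster, and its uuid is printed.
        """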
        files = set()
        def visitFiles(path):
            files.add(path)

        adjustFiles(scandeps("", tool.tool,
                             set(("run",)),
                             set(("$schemas", "path"))),
                    visitFiles)
        adjustFiles(job_order, visitFiles)

        mapper = ArvPathMapper(self, files, "",
                               "$(task.keep)/%s",
                               "$(task.keep)/%s/%s",
                               **kwargs)

        job_order = adjustFiles(job_order, lambda p: mapper.mapper(p))

        response = self.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "8654-arv-jobs-cwl-runner",
            "repository": "arvados",
            "script_parameters": job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
        print response["uuid"]
        return None


    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
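        """Main executor loop: run the workflow's jobs and return its output.

        Ensures the crunchrunner collection exists, creates a pipeline
        instance to track progress, then submits the jobs produced by
        tool.job() and waits on the condition variable until on_message
        has marked them all done.
        """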
        if args.submit:
            self.submit(tool, job_order, input_basedir, args, **kwargs)
            return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = args.debug

        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError as e:
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output


def main(args, stdout, stderr, api_client=None):
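    """Command line entry point.

    Extends the stock cwltool argument parser with Arvados-specific options
    (job reuse, --project-uuid, --submit) and hands execution off to
    cwltool.main.main with the Arvados executor and tool factory.
    """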
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse.")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--submit", action="store_true", default=False,
                        help="Submit job and print job uuid.")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)