More small build script improvements.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 import argparse
4 import arvados
5 import arvados.events
6 import arvados.commands.keepdocker
7 import arvados.commands.run
8 import arvados.collection
9 import arvados.util
10 import cwltool.draft2tool
11 import cwltool.workflow
12 import cwltool.main
13 from cwltool.process import shortname
14 from cwltool.errors import WorkflowException
15 import threading
16 import cwltool.docker
17 import fnmatch
18 import logging
19 import re
20 import os
21 import sys
22
23 from cwltool.process import get_feature
24 from arvados.api import OrderedJsonModel
25
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Portable data hash of the collection containing the crunchrunner binary,
# plus download URLs used to recreate that collection on clusters that do
# not already have it (see ArvCwlRunner.arvExecutor).
crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

# Patterns matching the lines crunchrunner writes to the job log to report
# the actual values of $(task.tmpdir), $(task.outdir) and $(task.keep);
# ArvadosJob.done() scans the log with these to locate job outputs.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
37
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Ensure the Docker image named by dockerRequirement exists in Arvados.

    Fills in "dockerImageId" from "dockerPull" when it is missing, uploads
    the image via arv-keepdocker if the cluster does not already have it,
    and returns the image id string.
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name:tag" into its parts; the tag is optional.
    parts = dockerRequirement["dockerImageId"].split(":")
    image_name = parts[0]
    image_tag = parts[1] if len(parts) > 1 else None

    existing = arvados.commands.keepdocker.list_images_in_arv(
        api_client, 3, image_name=image_name, image_tag=image_tag)

    if not existing:
        # Pull the image locally first, then push it into Keep under the
        # requested project via the keepdocker command-line entry point.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        upload_args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            upload_args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(upload_args[1:]))
        arvados.commands.keepdocker.main(upload_args)

    return dockerRequirement["dockerImageId"]
59
60
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """File-system access layer that resolves "keep:" references into
    Arvados collections, falling back to the local file system otherwise.
    """

    def __init__(self, basedir):
        # Cache of portable data hash -> CollectionReader, so each
        # collection manifest is fetched at most once.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split *path* into (collection, path-within-collection).

        Returns (None, path) unchanged when the path does not start with a
        "keep:" locator.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob *patternsegments* against the collection
        tree, returning a list of matching "keep:..." paths."""
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo": shift past the "./"
            # exactly once.  (This check previously lived inside the loop
            # below, so every directory entry triggered another recursion
            # and each match was duplicated once per entry.)
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # Iterate over the files and subcollections in 'collection'.
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        # NOTE(review): assumes *pattern* starts with a "keep:" locator;
        # get_collection() returns None otherwise, which would raise on
        # manifest_locator() below -- confirm callers never pass local globs.
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        """Open a file inside a collection, or a local file otherwise."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        """Existence check for collection-resident or local paths."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
116
class ArvadosJob(object):
    """Submits and tracks one Arvados job for a single CWL tool invocation.

    Attributes such as command_line, generatefiles, environment, stdin,
    stdout, builder, pathmapper, name, collect_outputs and output_callback
    are expected to be set on the instance by cwltool's Process machinery
    before run() is called (they are not assigned in this class).
    """

    def __init__(self, runner):
        # The owning ArvCwlRunner; supplies the API client, pipeline
        # instance, shared job table and project uuid.
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Create the Arvados job and register it with the runner.

        On submission failure, reports "permanentFail" through
        output_callback instead of raising.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding both
            # literal file contents and files copied from other collections,
            # then point each entry at its $(task.keep) location.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Entry refers to an existing file; copy it from its
                    # source collection into the vwd.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Entry is literal file content.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        # Always point TMPDIR at the task tmpdir; merge in any
        # tool-specified environment on top.
        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            # Map the stdin reference through the path mapper to its
            # container-side location.
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            # Submit the job; find_or_create enables Arvados job reuse
            # when --enable-reuse is in effect.
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            # Register so ArvCwlRunner.on_message can route job events here.
            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # A reused job may already be finished at creation time; in
            # that case there will be no state-change event, so finish now.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Push this job's latest *record* into the pipeline instance's
        components so the workbench display stays current."""
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a finished job *record*: collect outputs, report status
        through output_callback, and deregister from the runner."""
        try:
            self.update_pipeline_component(record)
        except:
            # Best-effort display update; failure here must not prevent
            # output collection below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    # Scan the job log for the crunchrunner lines that
                    # report the real tmpdir/outdir/keep mount paths; these
                    # are needed to reverse-map output file locations.
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log.readlines():
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                        # It turns out if the job fails and restarts it can
                        # come up on a different compute node, so we have to
                        # read the log to the end to be sure instead of taking the
                        # easy way out.
                        #
                        #if tmpdir and outdir and keepdir:
                        #    break

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Always deregister so the runner's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
249
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Maps CWL input file references to their $(task.keep) locations,
    uploading local files to Keep when necessary."""

    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        """Build the path map, batching any required uploads into one call."""
        self._pathmap = arvrunner.get_uploaded()
        pending = []  # (src, abs path, UploadFile) triples awaiting upload

        # Matches "keep:<portable data hash>/<path>" references, which are
        # already in Keep and need no upload.
        keep_ref = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and keep_ref.match(src):
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src in self._pathmap:
                continue
            resolved = cwltool.pathmapper.abspath(src, basedir)
            st = arvados.commands.run.statfile("", resolved, fnPattern="$(task.keep)/%s/%s")
            if kwargs.get("conformance_test"):
                self._pathmap[src] = (src, resolved)
            elif isinstance(st, arvados.commands.run.UploadFile):
                pending.append((src, resolved, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (resolved, st.fn)
            else:
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if pending:
            # Upload everything in one batch; presumably uploadfiles() fills
            # in each UploadFile's .fn with its Keep location, which the
            # loop below reads -- confirm against the SDK.
            arvados.commands.run.uploadfiles([u[2] for u in pending],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s",
                                             project=arvrunner.project_uuid)

        for src, resolved, st in pending:
            # Record the mapping both locally and in the runner's shared
            # upload cache so later mappers can reuse it.
            arvrunner.add_uploaded(src, (resolved, st.fn))
            self._pathmap[src] = (resolved, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container-side path back to its "keep:" reference."""
        if target.startswith("keep:"):
            return (target, target)
        if self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        return super(ArvPathMapper, self).reversemap(target)
293
294
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool specialization whose jobs run on Arvados."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        # Each tool invocation gets a fresh job wrapper bound to the
        # shared runner.
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        # Use the Keep-aware path mapper instead of cwltool's default.
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
306
class ArvCwlRunner(object):
    """Executes a CWL workflow by submitting each step as an Arvados job
    and tracking progress through the Arvados event stream.

    Job state changes arrive asynchronously via on_message(); the main
    loop in arvExecutor() coordinates with it through self.cond.
    """

    def __init__(self, api_client):
        self.api = api_client
        # uuid -> ArvadosJob for jobs currently in flight.
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # Workflow result, set by output_callback when the workflow ends.
        self.final_output = None
        # src -> (abspath, keep fn) cache shared across ArvPathMappers.
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory for cwltool: CommandLineTools run on Arvados,
        everything else (workflows, etc.) uses the default."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow result and mark the pipeline instance as
        Complete or Failed accordingly."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out


    def on_message(self, event):
        """Event-stream callback: dispatch job state changes to the
        corresponding ArvadosJob.  Runs on the websocket client's thread,
        hence the locking around shared state."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake the arvExecutor wait loop so it can re-check
                        # for runnable work.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        # Return a copy so callers can't mutate the shared cache directly.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record (abspath, keep fn) for src in the shared upload cache.
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Main executor entry point called by cwltool.main.

        Creates the pipeline instance, runs the workflow's jobs, waits
        for events to drain, and returns the final output object.
        """
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = args.debug

        # Make sure the crunchrunner collection exists on this cluster;
        # if not, download the binary and certs and save them as a new
        # collection.
        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError as e:
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        # Placeholders substituted by crunchrunner at task runtime.
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid

        if kwargs.get("conformance_test"):
            # Conformance tests run locally through cwltool instead of
            # submitting Arvados jobs.
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # jobiter yielded None: nothing runnable right now,
                        # so wait for a pending job to finish (or detect
                        # deadlock if there are none).
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output
448
449
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point.

    Parses cwltool-style arguments (plus Arvados-specific options) and
    runs the workflow with the Arvados executor.

    :param args: command-line argument list (not including argv[0]).
    :param stdout: unused here; accepted for entry-point compatibility.
    :param stderr: unused here; accepted for entry-point compatibility.
    :param api_client: optional pre-built Arvados API client; a new one
        is created when None.
    :returns: process exit code (0 on success, 1 on setup failure).
    """
    # Prepend rather than insert in place, so the caller's list is not
    # mutated as a side effect.
    args = ["--leave-outputs"] + list(args)

    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    try:
        # Honor an injected api_client (previously it was accepted but
        # ignored and a fresh client was always constructed).
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)