#!/usr/bin/env python
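"""Arvados executor for the Common Workflow Language (CWL).

Runs CWL command line tools and workflows on Arvados: each step is
submitted as a crunch job running the "crunchrunner" script, input and
output files live in Keep, and job state is tracked over the Arvados
websocket event stream.
"""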

import fnmatch
import logging
import os
import re
import sys
import threading

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.errors
import arvados.events
import arvados.util
from arvados.api import OrderedJsonModel

import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
from cwltool.process import get_feature, shortname

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Portable data hash of the collection holding the crunchrunner binary,
# plus fallback download URLs used when that collection is absent.
crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

# Patterns that extract the $(task.tmpdir), $(task.outdir) and $(task.keep)
# paths crunchrunner reports on its stderr log.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image):
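    """Return the image identifier for a CWL DockerRequirement.

    Checks whether the image is already stored in Arvados; if not, pulls
    it locally (when pull_image is True) and uploads it with
    arv-keepdocker.
    """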
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,  # num_retries
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
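    """File system access layer that understands "keep:" references.

    Paths beginning with "keep:<portable data hash>" are resolved inside
    the corresponding Keep collection; all other paths fall through to
    the local file system handling of cwltool's StdFsAccess.
    """
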
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split path into (CollectionReader, subpath); (None, path) if not in Keep."""
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo", so just shift past
            # the "./" before matching.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # Iterate over the files and subcollections in 'collection'.
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a "keep:" path; glob the local file system instead.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
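    """A single CWL tool invocation, submitted as an Arvados crunch job."""
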
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
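        """Build crunch script_parameters and runtime_constraints, then submit the job."""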
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding the
            # files the tool expects to find in its output directory.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to an existing file in Keep: copy it into the vwd.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Literal file contents: write them into the vwd.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)

        # Map the CWL ResourceRequirement onto Arvados runtime constraints.
        resources = self.builder.resources
        if resources is not None:
            if "coresMin" in resources:
                try:
                    runtime_constraints["min_cores_per_node"] = int(resources["coresMin"])
                except (TypeError, ValueError):
                    runtime_constraints["min_cores_per_node"] = None
            if "ramMin" in resources:
                try:
                    runtime_constraints["min_ram_mb_per_node"] = int(resources["ramMin"])
                except (TypeError, ValueError):
                    runtime_constraints["min_ram_mb_per_node"] = None

        try:
            response = self.arvrunner.api.jobs().create(body={
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "8488-cwl-crunchrunner-collection",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.update_pipeline_component(response)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                # The job was reused or finished immediately; report now
                # rather than waiting for an event that will never come.
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Push the job's latest state into the pipeline instance record."""
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                     "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
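        """Record the job's final state and collect its outputs.

        Parses the crunchrunner log from Keep to recover the remote
        $(task.outdir) and $(task.keep) paths needed to resolve output
        locations, then hands the outputs to the CWL output callback.
        """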
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Failing to update the pipeline record should not mask the
            # job's actual outcome.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}
            try:
                if record["output"]:
                    # Scan the job log for the crunchrunner lines that report
                    # the task's tmpdir, outdir and keep mount paths.
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log.readlines():
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)
                        if tmpdir and outdir and keepdir:
                            break

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
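    """Map input file paths to their $(task.keep) locations inside the job.

    Files already in Keep are referenced in place; local files are
    uploaded once (via arv-run's upload machinery) and cached on the
    runner so repeated references skip the upload.
    """
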
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        # Matches "keep:<portable data hash>/<path>" references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already in Keep; no upload needed.
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s")

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        """Map a path inside the job back to its "keep:" reference."""
        if target.startswith("keep:"):
            return target
        elif self.keepdir and target.startswith(self.keepdir):
            return "keep:" + target[len(self.keepdir)+1:]
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
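    """CommandLineTool that runs as an ArvadosJob instead of a local process."""
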
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


class ArvCwlRunner(object):
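    """Top-level executor: submits jobs and tracks them via the event stream.

    Owns the pipeline instance record, the table of in-flight ArvadosJob
    objects, and the condition variable the main loop waits on while
    jobs run.
    """
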
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
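        """Handle a websocket event: track job state changes and wake the main loop."""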
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake the main loop so it notices the finished job.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
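        """Run a CWL tool or workflow to completion and return its final output.

        Subscribes to job events, makes sure the crunchrunner collection
        exists, creates a pipeline instance for bookkeeping, then drives
        the job iterator until every submitted job has finished.
        """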
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError:
            # The crunchrunner collection is missing from this cluster:
            # download the binary and CA certificates and save them as a
            # new collection with the expected contents.
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
                                                                       "components": {},
                                                                       "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                for runnable in jobiter:
                    if runnable:
                        with self.lock:
                            runnable.run(**kwargs)
                    else:
                        # The iterator produced nothing runnable; either wait
                        # for a pending job to finish or give up.
                        if self.jobs:
                            try:
                                self.cond.acquire()
                                self.cond.wait(1)
                            except RuntimeError:
                                pass
                            finally:
                                self.cond.release()
                        else:
                            logger.error("Workflow cannot make any more progress.")
                            break

                while self.jobs:
                    try:
                        self.cond.acquire()
                        self.cond.wait(1)
                    except RuntimeError:
                        pass
                    finally:
                        self.cond.release()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
            except:
                # Deliberately bare so that KeyboardInterrupt also marks the
                # pipeline Failed; only unexpected exceptions are logged with
                # a traceback.
                if sys.exc_info()[0] is not KeyboardInterrupt:
                    logger.exception("Caught unhandled exception, marking pipeline as failed")
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                events.close()

        return self.final_output


def main(args, stdout, stderr, api_client=None):
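    """Command line entry point: parse arguments and hand off to cwltool.main."""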
    # Leave outputs in Keep rather than copying them to the local output
    # directory.
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=False, dest="enable_reuse",
                        help="Enable reuse of existing jobs with identical inputs.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=False, dest="enable_reuse",
                        help="Always run new jobs (the default).")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)