#!/usr/bin/env python
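"""Run Common Workflow Language tool and workflow documents as Arvados jobs.

This module plugs Arvados-specific implementations (ArvadosCommandTool,
ArvadosJob, ArvPathMapper, CollectionFsAccess) into cwltool and tracks
overall progress in an Arvados pipeline instance.  It is the entry point
for the arvados-cwl-runner command line tool.
"""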

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.errors
import arvados.events
import arvados.util
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
import fnmatch
import logging
import os
import re
import sys
import threading

from cwltool.process import get_feature, shortname
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

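# crunchrunner reports the task's working directories on stderr.  These
# patterns recover $(task.tmpdir), $(task.outdir) and $(task.keep) from the
# job log so outputs can be mapped back to Keep (see ArvadosJob.done).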
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image):
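    """Return the Docker image ID for a CWL DockerRequirement, uploading
    the image to Arvados with arv-keepdocker if it is not already there."""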
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Pull the image locally, then upload it to Keep.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
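    """Implement the cwltool FsAccess interface on top of Arvados Keep
    collections, so "keep:" paths can be globbed, opened and tested for
    existence alongside ordinary local files."""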
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
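        """Split a "keep:<pdh>/..." path into a (CollectionReader, subpath)
        pair, caching readers by portable data hash.  Paths outside Keep
        are returned as (None, path)."""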
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a Keep path; fall back to globbing the local filesystem.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
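    """Submit and monitor a single CWL CommandLineTool invocation as an
    Arvados job running the crunchrunner script."""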
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

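        # Stage any generated files into a "virtual working directory"
        # collection so crunchrunner can place them in the task's output
        # directory at runtime.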
        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)

        # Propagate the CWL ResourceRequirement to crunch runtime constraints.
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

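        # Submit the work as a single-task "crunchrunner" job.
        # find_or_create enables job reuse when --enable-reuse is in effect.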
        try:
            response = self.arvrunner.api.jobs().create(body={
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
                uuid=self.arvrunner.pipeline["uuid"],
                body={"components": self.arvrunner.pipeline["components"]}
            ).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
            uuid=self.arvrunner.pipeline["uuid"],
            body={"components": self.arvrunner.pipeline["components"]}
        ).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Failing to update the pipeline component view is not fatal;
            # output collection below must still run.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
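                    # Scan the job log for the directory paths that
                    # crunchrunner reported, so the builder can translate
                    # paths in the task's output back to Keep locations.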
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log.readlines():
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)
                        if tmpdir and outdir and keepdir:
                            break

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
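    """Map CWL input file paths to "$(task.keep)/..." paths inside the
    crunch task, uploading local files to Keep as needed and reusing
    uploads already recorded by the runner."""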
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s")

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
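        """Map a path inside the task back to its "keep:" form, using the
        keep mount point recovered from the job log when available."""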
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
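    """CommandLineTool subclass that runs on Arvados instead of locally,
    by plugging in ArvadosJob and ArvPathMapper."""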
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


class ArvCwlRunner(object):
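    """Execute a CWL workflow on Arvados: create a pipeline instance to
    track progress, submit one job per tool invocation, and watch the
    websocket event stream until all jobs finish."""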
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(
                uuid=self.pipeline["uuid"],
                body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(
                uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
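        """Handle a websocket event for one of our jobs: record the
        transition to Running, and on completion hand the final job record
        to ArvadosJob.done() and wake the main loop."""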
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
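        """Top-level executor: subscribe to job events, make sure the
        crunchrunner collection exists, then iterate over the tool's jobs
        until the workflow produces a final output."""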
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

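        # Make sure the crunchrunner binary is available in Keep.  If the
        # expected collection is missing, fetch the binary and CA bundle
        # over HTTP and save them as a new collection.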
        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError:
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={"name": shortname(tool.tool["id"]),
                      "components": {},
                      "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                # Submit runnable jobs as the iterator yields them; when
                # nothing is runnable yet, wait for job state changes.
                for runnable in jobiter:
                    if runnable:
                        with self.lock:
                            runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            try:
                                self.cond.acquire()
                                self.cond.wait(1)
                            except RuntimeError:
                                pass
                            finally:
                                self.cond.release()
                        else:
                            logger.error("Workflow cannot make any more progress.")
                            break

                while self.jobs:
                    try:
                        self.cond.acquire()
                        self.cond.wait(1)
                    except RuntimeError:
                        pass
                    finally:
                        self.cond.release()

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
            except:
                # Any failure (including KeyboardInterrupt) marks the
                # pipeline instance Failed; only unexpected exceptions are
                # logged.
                if sys.exc_info()[0] is not KeyboardInterrupt:
                    logger.exception("Caught unhandled exception, marking pipeline as failed")
                self.api.pipeline_instances().update(
                    uuid=self.pipeline["uuid"],
                    body={"state": "Failed"}).execute(num_retries=self.num_retries)

            return self.final_output


def main(args, stdout, stderr, api_client=None):
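    """Command line entry point: extend the cwltool argument parser with
    --enable-reuse/--disable-reuse and hand execution over to cwltool.main
    with the Arvados executor and tool factory plugged in."""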
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=False, dest="enable_reuse",
                         help="Enable reuse of existing jobs with identical inputs.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=False, dest="enable_reuse",
                         help="Always run new jobs, even when an identical job exists.")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)