[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python
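# Arvados cwl-runner: runs Common Workflow Language (CWL) tools on an
# Arvados cluster by submitting each step as a Crunch job running the
# "crunchrunner" script, and tracking progress through a pipeline instance.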

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.errors
import arvados.events
import arvados.util
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
import fnmatch
import logging
import os
import re
import sys
import threading

from cwltool.process import shortname, get_feature
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Keep portable data hash of the collection containing the crunchrunner
# binary, plus fallback download URLs used to recreate that collection on
# clusters where it does not exist yet (see ArvCwlRunner.arvExecutor).
crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

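# crunchrunner prints the resolved $(task.tmpdir), $(task.outdir) and
# $(task.keep) paths to stderr; these patterns scrape them back out of the
# job log so outputs can be mapped to Keep locations in done() below.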
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


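# Check whether the Docker image named by dockerRequirement is already
# cached in Keep; if not, pull it locally and upload it with
# arv-keepdocker. Returns dockerRequirement["dockerImageId"], which is
# recorded in the job's runtime constraints.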
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


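# cwltool filesystem access shim: paths beginning with "keep:" are
# resolved through Arvados collection readers (cached per portable data
# hash); anything else falls through to the local filesystem.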
class CollectionFsAccess(cwltool.process.StdFsAccess):
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo"; shift past the "./"
            # once, rather than once per directory entry.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: reference; use the standard filesystem glob.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))

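# One CWL tool invocation, executed as an Arvados Crunch job running the
# "crunchrunner" script. cwltool's job machinery (via ArvadosCommandTool)
# fills in attributes such as command_line, environment, generatefiles and
# builder before calling run().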
class ArvadosJob(object):
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection containing
            # the files the tool expects to find staged alongside it.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error submitting job: %s", e)
            self.output_callback({}, "permanentFail")

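    # Record the latest job state on the corresponding pipeline instance
    # component so the workflow's progress is visible in Workbench.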
    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

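    # Completion callback: determine success or failure, scrape the job
    # log for the $(task.*) paths crunchrunner reported, then collect the
    # tool's outputs from the job's output collection.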
    def done(self, record):
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Failing to update the pipeline display is not fatal.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log.readlines():
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)
                        if tmpdir and outdir and keepdir:
                            break

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


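# Maps CWL input file paths to their locations inside the job's Keep
# mount. Local files that are not already in Keep are uploaded (via the
# arv-put machinery in arvados.commands.run) and recorded as
# "$(task.keep)/..." paths.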
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s",
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


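# CommandLineTool subclass that wires cwltool's execution machinery up to
# ArvadosJob and ArvPathMapper.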
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


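# Top-level executor: creates a pipeline instance for the workflow,
# submits jobs as they become runnable, and listens on the Arvados
# websocket event stream for job state changes.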
class ArvCwlRunner(object):
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

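    # Websocket event callback: update the pipeline component when a job
    # starts running, and fire completion handling (holding the shared
    # condition so arvExecutor wakes up) when it finishes.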
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

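    # Main entry point, called by cwltool.main.main(): ensure the
    # crunchrunner collection exists, then iterate over runnable jobs,
    # submitting each and waiting (on self.cond) until all jobs finish.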
    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError:
            # The crunchrunner collection is missing on this cluster;
            # download the binary and CA bundle and save them as a new
            # collection.
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

            logger.info("Pipeline instance %s", self.pipeline["uuid"])

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # TODO: create the final output collection here.
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.exception("Caught unhandled exception, marking pipeline as failed")
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output


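# Command line entry point. Wraps cwltool.main.main() with an
# Arvados-flavored argument parser and the ArvCwlRunner executor.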
def main(args, stdout, stderr, api_client=None):
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse.")
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client=api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)