#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

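    # Dispatch on the CWL "class" field: CommandLineTool and Workflow get
    # Arvados-aware implementations; anything else (e.g. ExpressionTool)
    # falls through to cwltool's default tool factory.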
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

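    # State-change handler for tracked jobs/containers, fed by poll_states()
    # below with dicts shaped like websocket "update" log events: it marks
    # processes as running, and on completion finalizes them and wakes the
    # main loop waiting on self.cond.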
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

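        # Wake every 15 seconds (or immediately once stop_polling is set),
        # batch-query the API for the current state of all tracked processes,
        # and replay each result through on_message() in the same shape as a
        # websocket "update" event.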
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

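        # Map every captured file/directory location into a single flat
        # target namespace (separateDirs=False), so all outputs can be copied
        # into one output collection below.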
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            sp = k.split("/")
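            # Source locations look like "keep:<collection>/<path>"; strip
            # the "keep:" prefix to get the collection identifier.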
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

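        # Output and temp directories are paths inside the execution
        # environment: fixed mount points for the containers API, Crunch task
        # substitution variables (expanded at runtime) for the jobs API.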
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

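        # When submitting, the workflow itself runs on Arvados: a single
        # CommandLineTool can run directly as a container; anything else is
        # wrapped in a runner job/container that executes the workflow
        # remotely.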
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory; defaults to the current directory.")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error; default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

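# Register the Arvados CWL extension schema (arv-cwl-schema.yml) with cwltool,
# so Arvados-specific hints validate instead of being rejected as unknown
# extension names.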
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
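    # When creating or updating a template/workflow with no input object on
    # the command line, pass an empty job order so cwltool does not try to
    # load one.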
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

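    # cwltool.main expects these attributes on the parsed args even though
    # this wrapper does not expose them as command-line options.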
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))