#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

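    # Handle a state-change event for a submitted job or container.  Events
    # may also come from the polling thread (see poll_states below), which
    # synthesizes dicts in the same shape as websocket "update" events.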
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

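    # Recursively scan a CWL object and reject writable InitialWorkDir
    # entries, which are not supported when running on Arvados.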
    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

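    # Build the final output collection: copy every file and directory
    # referenced by the CWL output object out of its source collection into a
    # new collection, rewrite the output object to point at the new locations,
    # and save it together with cwl.output.json.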
    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
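        # Mapper keys are locations of the form "keep:<collection>/<path>";
        # slicing off the five-character "keep:" prefix yields the source
        # collection, and each source collection is opened only once.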
        for k, v in generatemapper.items():
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

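    # Main entry point: cwltool.main.main() calls this as the executor (see
    # main() below).  Handles the --create-template and create/update-workflow
    # shortcuts, then either submits a runner job/container or runs the
    # workflow's job loop locally.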
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

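        # Work directory defaults differ by API: the containers API uses fixed
        # paths mounted inside the container, while the jobs API uses
        # $(task.outdir)/$(task.tmpdir) placeholders that crunch substitutes
        # at runtime.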
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow engine on the local host (individual jobs are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting the workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit the workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

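# Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register its
# names with cwltool's CWL v1.0 schema so that Arvados-specific hints validate.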
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
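    # When creating or updating a template/workflow without an input file,
    # substitute an empty job order so cwltool does not try to load one.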
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

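    # Hand off to cwltool for the rest (loading the workflow, running the
    # executor, printing results), plugging in the Arvados executor, tool
    # factory, and Keep-backed filesystem access.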
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))