10172: Detect when running as crunch container or job and set output.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

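# Typical invocations, for reference (illustrative only; the flags are defined
# in arg_parser() below, and the project UUID shown here is hypothetical):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --no-wait --project-uuid zzzzz-j7d0g-0123456789abcde \
#       workflow.cwl job.yml
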
import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif toolpath_object.get("class") == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

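    # Events handled by on_message() above are synthesized by poll_states()
    # below and have this shape (a sketch; the UUID is hypothetical, and
    # "new_attributes" carries the updated job/container record):
    #
    #   {"object_uuid": "zzzzz-8i9sb-0123456789abcde",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}
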
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

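    # make_output_collection() receives a cwltool output object whose File and
    # Directory entries carry Keep locations (a sketch; the hash and filename
    # shown are hypothetical):
    #
    #   {"out": {"class": "File",
    #            "location": "keep:99999999999999999999999999999999+99/foo.txt",
    #            "basename": "foo.txt",
    #            "size": 3}}
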
    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            # Source locations look like "keep:<pdh>/<path>"; strip the
            # "keep:" prefix (5 characters) to get the source collection.
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point each output object at its path inside the new collection,
            # dropping fields that can be recomputed from the collection.
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

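    # When this runner is itself executing inside a Crunch container or job
    # (rather than on a workstation), record the final output collection on
    # that container or job task record so the run's output is set accordingly.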
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (still submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
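
# Minimal programmatic invocation, for reference (a sketch; the installed
# arvados-cwl-runner console script normally calls main() with sys.argv):
#
#   import sys
#   import arvados_cwl
#   sys.exit(arvados_cwl.main(sys.argv[1:], sys.stdout, sys.stderr))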