10165: Always make an output collection when a workflow completes.
sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import InitialWorkDirPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
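        """Handle a state-change event for a job or container in self.processes."""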
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
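            # Check the stop flag at least every 15 seconds so this thread
            # exits promptly when stop_polling is set.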
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

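            # Deliver the poll results to on_message() in the same shape
            # that it expects for state-change events.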
            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
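        """Recursively raise UnsupportedRequirement for 'writable: true' entries."""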
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, outputObj):
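        """Copy the workflow outputs into a single new collection and save it.

        Also writes a cwl.output.json file into the new collection describing
        the output object, with file locations rewritten to their new paths.
        """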
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = InitialWorkDirPathMapper(files, "", "",
                                                  separateDirs=False)

        final = arvados.collection.Collection()

        srccollections = {}
        for k, v in generatemapper.items():
            sp = k.split("/")
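            # Mapped source keys look like "keep:<locator>/<path>"; strip the
            # "keep:" prefix to get the source collection locator.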
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, indent=4)

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())

        self.final_output_collection = final

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

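        # In submit mode, the cwl-runner itself runs on the cluster: a single
        # CommandLineTool is submitted directly on the containers API, while
        # anything else is wrapped in a RunnerContainer or RunnerJob.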
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
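                # Setting the priority of the runner's container request to
                # zero cancels it.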
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if kwargs.get("submit"):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
                                        self.final_output)

        return self.final_output


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
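    """Register the Arvados CWL extension schema with cwltool's schema names."""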
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))

def main(args, stdout, stderr, api_client=None):
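    """Parse command line arguments and hand off execution to cwltool.main."""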
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))