Merge branch '9307-cwl-use-tmp-output' closes #9307 closes #9308

[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs API
    or containers API), wait for it to complete, and report output.

    """

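    # Typical programmatic use, as in main() below (a sketch; assumes an
    # Arvados API client and a cwltool-parsed tool object):
    #
    #   runner = ArvCwlRunner(arvados.api('v1'), work_api="containers")
    #   out = runner.arv_executor(tool, job_order, basedir=".", submit=False)
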
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

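    # Final-output callback passed to cwltool: records overall success or
    # failure, and updates the pipeline instance state when running via the
    # jobs API.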
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

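    # Handle a job/container state-change event.  poll_states() below
    # synthesizes these events from plain API list results, using the same
    # shape as Arvados websocket log events.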
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

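    # Gather the final output files, which may be scattered across many source
    # collections, into one new output collection that also carries a
    # cwl.output.json manifest describing its contents.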
    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
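        # Mapper keys look like "keep:<portable data hash>/path/to/file";
        # slicing off the first five characters ("keep:") leaves the source
        # collection identifier.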
        for k, v in generatemapper.items():
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

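        # Work directories are fixed paths under the containers API; under the
        # jobs API they use Crunch's $(task.outdir)/$(task.tmpdir) substitution
        # variables, which the job dispatcher expands at runtime.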
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

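        # When submitting, wrap the workflow in a runner process (a container
        # request or a crunch job that re-runs this program on the cluster); a
        # bare CommandLineTool on the containers API can be submitted directly
        # as a container request.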
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

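        # The polling thread gets its own API client object; sharing one
        # client between threads is not safe.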
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, work goes to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (still submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

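# Example command lines (a sketch; assumes the console script is installed as
# arvados-cwl-runner and that ARVADOS_API_HOST/ARVADOS_API_TOKEN are set in
# the environment; the project UUID below is a placeholder):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --no-wait --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml
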
def add_arv_hints():
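    # Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register
    # its names alongside the core CWL v1.0 schema, so that documents using
    # hints from the http://arvados.org/cwl# namespace validate cleanly.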
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

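    # cwltool.main reads these attributes from the parsed-args object even
    # though this wrapper does not expose them as command line flags.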
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))