11100: Separate "trash intermediate on success" behavior from "output intermediate...
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either the jobs or
56     containers API), wait for it to complete, and report output.
57
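    A minimal usage sketch (the real wiring, including Keep client setup and
    logging configuration, lives in main() at the bottom of this file):

        api = arvados.api('v1')
        runner = ArvCwlRunner(api, work_api="containers", output_name="My output")
        # cwltool.main() then drives runner.arv_executor and runner.arv_make_tool.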
58     """
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77         self.intermediate_output_ttl = 0
78         self.intermediate_output_collections = []
79         self.trash_intermediate = False
80
81         if keep_client is not None:
82             self.keep_client = keep_client
83         else:
84             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
85
86         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
87
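        # Choose the work submission API ("jobs" or "containers") by probing the
        # API server's discovery document for the corresponding resource, honoring
        # an explicit work_api preference when one was given.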
88         self.work_api = None
89         expected_api = ["jobs", "containers"]
90         for api in expected_api:
91             try:
92                 methods = self.api._rootDesc.get('resources')[api]['methods']
93                 if ('httpMethod' in methods['create'] and
94                     (work_api == api or work_api is None)):
95                     self.work_api = api
96                     break
97             except KeyError:
98                 pass
99
100         if not self.work_api:
101             if work_api is None:
102                 raise Exception("No supported APIs")
103             else:
104                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
105
106     def arv_make_tool(self, toolpath_object, **kwargs):
107         kwargs["work_api"] = self.work_api
108         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
109                                                 api_client=self.api,
110                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
111                                                 num_retries=self.num_retries)
112         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
113             return ArvadosCommandTool(self, toolpath_object, **kwargs)
114         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
115             return ArvadosWorkflow(self, toolpath_object, **kwargs)
116         else:
117             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
118
119     def output_callback(self, out, processStatus):
120         if processStatus == "success":
121             logger.info("Overall process status is %s", processStatus)
122             if self.pipeline:
123                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
124                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
125         else:
126             logger.warn("Overall process status is %s", processStatus)
127             if self.pipeline:
128                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
129                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
130         self.final_status = processStatus
131         self.final_output = out
132
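    # Handle a state-change notification for a tracked job or container request.
    # Called by the poll_states() thread below with the latest API record wrapped
    # in an event-style dict.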
133     def on_message(self, event):
134         if "object_uuid" in event:
135             if event["object_uuid"] in self.processes and event["event_type"] == "update":
136                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
137                     uuid = event["object_uuid"]
138                     with self.lock:
139                         j = self.processes[uuid]
140                         logger.info("%s %s is Running", self.label(j), uuid)
141                         j.running = True
142                         j.update_pipeline_component(event["properties"]["new_attributes"])
143                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
144                     uuid = event["object_uuid"]
145                     try:
146                         self.cond.acquire()
147                         j = self.processes[uuid]
148                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
149                         with Perf(metrics, "done %s" % j.name):
150                             j.done(event["properties"]["new_attributes"])
151                         self.cond.notify()
152                     finally:
153                         self.cond.release()
154
155     def label(self, obj):
156         return "[%s %s]" % (self.work_api[0:-1], obj.name)
157
158     def poll_states(self):
159         """Poll status of jobs or containers listed in the processes dict.
160
161         Runs in a separate thread.
162         """
163
164         try:
165             while True:
166                 self.stop_polling.wait(15)
167                 if self.stop_polling.is_set():
168                     break
169                 with self.lock:
170                     keys = self.processes.keys()
171                 if not keys:
172                     continue
173
174                 if self.work_api == "containers":
175                     table = self.poll_api.container_requests()
176                 elif self.work_api == "jobs":
177                     table = self.poll_api.jobs()
178
179                 try:
180                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
181                 except Exception as e:
182                     logger.warn("Error checking states on API server: %s", e)
183                     continue
184
185                 for p in proc_states["items"]:
186                     self.on_message({
187                         "object_uuid": p["uuid"],
188                         "event_type": "update",
189                         "properties": {
190                             "new_attributes": p
191                         }
192                     })
193         except:
194             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
195             self.cond.acquire()
196             self.processes.clear()
197             self.cond.notify()
198             self.cond.release()
199         finally:
200             self.stop_polling.set()
201
202     def get_uploaded(self):
203         return self.uploaded.copy()
204
205     def add_uploaded(self, src, pair):
206         self.uploaded[src] = pair
207
208     def add_intermediate_output(self, uuid):
209         if uuid:
210             self.intermediate_output_collections.append(uuid)
211
212     def trash_intermediate_output(self):
213         logger.info("Cleaning up intermediate output collections")
214         for i in self.intermediate_output_collections:
215             try:
216                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
217             except:
218                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
219             if sys.exc_info()[0] is KeyboardInterrupt:
220                 break
221
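    # Recursively walk a tool or requirement structure and raise
    # UnsupportedRequirement for CWL features this runner cannot honor, such as
    # writable InitialWorkDir entries or dockerOutputDirectory.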
222     def check_features(self, obj):
223         if isinstance(obj, dict):
224             if obj.get("writable"):
225                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
226             if obj.get("class") == "DockerRequirement":
227                 if obj.get("dockerOutputDirectory"):
228                     # TODO: can be supported by containers API, but not jobs API.
229                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
230                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
231             for v in obj.itervalues():
232                 self.check_features(v)
233         elif isinstance(obj, list):
234             for i,v in enumerate(obj):
235                 with SourceLine(obj, i, UnsupportedRequirement):
236                     self.check_features(v)
237
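    # Assemble the workflow's final outputs into a single new collection: copy each
    # keep: reference (or write each file literal) into it, rewrite output locations
    # to point at the new collection, save it, and attach any requested tag links.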
238     def make_output_collection(self, name, tagsString, outputObj):
239         outputObj = copy.deepcopy(outputObj)
240
241         files = []
242         def capture(fileobj):
243             files.append(fileobj)
244
245         adjustDirObjs(outputObj, capture)
246         adjustFileObjs(outputObj, capture)
247
248         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
249
250         final = arvados.collection.Collection(api_client=self.api,
251                                               keep_client=self.keep_client,
252                                               num_retries=self.num_retries)
253
254         for k,v in generatemapper.items():
255             if k.startswith("_:"):
256                 if v.type == "Directory":
257                     continue
258                 if v.type == "CreateFile":
259                     with final.open(v.target, "wb") as f:
260                         f.write(v.resolved.encode("utf-8"))
261                     continue
262
263             if not k.startswith("keep:"):
264                 raise Exception("Output source is not in keep or a literal")
265             sp = k.split("/")
266             srccollection = sp[0][5:]
267             try:
268                 reader = self.collection_cache.get(srccollection)
269                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
270                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
271             except arvados.errors.ArgumentError as e:
272                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
273                 raise
274             except IOError as e:
275                 logger.warn("While preparing output collection: %s", e)
276
277         def rewrite(fileobj):
278             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
279             for k in ("basename", "listing", "contents"):
280                 if k in fileobj:
281                     del fileobj[k]
282
283         adjustDirObjs(outputObj, rewrite)
284         adjustFileObjs(outputObj, rewrite)
285
286         with final.open("cwl.output.json", "w") as f:
287             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
288
289         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
290
291         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
292                     final.api_response()["name"],
293                     final.manifest_locator())
294
295         final_uuid = final.manifest_locator()
296         tags = tagsString.split(',')
297         for tag in tags:
298             self.api.links().create(body={
299                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
300             }).execute(num_retries=self.num_retries)
301
302         def finalcollection(fileobj):
303             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
304
305         adjustDirObjs(outputObj, finalcollection)
306         adjustFileObjs(outputObj, finalcollection)
307
308         return (outputObj, final)
309
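    # When this runner is itself executing as an Arvados container (or as a crunch
    # job task), record the final output collection on that container or task so
    # the platform reports it as the output of the overall run.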
310     def set_crunch_output(self):
311         if self.work_api == "containers":
312             try:
313                 current = self.api.containers().current().execute(num_retries=self.num_retries)
314             except ApiError as e:
315                 # Status code 404 just means we're not running in a container.
316                 if e.resp.status != 404:
317                     logger.info("Getting current container: %s", e)
318                 return
319             try:
320                 self.api.containers().update(uuid=current['uuid'],
321                                              body={
322                                                  'output': self.final_output_collection.portable_data_hash(),
323                                              }).execute(num_retries=self.num_retries)
324                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
325                                               body={
326                                                   'is_trashed': True
327                                               }).execute(num_retries=self.num_retries)
328             except Exception as e:
329                 logger.info("Setting container output: %s", e)
330         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
331             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
332                                    body={
333                                        'output': self.final_output_collection.portable_data_hash(),
334                                        'success': self.final_status == "success",
335                                        'progress':1.0
336                                    }).execute(num_retries=self.num_retries)
337
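    # Main entry point invoked by cwltool.main(): validate requirements, upload
    # dependencies, optionally register a workflow or submit a runner job, then
    # drive the job iterator until all submitted work has completed.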
338     def arv_executor(self, tool, job_order, **kwargs):
339         self.debug = kwargs.get("debug")
340
341         tool.visit(self.check_features)
342
343         self.project_uuid = kwargs.get("project_uuid")
344         self.pipeline = None
345         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
346                                                                  collection_cache=self.collection_cache)
347         self.fs_access = make_fs_access(kwargs["basedir"])
348
349         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
350         self.trash_intermediate = kwargs["trash_intermediate"]
351         if self.intermediate_output_ttl and self.work_api != "containers":
352             raise Exception("--intermediate-output-ttl is only supported when using the containers API.")
353
354         if not kwargs.get("name"):
355             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
356
357         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
358         # Also uploads docker images.
359         upload_workflow_deps(self, tool)
360
361         # Reload tool object which may have been updated by
362         # upload_workflow_deps
363         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
364                                   makeTool=self.arv_make_tool,
365                                   loader=tool.doc_loader,
366                                   avsc_names=tool.doc_schema,
367                                   metadata=tool.metadata)
368
369         # Upload local file references in the job order.
370         job_order = upload_job_order(self, "%s input" % kwargs["name"],
371                                      tool, job_order)
372
373         existing_uuid = kwargs.get("update_workflow")
374         if existing_uuid or kwargs.get("create_workflow"):
375             # Create a pipeline template or workflow record and exit.
376             if self.work_api == "jobs":
377                 tmpl = RunnerTemplate(self, tool, job_order,
378                                       kwargs.get("enable_reuse"),
379                                       uuid=existing_uuid,
380                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
381                                       name=kwargs["name"])
382                 tmpl.save()
383                 # cwltool.main will write our return value to stdout.
384                 return (tmpl.uuid, "success")
385             elif self.work_api == "containers":
386                 return (upload_workflow(self, tool, job_order,
387                                         self.project_uuid,
388                                         uuid=existing_uuid,
389                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
390                                         name=kwargs["name"]),
391                         "success")
392
393         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
394
395         kwargs["make_fs_access"] = make_fs_access
396         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
397         kwargs["use_container"] = True
398         kwargs["tmpdir_prefix"] = "tmp"
399         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
400
401         if self.work_api == "containers":
402             kwargs["outdir"] = "/var/spool/cwl"
403             kwargs["docker_outdir"] = "/var/spool/cwl"
404             kwargs["tmpdir"] = "/tmp"
405             kwargs["docker_tmpdir"] = "/tmp"
406         elif self.work_api == "jobs":
407             kwargs["outdir"] = "$(task.outdir)"
408             kwargs["docker_outdir"] = "$(task.outdir)"
409             kwargs["tmpdir"] = "$(task.tmpdir)"
410
411         runnerjob = None
412         if kwargs.get("submit"):
413             # Submit a runner job to run the workflow for us.
414             if self.work_api == "containers":
415                 if tool.tool["class"] == "CommandLineTool":
416                     kwargs["runnerjob"] = tool.tool["id"]
417                     upload_dependencies(self,
418                                         kwargs["name"],
419                                         tool.doc_loader,
420                                         tool.tool,
421                                         tool.tool["id"],
422                                         False)
423                     runnerjob = tool.job(job_order,
424                                          self.output_callback,
425                                          **kwargs).next()
426                 else:
427                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
428                                                 self.output_name,
429                                                 self.output_tags,
430                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
431                                                 name=kwargs.get("name"),
432                                                 on_error=kwargs.get("on_error"),
433                                                 submit_runner_image=kwargs.get("submit_runner_image"),
434                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
435             elif self.work_api == "jobs":
436                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
437                                       self.output_name,
438                                       self.output_tags,
439                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
440                                       name=kwargs.get("name"),
441                                       on_error=kwargs.get("on_error"),
442                                       submit_runner_image=kwargs.get("submit_runner_image"))
443
444         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
445             # Create pipeline for local run
446             self.pipeline = self.api.pipeline_instances().create(
447                 body={
448                     "owner_uuid": self.project_uuid,
449                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
450                     "components": {},
451                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
452             logger.info("Pipeline instance %s", self.pipeline["uuid"])
453
454         if runnerjob and not kwargs.get("wait"):
455             runnerjob.run(wait=kwargs.get("wait"))
456             return (runnerjob.uuid, "success")
457
458         self.poll_api = arvados.api('v1')
459         self.polling_thread = threading.Thread(target=self.poll_states)
460         self.polling_thread.start()
461
462         if runnerjob:
463             jobiter = iter((runnerjob,))
464         else:
465             if "cwl_runner_job" in kwargs:
466                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
467             jobiter = tool.job(job_order,
468                                self.output_callback,
469                                **kwargs)
470
471         try:
472             self.cond.acquire()
473             # Will continue to hold the lock for the duration of this code
474             # except when in cond.wait(), at which point on_message can update
475             # job state and process output callbacks.
476
477             loopperf = Perf(metrics, "jobiter")
478             loopperf.__enter__()
479             for runnable in jobiter:
480                 loopperf.__exit__()
481
482                 if self.stop_polling.is_set():
483                     break
484
485                 if runnable:
486                     with Perf(metrics, "run"):
487                         runnable.run(**kwargs)
488                 else:
489                     if self.processes:
490                         self.cond.wait(1)
491                     else:
492                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
493                         break
494                 loopperf.__enter__()
495             loopperf.__exit__()
496
497             while self.processes:
498                 self.cond.wait(1)
499
500         except UnsupportedRequirement:
501             raise
502         except:
503             if sys.exc_info()[0] is KeyboardInterrupt:
504                 logger.error("Interrupted, marking pipeline as failed")
505             else:
506                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
507             if self.pipeline:
508                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
509                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
510             if runnerjob and runnerjob.uuid and self.work_api == "containers":
511                 self.api.container_requests().update(uuid=runnerjob.uuid,
512                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
513         finally:
514             self.cond.release()
515             self.stop_polling.set()
516             self.polling_thread.join()
517
518         if self.final_status == "UnsupportedRequirement":
519             raise UnsupportedRequirement("Check log for details.")
520
521         if self.final_output is None:
522             raise WorkflowException("Workflow did not return a result.")
523
524         if kwargs.get("submit") and isinstance(runnerjob, Runner):
525             logger.info("Final output collection %s", runnerjob.final_output)
526         else:
527             if self.output_name is None:
528                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
529             if self.output_tags is None:
530                 self.output_tags = ""
531             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
532             self.set_crunch_output()
533
534         if kwargs.get("compute_checksum"):
535             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
536             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
537
538         if self.trash_intermediate and self.final_status == "success":
539             self.trash_intermediate_output()
540
541         return (self.final_output, self.final_status)
542
543
544 def versionstring():
545     """Return a version string of key packages for provenance and debugging."""
546
547     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
548     arvpkg = pkg_resources.require("arvados-python-client")
549     cwlpkg = pkg_resources.require("cwltool")
550
551     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
552                                     "arvados-python-client", arvpkg[0].version,
553                                     "cwltool", cwlpkg[0].version)
554
555
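# Typical invocations (illustrative; assumes the installed console script is named
# "arvados-cwl-runner"):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml
#   arvados-cwl-runner --create-workflow my-workflow.cwl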
556 def arg_parser():  # type: () -> argparse.ArgumentParser
557     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
558
559     parser.add_argument("--basedir", type=str,
560                         help="Base directory used to resolve relative references in the input, defaults to the directory of the input object file or the current directory (if inputs are piped/provided on the command line).")
561     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
562                         help="Output directory, default current directory")
563
564     parser.add_argument("--eval-timeout",
565                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
566                         type=float,
567                         default=20)
568
569     exgroup = parser.add_mutually_exclusive_group()
570     exgroup.add_argument("--print-dot", action="store_true",
571                          help="Print workflow visualization in graphviz format and exit")
572     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
573     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
574
575     exgroup = parser.add_mutually_exclusive_group()
576     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
577     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
578     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
579
580     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
581
582     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
583
584     exgroup = parser.add_mutually_exclusive_group()
585     exgroup.add_argument("--enable-reuse", action="store_true",
586                         default=True, dest="enable_reuse",
587                         help="Enable job or container reuse (default)")
588     exgroup.add_argument("--disable-reuse", action="store_false",
589                         default=True, dest="enable_reuse",
590                         help="Disable job or container reuse")
591
592     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, work goes to the user's home project.")
593     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
594     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
595     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
596                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
597                         default=False)
598
599     exgroup = parser.add_mutually_exclusive_group()
600     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
601                         default=True, dest="submit")
602     exgroup.add_argument("--local", action="store_false", help="Run the workflow orchestration on the local host (jobs or containers are still submitted to Arvados).",
603                         default=True, dest="submit")
604     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
605                          dest="create_workflow")
606     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
607     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
608
609     exgroup = parser.add_mutually_exclusive_group()
610     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
611                         default=True, dest="wait")
612     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
613                         default=True, dest="wait")
614
615     exgroup = parser.add_mutually_exclusive_group()
616     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
617                         default=True, dest="log_timestamps")
618     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
619                         default=True, dest="log_timestamps")
620
621     parser.add_argument("--api", type=str,
622                         default=None, dest="work_api",
623                         choices=("jobs", "containers"),
624                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
625
626     parser.add_argument("--compute-checksum", action="store_true", default=False,
627                         help="Compute checksum of contents while collecting outputs",
628                         dest="compute_checksum")
629
630     parser.add_argument("--submit-runner-ram", type=int,
631                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
632                         default=1024)
633
634     parser.add_argument("--submit-runner-image", type=str,
635                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
636                         default=None)
637
638     parser.add_argument("--name", type=str,
639                         help="Name to use for workflow execution instance.",
640                         default=None)
641
642     parser.add_argument("--on-error", type=str,
643                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
644                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
645
646     parser.add_argument("--enable-dev", action="store_true",
647                         help="Enable loading and running development versions "
648                              "of the CWL spec.", default=False)
649
650     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
651                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
652                         default=0)
653
654     exgroup = parser.add_mutually_exclusive_group()
655     exgroup.add_argument("--trash-intermediate", action="store_true",
656                         default=False, dest="trash_intermediate",
657                          help="Immediately trash intermediate outputs on workflow success.")
658     exgroup.add_argument("--no-trash-intermediate", action="store_false",
659                         default=False, dest="trash_intermediate",
660                         help="Do not trash intermediate outputs (default).")
661
662     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
663     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
664
665     return parser
666
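# Register the Arvados CWL extension schema (arv-cwl-schema.yml) with cwltool,
# relax its Docker image name check so any image reference is accepted, and
# declare the Arvados extension requirement URIs as supported.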
667 def add_arv_hints():
668     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
669     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
670     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
671     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
672     res.close()
673     cwltool.process.supportedProcessRequirements.extend([
674         "http://arvados.org/cwl#RunInSingleContainer",
675         "http://arvados.org/cwl#OutputDirType",
676         "http://arvados.org/cwl#RuntimeConstraints",
677         "http://arvados.org/cwl#PartitionRequirement",
678         "http://arvados.org/cwl#APIRequirement",
679         "http://commonwl.org/cwltool#LoadListingRequirement",
680         "http://arvados.org/cwl#IntermediateOutput"
681     ])
682
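# Command line entry point: parse arguments, build an ArvCwlRunner wired to the
# Arvados API and Keep clients, configure logging, then delegate to cwltool.main()
# with Arvados-specific executor, tool factory, fetcher, and resolver hooks.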
683 def main(args, stdout, stderr, api_client=None, keep_client=None):
684     parser = arg_parser()
685
686     job_order_object = None
687     arvargs = parser.parse_args(args)
688
689     if arvargs.version:
690         print versionstring()
691         return
692
693     if arvargs.update_workflow:
694         if arvargs.update_workflow.find('-7fd4e-') == 5:
695             want_api = 'containers'
696         elif arvargs.update_workflow.find('-p5p6p-') == 5:
697             want_api = 'jobs'
698         else:
699             want_api = None
700         if want_api and arvargs.work_api and want_api != arvargs.work_api:
701             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
702                 arvargs.update_workflow, want_api, arvargs.work_api))
703             return 1
704         arvargs.work_api = want_api
705
706     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
707         job_order_object = ({}, "")
708
709     add_arv_hints()
710
711     try:
712         if api_client is None:
713             api_client=arvados.api('v1', model=OrderedJsonModel())
714         if keep_client is None:
715             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
716         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
717                               num_retries=4, output_name=arvargs.output_name,
718                               output_tags=arvargs.output_tags)
719     except Exception as e:
720         logger.error(e)
721         return 1
722
723     if arvargs.debug:
724         logger.setLevel(logging.DEBUG)
725         logging.getLogger('arvados').setLevel(logging.DEBUG)
726
727     if arvargs.quiet:
728         logger.setLevel(logging.WARN)
729         logging.getLogger('arvados').setLevel(logging.WARN)
730         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
731
732     if arvargs.metrics:
733         metrics.setLevel(logging.DEBUG)
734         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
735
736     if arvargs.log_timestamps:
737         arvados.log_handler.setFormatter(logging.Formatter(
738             '%(asctime)s %(name)s %(levelname)s: %(message)s',
739             '%Y-%m-%d %H:%M:%S'))
740     else:
741         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
742
743     arvargs.conformance_test = None
744     arvargs.use_container = True
745     arvargs.relax_path_checks = True
746     arvargs.validate = None
747
748     make_fs_access = partial(CollectionFsAccess,
749                            collection_cache=runner.collection_cache)
750
751     return cwltool.main.main(args=arvargs,
752                              stdout=stdout,
753                              stderr=stderr,
754                              executor=runner.arv_executor,
755                              makeTool=runner.arv_make_tool,
756                              versionfunc=versionstring,
757                              job_order_object=job_order_object,
758                              make_fs_access=make_fs_access,
759                              fetcher_constructor=partial(CollectionFetcher,
760                                                          api_client=api_client,
761                                                          fs_access=make_fs_access(""),
762                                                          num_retries=runner.num_retries),
763                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
764                              logger_handler=arvados.log_handler,
765                              custom_schema_callback=add_arv_hints)