Merge branch '6347-log-timestamps'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

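    Example (an illustrative sketch only, assuming a working Arvados
    configuration; normally cwltool.main.main() drives the run by
    calling arv_executor(), as set up in main() below):

        api = arvados.api('v1')
        runner = ArvCwlRunner(api, work_api="containers")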
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
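        # Probe the API server's discovery document for a supported work
        # submission API, honoring an explicit work_api request if given.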
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

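    # Handle a state-change event for a tracked job or container: log the
    # transition to "Running"; on a terminal state, call the process's
    # done() handler and wake up the main run loop.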
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

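                # Synthesize an "update" event for each polled record so
                # state changes flow through the single on_message() handler.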
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

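    # Recursively scan a tool document and raise UnsupportedRequirement
    # for CWL features that this runner or the selected work API cannot
    # execute.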
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

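    # Assemble the workflow's final outputs into a single new collection:
    # file literals are written directly, keep-backed files are copied
    # from their source collections, and the output object's locations
    # are rewritten to point into the new collection.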
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

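            # Anything else must be a keep reference of the form
            # "keep:<portable data hash>/<path within collection>".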
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

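    # When this runner is itself executing inside an Arvados container or
    # job task, record the final output collection on that record so the
    # enclosing work unit reports the workflow result.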
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)


def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

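# Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register
# its names alongside cwltool's CWL v1.0 schema so documents can use
# http://arvados.org/cwl# extensions.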
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
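        # Arvados UUIDs embed the record type as a five-character infix:
        # "7fd4e" marks a workflow (containers API), "p5p6p" a pipeline
        # template (jobs API), so the UUID tells us which API to use.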
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler)