Merge branch 'master' into 10797-ruby-2.3
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 from functools import partial
15 import pkg_resources  # part of setuptools
16
17 from cwltool.errors import WorkflowException
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import schema_salad
22 from schema_salad.sourceline import SourceLine
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import FinalOutputPathMapper
37 from ._version import __version__
38
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
44
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
48
49 arvados.log_handler.setFormatter(logging.Formatter(
50         '%(asctime)s %(name)s %(levelname)s: %(message)s',
51         '%Y-%m-%d %H:%M:%S'))
52
53 class ArvCwlRunner(object):
54     """Execute a CWL tool or workflow, submit work (using either jobs or
55     containers API), wait for them to complete, and report output.
56
57     """
58
59     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
60         self.api = api_client
61         self.processes = {}
62         self.lock = threading.Lock()
63         self.cond = threading.Condition(self.lock)
64         self.final_output = None
65         self.final_status = None
66         self.uploaded = {}
67         self.num_retries = num_retries
68         self.uuid = None
69         self.stop_polling = threading.Event()
70         self.poll_api = None
71         self.pipeline = None
72         self.final_output_collection = None
73         self.output_name = output_name
74         self.output_tags = output_tags
75         self.project_uuid = None
76
77         if keep_client is not None:
78             self.keep_client = keep_client
79         else:
80             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
81
82         self.work_api = None
83         expected_api = ["jobs", "containers"]
84         for api in expected_api:
85             try:
86                 methods = self.api._rootDesc.get('resources')[api]['methods']
87                 if ('httpMethod' in methods['create'] and
88                     (work_api == api or work_api is None)):
89                     self.work_api = api
90                     break
91             except KeyError:
92                 pass
93
94         if not self.work_api:
95             if work_api is None:
96                 raise Exception("No supported APIs")
97             else:
98                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
99
100     def arv_make_tool(self, toolpath_object, **kwargs):
101         kwargs["work_api"] = self.work_api
102         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
103                                                 api_client=self.api,
104                                                 keep_client=self.keep_client)
105         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106             return ArvadosCommandTool(self, toolpath_object, **kwargs)
107         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108             return ArvadosWorkflow(self, toolpath_object, **kwargs)
109         else:
110             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
111
112     def output_callback(self, out, processStatus):
113         if processStatus == "success":
114             logger.info("Overall process status is %s", processStatus)
115             if self.pipeline:
116                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
117                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
118         else:
119             logger.warn("Overall process status is %s", processStatus)
120             if self.pipeline:
121                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
123         self.final_status = processStatus
124         self.final_output = out
125
126     def on_message(self, event):
127         if "object_uuid" in event:
128             if event["object_uuid"] in self.processes and event["event_type"] == "update":
129                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
130                     uuid = event["object_uuid"]
131                     with self.lock:
132                         j = self.processes[uuid]
133                         logger.info("%s %s is Running", self.label(j), uuid)
134                         j.running = True
135                         j.update_pipeline_component(event["properties"]["new_attributes"])
136                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
137                     uuid = event["object_uuid"]
138                     try:
139                         self.cond.acquire()
140                         j = self.processes[uuid]
141                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
142                         with Perf(metrics, "done %s" % j.name):
143                             j.done(event["properties"]["new_attributes"])
144                         self.cond.notify()
145                     finally:
146                         self.cond.release()
147
148     def label(self, obj):
149         return "[%s %s]" % (self.work_api[0:-1], obj.name)
150
151     def poll_states(self):
152         """Poll status of jobs or containers listed in the processes dict.
153
154         Runs in a separate thread.
155         """
156
157         try:
158             while True:
159                 self.stop_polling.wait(15)
160                 if self.stop_polling.is_set():
161                     break
162                 with self.lock:
163                     keys = self.processes.keys()
164                 if not keys:
165                     continue
166
167                 if self.work_api == "containers":
168                     table = self.poll_api.container_requests()
169                 elif self.work_api == "jobs":
170                     table = self.poll_api.jobs()
171
172                 try:
173                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
174                 except Exception as e:
175                     logger.warn("Error checking states on API server: %s", e)
176                     continue
177
178                 for p in proc_states["items"]:
179                     self.on_message({
180                         "object_uuid": p["uuid"],
181                         "event_type": "update",
182                         "properties": {
183                             "new_attributes": p
184                         }
185                     })
186         except:
187             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
188             self.cond.acquire()
189             self.processes.clear()
190             self.cond.notify()
191             self.cond.release()
192         finally:
193             self.stop_polling.set()
194
195     def get_uploaded(self):
196         return self.uploaded.copy()
197
198     def add_uploaded(self, src, pair):
199         self.uploaded[src] = pair
200
201     def check_features(self, obj):
202         if isinstance(obj, dict):
203             if obj.get("class") == "InitialWorkDirRequirement":
204                 if self.work_api == "containers":
205                     raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
206             if obj.get("writable"):
207                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
208             if obj.get("class") == "CommandLineTool":
209                 if self.work_api == "containers":
210                     if obj.get("stdin"):
211                         raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
212                     if obj.get("stderr"):
213                         raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
214             if obj.get("class") == "DockerRequirement":
215                 if obj.get("dockerOutputDirectory"):
216                     # TODO: can be supported by containers API, but not jobs API.
217                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
218                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
219             for v in obj.itervalues():
220                 self.check_features(v)
221         elif isinstance(obj, list):
222             for i,v in enumerate(obj):
223                 with SourceLine(obj, i, UnsupportedRequirement):
224                     self.check_features(v)
225
226     def make_output_collection(self, name, tagsString, outputObj):
227         outputObj = copy.deepcopy(outputObj)
228
229         files = []
230         def capture(fileobj):
231             files.append(fileobj)
232
233         adjustDirObjs(outputObj, capture)
234         adjustFileObjs(outputObj, capture)
235
236         generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
237
238         final = arvados.collection.Collection(api_client=self.api,
239                                               keep_client=self.keep_client,
240                                               num_retries=self.num_retries)
241
242         srccollections = {}
243         for k,v in generatemapper.items():
244             if k.startswith("_:"):
245                 if v.type == "Directory":
246                     continue
247                 if v.type == "CreateFile":
248                     with final.open(v.target, "wb") as f:
249                         f.write(v.resolved.encode("utf-8"))
250                     continue
251
252             if not k.startswith("keep:"):
253                 raise Exception("Output source is not in keep or a literal")
254             sp = k.split("/")
255             srccollection = sp[0][5:]
256             if srccollection not in srccollections:
257                 try:
258                     srccollections[srccollection] = arvados.collection.CollectionReader(
259                         srccollection,
260                         api_client=self.api,
261                         keep_client=self.keep_client,
262                         num_retries=self.num_retries)
263                 except arvados.errors.ArgumentError as e:
264                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
265                     raise
266             reader = srccollections[srccollection]
267             try:
268                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
269                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
270             except IOError as e:
271                 logger.warn("While preparing output collection: %s", e)
272
273         def rewrite(fileobj):
274             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
275             for k in ("basename", "listing", "contents"):
276                 if k in fileobj:
277                     del fileobj[k]
278
279         adjustDirObjs(outputObj, rewrite)
280         adjustFileObjs(outputObj, rewrite)
281
282         with final.open("cwl.output.json", "w") as f:
283             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
284
285         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
286
287         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
288                     final.api_response()["name"],
289                     final.manifest_locator())
290
291         final_uuid = final.manifest_locator()
292         tags = tagsString.split(',')
293         for tag in tags:
294              self.api.links().create(body={
295                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
296                 }).execute(num_retries=self.num_retries)
297
298         def finalcollection(fileobj):
299             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
300
301         adjustDirObjs(outputObj, finalcollection)
302         adjustFileObjs(outputObj, finalcollection)
303
304         return (outputObj, final)
305
306     def set_crunch_output(self):
307         if self.work_api == "containers":
308             try:
309                 current = self.api.containers().current().execute(num_retries=self.num_retries)
310             except ApiError as e:
311                 # Status code 404 just means we're not running in a container.
312                 if e.resp.status != 404:
313                     logger.info("Getting current container: %s", e)
314                 return
315             try:
316                 self.api.containers().update(uuid=current['uuid'],
317                                              body={
318                                                  'output': self.final_output_collection.portable_data_hash(),
319                                              }).execute(num_retries=self.num_retries)
320             except Exception as e:
321                 logger.info("Setting container output: %s", e)
322         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
323             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
324                                    body={
325                                        'output': self.final_output_collection.portable_data_hash(),
326                                        'success': self.final_status == "success",
327                                        'progress':1.0
328                                    }).execute(num_retries=self.num_retries)
329
330     def arv_executor(self, tool, job_order, **kwargs):
331         self.debug = kwargs.get("debug")
332
333         tool.visit(self.check_features)
334
335         self.project_uuid = kwargs.get("project_uuid")
336         self.pipeline = None
337         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
338                                                                  api_client=self.api,
339                                                                  keep_client=self.keep_client)
340         self.fs_access = make_fs_access(kwargs["basedir"])
341
342         if not kwargs.get("name"):
343             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
344
345         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
346         # Also uploads docker images.
347         upload_workflow_deps(self, tool)
348
349         # Reload tool object which may have been updated by
350         # upload_workflow_deps
351         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
352                                   makeTool=self.arv_make_tool,
353                                   loader=tool.doc_loader,
354                                   avsc_names=tool.doc_schema,
355                                   metadata=tool.metadata)
356
357         # Upload local file references in the job order.
358         job_order = upload_job_order(self, "%s input" % kwargs["name"],
359                                      tool, job_order)
360
361         existing_uuid = kwargs.get("update_workflow")
362         if existing_uuid or kwargs.get("create_workflow"):
363             # Create a pipeline template or workflow record and exit.
364             if self.work_api == "jobs":
365                 tmpl = RunnerTemplate(self, tool, job_order,
366                                       kwargs.get("enable_reuse"),
367                                       uuid=existing_uuid,
368                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
369                                       name=kwargs["name"])
370                 tmpl.save()
371                 # cwltool.main will write our return value to stdout.
372                 return (tmpl.uuid, "success")
373             elif self.work_api == "containers":
374                 return (upload_workflow(self, tool, job_order,
375                                         self.project_uuid,
376                                         uuid=existing_uuid,
377                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
378                                         name=kwargs["name"]),
379                         "success")
380
381         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
382
383         kwargs["make_fs_access"] = make_fs_access
384         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
385         kwargs["use_container"] = True
386         kwargs["tmpdir_prefix"] = "tmp"
387         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
388
389         if self.work_api == "containers":
390             kwargs["outdir"] = "/var/spool/cwl"
391             kwargs["docker_outdir"] = "/var/spool/cwl"
392             kwargs["tmpdir"] = "/tmp"
393             kwargs["docker_tmpdir"] = "/tmp"
394         elif self.work_api == "jobs":
395             kwargs["outdir"] = "$(task.outdir)"
396             kwargs["docker_outdir"] = "$(task.outdir)"
397             kwargs["tmpdir"] = "$(task.tmpdir)"
398
399         runnerjob = None
400         if kwargs.get("submit"):
401             # Submit a runner job to run the workflow for us.
402             if self.work_api == "containers":
403                 if tool.tool["class"] == "CommandLineTool":
404                     kwargs["runnerjob"] = tool.tool["id"]
405                     upload_dependencies(self,
406                                         kwargs["name"],
407                                         tool.doc_loader,
408                                         tool.tool,
409                                         tool.tool["id"],
410                                         False)
411                     runnerjob = tool.job(job_order,
412                                          self.output_callback,
413                                          **kwargs).next()
414                 else:
415                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
416                                                 self.output_name,
417                                                 self.output_tags,
418                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
419                                                 name=kwargs.get("name"),
420                                                 on_error=kwargs.get("on_error"),
421                                                 submit_runner_image=kwargs.get("submit_runner_image"))
422             elif self.work_api == "jobs":
423                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
424                                       self.output_name,
425                                       self.output_tags,
426                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
427                                       name=kwargs.get("name"),
428                                       on_error=kwargs.get("on_error"),
429                                       submit_runner_image=kwargs.get("submit_runner_image"))
430
431         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
432             # Create pipeline for local run
433             self.pipeline = self.api.pipeline_instances().create(
434                 body={
435                     "owner_uuid": self.project_uuid,
436                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
437                     "components": {},
438                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
439             logger.info("Pipeline instance %s", self.pipeline["uuid"])
440
441         if runnerjob and not kwargs.get("wait"):
442             runnerjob.run(wait=kwargs.get("wait"))
443             return (runnerjob.uuid, "success")
444
445         self.poll_api = arvados.api('v1')
446         self.polling_thread = threading.Thread(target=self.poll_states)
447         self.polling_thread.start()
448
449         if runnerjob:
450             jobiter = iter((runnerjob,))
451         else:
452             if "cwl_runner_job" in kwargs:
453                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
454             jobiter = tool.job(job_order,
455                                self.output_callback,
456                                **kwargs)
457
458         try:
459             self.cond.acquire()
460             # Will continue to hold the lock for the duration of this code
461             # except when in cond.wait(), at which point on_message can update
462             # job state and process output callbacks.
463
464             loopperf = Perf(metrics, "jobiter")
465             loopperf.__enter__()
466             for runnable in jobiter:
467                 loopperf.__exit__()
468
469                 if self.stop_polling.is_set():
470                     break
471
472                 if runnable:
473                     with Perf(metrics, "run"):
474                         runnable.run(**kwargs)
475                 else:
476                     if self.processes:
477                         self.cond.wait(1)
478                     else:
479                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
480                         break
481                 loopperf.__enter__()
482             loopperf.__exit__()
483
484             while self.processes:
485                 self.cond.wait(1)
486
487         except UnsupportedRequirement:
488             raise
489         except:
490             if sys.exc_info()[0] is KeyboardInterrupt:
491                 logger.error("Interrupted, marking pipeline as failed")
492             else:
493                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
494             if self.pipeline:
495                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
496                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
497             if runnerjob and runnerjob.uuid and self.work_api == "containers":
498                 self.api.container_requests().update(uuid=runnerjob.uuid,
499                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
500         finally:
501             self.cond.release()
502             self.stop_polling.set()
503             self.polling_thread.join()
504
505         if self.final_status == "UnsupportedRequirement":
506             raise UnsupportedRequirement("Check log for details.")
507
508         if self.final_output is None:
509             raise WorkflowException("Workflow did not return a result.")
510
511         if kwargs.get("submit") and isinstance(runnerjob, Runner):
512             logger.info("Final output collection %s", runnerjob.final_output)
513         else:
514             if self.output_name is None:
515                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
516             if self.output_tags is None:
517                 self.output_tags = ""
518             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
519             self.set_crunch_output()
520
521         if kwargs.get("compute_checksum"):
522             adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
523             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
524
525         return (self.final_output, self.final_status)
526
527
528 def versionstring():
529     """Print version string of key packages for provenance and debugging."""
530
531     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
532     arvpkg = pkg_resources.require("arvados-python-client")
533     cwlpkg = pkg_resources.require("cwltool")
534
535     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
536                                     "arvados-python-client", arvpkg[0].version,
537                                     "cwltool", cwlpkg[0].version)
538
539
540 def arg_parser():  # type: () -> argparse.ArgumentParser
541     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
542
543     parser.add_argument("--basedir", type=str,
544                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
545     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
546                         help="Output directory, default current directory")
547
548     parser.add_argument("--eval-timeout",
549                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
550                         type=float,
551                         default=20)
552     parser.add_argument("--version", action="store_true", help="Print version and exit")
553
554     exgroup = parser.add_mutually_exclusive_group()
555     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
556     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
557     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
558
559     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
560
561     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
562
563     exgroup = parser.add_mutually_exclusive_group()
564     exgroup.add_argument("--enable-reuse", action="store_true",
565                         default=True, dest="enable_reuse",
566                         help="")
567     exgroup.add_argument("--disable-reuse", action="store_false",
568                         default=True, dest="enable_reuse",
569                         help="")
570
571     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
572     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
573     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
574     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
575                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
576                         default=False)
577
578     exgroup = parser.add_mutually_exclusive_group()
579     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
580                         default=True, dest="submit")
581     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
582                         default=True, dest="submit")
583     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
584                          dest="create_workflow")
585     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
586     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
587
588     exgroup = parser.add_mutually_exclusive_group()
589     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
590                         default=True, dest="wait")
591     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
592                         default=True, dest="wait")
593
594     exgroup = parser.add_mutually_exclusive_group()
595     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
596                         default=True, dest="log_timestamps")
597     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
598                         default=True, dest="log_timestamps")
599
600     parser.add_argument("--api", type=str,
601                         default=None, dest="work_api",
602                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
603
604     parser.add_argument("--compute-checksum", action="store_true", default=False,
605                         help="Compute checksum of contents while collecting outputs",
606                         dest="compute_checksum")
607
608     parser.add_argument("--submit-runner-ram", type=int,
609                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
610                         default=1024)
611
612     parser.add_argument("--submit-runner-image", type=str,
613                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
614                         default=None)
615
616     parser.add_argument("--name", type=str,
617                         help="Name to use for workflow execution instance.",
618                         default=None)
619
620     parser.add_argument("--on-error", type=str,
621                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
622                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
623
624     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
625     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
626
627     return parser
628
629 def add_arv_hints():
630     cache = {}
631     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
632     cache["http://arvados.org/cwl"] = res.read()
633     res.close()
634     document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
635     _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
636     for n in extnames.names:
637         if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
638             cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
639         document_loader.idx["http://arvados.org/cwl#"+n] = {}
640
641 def main(args, stdout, stderr, api_client=None, keep_client=None):
642     parser = arg_parser()
643
644     job_order_object = None
645     arvargs = parser.parse_args(args)
646
647     if arvargs.version:
648         print versionstring()
649         return
650
651     if arvargs.update_workflow:
652         if arvargs.update_workflow.find('-7fd4e-') == 5:
653             want_api = 'containers'
654         elif arvargs.update_workflow.find('-p5p6p-') == 5:
655             want_api = 'jobs'
656         else:
657             want_api = None
658         if want_api and arvargs.work_api and want_api != arvargs.work_api:
659             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
660                 arvargs.update_workflow, want_api, arvargs.work_api))
661             return 1
662         arvargs.work_api = want_api
663
664     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
665         job_order_object = ({}, "")
666
667     add_arv_hints()
668
669     try:
670         if api_client is None:
671             api_client=arvados.api('v1', model=OrderedJsonModel())
672         if keep_client is None:
673             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
674         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
675                               num_retries=4, output_name=arvargs.output_name,
676                               output_tags=arvargs.output_tags)
677     except Exception as e:
678         logger.error(e)
679         return 1
680
681     if arvargs.debug:
682         logger.setLevel(logging.DEBUG)
683         logging.getLogger('arvados').setLevel(logging.DEBUG)
684
685     if arvargs.quiet:
686         logger.setLevel(logging.WARN)
687         logging.getLogger('arvados').setLevel(logging.WARN)
688         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
689
690     if arvargs.metrics:
691         metrics.setLevel(logging.DEBUG)
692         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
693
694     if arvargs.log_timestamps:
695         arvados.log_handler.setFormatter(logging.Formatter(
696             '%(asctime)s %(name)s %(levelname)s: %(message)s',
697             '%Y-%m-%d %H:%M:%S'))
698     else:
699         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
700
701     arvargs.conformance_test = None
702     arvargs.use_container = True
703     arvargs.relax_path_checks = True
704     arvargs.validate = None
705
706     return cwltool.main.main(args=arvargs,
707                              stdout=stdout,
708                              stderr=stderr,
709                              executor=runner.arv_executor,
710                              makeTool=runner.arv_make_tool,
711                              versionfunc=versionstring,
712                              job_order_object=job_order_object,
713                              make_fs_access=partial(CollectionFsAccess,
714                                                     api_client=api_client,
715                                                     keep_client=keep_client),
716                              fetcher_constructor=partial(CollectionFetcher,
717                                                          api_client=api_client,
718                                                          keep_client=keep_client,
719                                                          num_retries=runner.num_retries),
720                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
721                              logger_handler=arvados.log_handler)