Add build/run-build-packages-python-and-ruby.sh script to handle upload
arvados.git: sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running jobs on Arvados.
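#
# Example invocation (the project UUID shown is a hypothetical placeholder):
#   arvados-cwl-runner --project-uuid=zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       workflow.cwl job-inputs.yml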

import argparse
import arvados
import arvados.events
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.collection
import arvados.util
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
import cwltool.process     # referenced as cwltool.process.* below; imported explicitly rather than relying on transitive imports
import cwltool.pathmapper  # PathMapper base class and abspath() are used below
from cwltool.process import shortname
from cwltool.errors import WorkflowException
import threading
import cwltool.docker
import fnmatch
import logging
import re
import os
import sys
import functools
import json
import pkg_resources  # part of setuptools

from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

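# crunchrunner (the crunch job wrapper script) prints the resolved values of
# $(task.tmpdir), $(task.outdir) and $(task.keep) to stderr; these regexes
# recover those paths from the job log.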
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""

    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Pull the image locally so arv-keepdocker can pack and upload it.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]

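# Example usage sketch (the project UUID below is a hypothetical placeholder):
#   arv_docker_get_image(api, {"dockerPull": "ubuntu:14.04"}, True,
#                        "zzzzz-j7d0g-xxxxxxxxxxxxxxx")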

class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
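        # Paths of the form "keep:<portable data hash>/<file path>" resolve to
        # a CollectionReader plus the path within the collection; anything
        # else is treated as an ordinary local path.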
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))

class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

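        # self.generatefiles comes from the tool's file-generation requirement
        # (CreateFileRequirement in draft-2 CWL).  Literal contents and
        # referenced collection files are staged into a new "virtual working
        # directory" collection and handed to the task via
        # script_parameters["task.vwd"].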
        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

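        # Translate CWL resource hints into Crunch node constraints.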
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters]},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Updating the pipeline component is best-effort; failure to
            # record it should not prevent collecting outputs below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will differ between runs, and we need to know
                        # about the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()

        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

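        # scandeps walks the tool document collecting everything referenced
        # through "$import"/"run" (other workflow files) and through
        # "$include"/"$schemas"/"path" (data files and schemas), so the whole
        # dependency closure can be uploaded to Keep before submission.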
        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]

class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

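        # Matches references that already point at Keep content, e.g.
        # "keep:<md5>+<size>/path/inside/collection"; these need no upload.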
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
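        # Map a path seen inside the container back to a keep: reference so
        # collected outputs can be expressed as Keep locators.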
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
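        # Input files are presented to the job as "$(task.keep)/<pdh>/<file>";
        # crunchrunner resolves $(task.keep) to the Keep mount point at runtime.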
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

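    # Note: arvados.events.subscribe delivers websocket events on a separate
    # thread, so on_message synchronizes with the main polling loop through
    # self.lock / self.cond.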
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                runnerjob.run()
                return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

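        # $(task.outdir) and $(task.tmpdir) are placeholders that crunchrunner
        # substitutes with real directories on the compute node.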
546         kwargs["outdir"] = "$(task.outdir)"
547         kwargs["tmpdir"] = "$(task.tmpdir)"
548
549         if kwargs.get("conformance_test"):
550             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
551         else:
552             if args.submit:
553                 jobiter = iter((runnerjob,))
554             else:
555                 components = {}
556                 if "cwl_runner_job" in kwargs:
557                     components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
558
559                 self.pipeline = self.api.pipeline_instances().create(
560                     body={
561                         "owner_uuid": self.project_uuid,
562                         "name": shortname(tool.tool["id"]),
563                         "components": components,
564                         "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
565
566                 logger.info("Pipeline instance %s", self.pipeline["uuid"])
567
568                 jobiter = tool.job(job_order,
569                                    input_basedir,
570                                    self.output_callback,
571                                    docker_outdir="$(task.outdir)",
572                                    **kwargs)
573
574             try:
575                 self.cond.acquire()
576                 # Will continue to hold the lock for the duration of this code
577                 # except when in cond.wait(), at which point on_message can update
578                 # job state and process output callbacks.
579
580                 for runnable in jobiter:
581                     if runnable:
582                         runnable.run(**kwargs)
583                     else:
584                         if self.jobs:
585                             self.cond.wait(1)
586                         else:
587                             logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
588                             break
589
590                 while self.jobs:
591                     self.cond.wait(1)
592
593                 events.close()
594
595                 if self.final_output is None:
596                     raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
597
598                 # create final output collection
599             except:
600                 if sys.exc_info()[0] is KeyboardInterrupt:
601                     logger.error("Interrupted, marking pipeline as failed")
602                 else:
603                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
604                 if self.pipeline:
605                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
606                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
607             finally:
608                 self.cond.release()
609
610             return self.final_output
611
def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

def main(args, stdout, stderr, api_client=None):
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse (always run new jobs).")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow runner on the local host (jobs are still submitted to Arvados).",
                        default=True, dest="submit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)