Merge branch '8815-crunchrunner-everywhere' into 8654-arv-jobs-cwl-runner
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 import argparse
4 import arvados
5 import arvados.events
6 import arvados.commands.keepdocker
7 import arvados.commands.run
8 import arvados.collection
9 import arvados.util
10 import cwltool.draft2tool
11 import cwltool.workflow
12 import cwltool.main
13 from cwltool.process import shortname
14 from cwltool.errors import WorkflowException
15 import threading
16 import cwltool.docker
17 import fnmatch
18 import logging
19 import re
20 import os
21 import sys
22 import functools
23 import json
24 import pkg_resources  # part of setuptools
25
26 from cwltool.process import get_feature, adjustFiles, scandeps
27 from arvados.api import OrderedJsonModel
28
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Patterns matching crunchrunner log lines that report the substituted values
# of $(task.tmpdir), $(task.outdir) and $(task.keep).  ArvadosJob.done() scans
# the job log with these to recover the paths the task actually ran with.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
35
36
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Make sure the Docker image named by dockerRequirement is stored in Keep.

    Fills in "dockerImageId" from "dockerPull" when absent, then asks
    keepdocker whether the image is already stored.  If not, the image is
    pulled locally (via cwltool's docker support) and uploaded to the given
    project with keepdocker.  Returns the docker image id string.
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name[:tag]"; a missing tag becomes None.
    parts = dockerRequirement["dockerImageId"].split(":")
    image_name = parts[0]
    image_tag = parts[1] if len(parts) > 1 else None

    existing = arvados.commands.keepdocker.list_images_in_arv(
        api_client, 3, image_name=image_name, image_tag=image_tag)

    if not existing:
        # Not in Keep yet: pull the image to the local Docker daemon, then
        # hand it to keepdocker for upload into the target project.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        keepdocker_args = ["--project-uuid=" + project_uuid, image_name]
        if image_tag:
            keepdocker_args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(keepdocker_args[1:]))
        arvados.commands.keepdocker.main(keepdocker_args)

    return dockerRequirement["dockerImageId"]
58
59
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """File-system access layer that understands "keep:" references.

    Paths beginning with "keep:<portable data hash>" are resolved through
    cached arvados CollectionReader objects; anything else falls through to
    the local-filesystem behavior inherited from cwltool's StdFsAccess.
    """

    def __init__(self, basedir):
        # Cache of portable data hash -> CollectionReader, so repeated
        # accesses to the same collection reuse one reader.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split path into (collection, path-within-collection).

        Returns (None, path) unchanged when path is not a keep: locator.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        # Recursively match glob pattern segments against the collection
        # tree; returns the list of matching "keep:..." paths.
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            # Reached a file (or None) while pattern segments remain.
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo", so just shift past
            # the "./".  This must happen once, BEFORE iterating the
            # directory: the previous code performed the shift inside the
            # loop below, duplicating every match once per directory entry.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Expand a glob pattern inside a keep collection.

        NOTE(review): assumes pattern is a keep: path; for any other path
        get_collection returns (None, ...) and manifest_locator() below
        raises AttributeError -- confirm callers only glob within keep.
        """
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        # Open inside the referenced collection, or locally otherwise.
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        # Existence check inside the referenced collection, or locally.
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
115
class ArvadosJob(object):
    """Submit and monitor one Crunch job running a single CWL tool invocation.

    Instances are created by ArvadosCommandTool.makeJobRunner().  cwltool
    populates attributes such as command_line, generatefiles, environment,
    stdin, stdout, builder, name, pathmapper, output_callback and
    collect_outputs on the instance before calling run() -- those attributes
    are set outside this class.
    """

    def __init__(self, runner):
        # runner: the ArvCwlRunner coordinating this workflow run.
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Create the Arvados job for this tool invocation.

        Builds crunchrunner script parameters and runtime constraints from
        cwltool builder state, submits the job (with reuse when
        kwargs["enable_reuse"]), records it in arvrunner.jobs, and mirrors
        it into the pipeline instance.  On any error, reports
        "permanentFail" through output_callback instead of raising.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Stage generated files into a new "virtual working directory"
            # collection.  Dict entries describe existing keep files (copied
            # by reference); string entries are literal file contents.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            # Map stdin into the keep mount path seen by the task.
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            # No docker requirement: fall back to the stock arvados/jobs image.
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                # Oldest commit whose crunchrunner has the behavior this
                # code depends on.
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters]},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            # Mirror the new job into the pipeline instance so Workbench
            # shows it as a component.
            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # Job reuse may return an already-finished job; process it now
            # rather than waiting for a websocket event that will never come.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        # Push the latest job record into this run's pipeline instance so
        # its displayed state stays current.
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a finished job record: collect outputs, register the
        output collection in the project, and report status via
        output_callback.  Always removes the job from arvrunner.jobs."""
        try:
            # Best effort only: failure to refresh the pipeline display must
            # not prevent output collection below.
            self.update_pipeline_component(record)
        except:
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails and
                        # the job restarts on a different node these values
                        # will differ between runs, and we need to know about
                        # the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    # Feed the recovered paths back so collect_outputs can
                    # reverse-map task paths to keep: locators.
                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Always forget this job so arvExecutor's "while self.jobs"
            # wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
283
284
class RunnerJob(object):
    """Submit the whole workflow as a single "cwl-runner" job on Arvados.

    Used with --submit: instead of driving the workflow from this host,
    upload the workflow documents and input files to Keep, then create one
    job that runs the cwl-runner script inside the cluster.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        # runner: coordinating ArvCwlRunner.
        # tool: cwltool Process object for the workflow to submit.
        # job_order: the workflow's input object.
        # enable_reuse: passed as find_or_create for job reuse.
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # No pipeline instance exists in submit mode; nothing to mirror.
        pass

    def upload_docker(self, tool):
        # Ensure every Docker image referenced by the workflow (recursing
        # into workflow steps) is present in Keep before submitting.
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Upload workflow dependencies and submit the runner job."""
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Record each referenced path; return it unchanged so
            # adjustFiles leaves the document intact here.
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        # Find everything the workflow document transitively depends on.
        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        # Upload the workflow documents and the input files as two separate
        # batches.  The bare "%s"/"%s/%s" patterns mean paths are rewritten
        # to plain keep locators (no $(task.keep) prefix).
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite input paths to their uploaded keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        # The in-cluster runner locates the workflow via this parameter.
        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "8654-arv-jobs-cwl-runner",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        # Job reuse may return an already-finished job; handle it now.
        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Handle completion of the runner job: load cwl.output.json from
        the output collection and pass it to the final output callback."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                # Best effort: report status even without an output object.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
382
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """PathMapper that uploads referenced local files into Keep.

    Builds a mapping from each referenced file to a (resolved source,
    rewritten target) pair.  Files already uploaded during this run are
    reused via arvrunner's uploaded cache; "keep:" references pass through
    without uploading.
    """

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        # Seed with previously uploaded files so reruns skip re-upload.
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        # Matches "keep:<md5>+<size>/<path>" portable-data-hash references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already in keep: just reformat to the caller's pattern.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                # Strip the fragment identifier before treating src as a
                # file path.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    # Conformance tests run locally; no upload needed.
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    # Local file that still needs uploading; batched below.
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    # statfile resolved it to a file already in keep.
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # One batched upload.  NOTE(review): uploadfiles() appears to
            # rewrite each statfile object's .fn to its keep location --
            # the loop below relies on that; confirm against
            # arvados.commands.run.uploadfiles.
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # Record in both the shared cache and this mapper.
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Set later (ArvadosJob.done assigns builder.pathmapper.keepdir) to
        # the job's keep mount point, enabling reversemap below.
        self.keepdir = None

    def reversemap(self, target):
        # Translate an output path back to a keep: reference when possible.
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
430
431
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool variant whose jobs run on Arvados.

    Plugs ArvadosJob and ArvPathMapper into cwltool's CommandLineTool so
    invocations become Crunch jobs and file references resolve through
    the task's keep mount.
    """

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        # Each invocation of this tool becomes one Arvados job.
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        # Targets are expressed relative to the keep mount point, which
        # crunchrunner substitutes for $(task.keep) at runtime.
        collection_pattern = "$(task.keep)/%s"
        file_pattern = "$(task.keep)/%s/%s"
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             collection_pattern, file_pattern, **kwargs)
445
446
class ArvCwlRunner(object):
    """Top-level executor: runs a CWL workflow by submitting Arvados jobs.

    Registered with cwltool.main as both the tool factory (arvMakeTool)
    and the executor (arvExecutor).  Job state changes arrive over the
    websocket event stream (on_message); the main loop in arvExecutor
    holds self.cond and waits on it until all submitted jobs finish.
    """

    def __init__(self, api_client):
        self.api = api_client
        # Maps job uuid -> ArvadosJob/RunnerJob still in flight.
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        # Cache of files already uploaded to keep during this run.
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        # CommandLineTools run as Arvados jobs; everything else (workflows,
        # expression tools) uses the stock cwltool implementation.
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output and set the pipeline state."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out


    def on_message(self, event):
        """Websocket callback: react to state changes for jobs we own."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        # done() removes the job from self.jobs; notify()
                        # wakes the arvExecutor wait loop to re-check.
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        # Return a copy so callers can't mutate the shared cache directly.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Run the workflow to completion and return its final output.

        With args.submit, submits a single RunnerJob (returning immediately
        if not args.wait); otherwise creates a pipeline instance and drives
        the job graph locally, submitting one Arvados job per step.
        """
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                # Fire and forget: submit the runner job and return.
                runnerjob.run()
                return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        # Crunch substitutes these placeholders at task runtime.
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                # Only the single runner job to execute.
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    # Running inside a crunch job: show it as a component.
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # No runnable job yet: either wait for a pending job
                        # to finish, or the workflow is stuck.
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                # NOTE(review): events.close() is only reached on the
                # success path; an exception leaves the websocket open.
                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output
594
def versionstring():
    """Return a one-line version report for this tool and its dependencies.

    Format: "<argv0> arvados-cwl-runner X, arvados-python-client Y, cwltool Z".
    Versions are looked up from the installed distributions via
    pkg_resources, so each package must be installed.
    """
    cwlpkg = pkg_resources.require("cwltool")
    arvpkg = pkg_resources.require("arvados-python-client")
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")

    # Each package name is paired with its own distribution's version.
    # (Previously arvados-python-client was reported with cwltool's version
    # and vice versa.)
    return "%s %s, %s %s, %s %s" % (sys.argv[0],
                                        "arvados-cwl-runner", arvcwlpkg[0].version,
                                        "arvados-python-client", arvpkg[0].version,
                                        "cwltool", cwlpkg[0].version)
604
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point.

    Extends cwltool's argument parser with Arvados-specific options
    (reuse, project, submit/local, wait/no-wait), builds an ArvCwlRunner
    around api_client (creating one if not supplied), and delegates to
    cwltool.main.main with the Arvados executor and tool factory.
    Returns cwltool's exit code, or 1 if the API client can't be created.
    """
    # Build a new list instead of insert() so the caller's argument list
    # is not mutated as a side effect.
    args = ["--leave-outputs"] + list(args)
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.",
                        default=True, dest="submit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.",
                        default=True, dest="wait")

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)