Merge branch 'master' into 8857-cwl-job-reuse
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.events
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.collection
11 import arvados.util
12 import cwltool.draft2tool
13 import cwltool.workflow
14 import cwltool.main
15 from cwltool.process import shortname
16 from cwltool.errors import WorkflowException
17 import threading
18 import cwltool.docker
19 import fnmatch
20 import logging
21 import re
22 import os
23 import sys
24 import functools
25 import json
26 import pkg_resources  # part of setuptools
27
28 from cwltool.process import get_feature, adjustFiles, scandeps
29 from arvados.api import OrderedJsonModel
30
# Module-level logger for this package; INFO by default, lowered to WARN by --quiet.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Patterns matching crunchrunner's stderr log lines that announce the job's
# $(task.tmpdir), $(task.outdir) and $(task.keep) paths.  ArvadosJob.done()
# scans the job log with these to learn the paths of the final (successful) run.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
38
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.

    Mutates dockerRequirement to fill in "dockerImageId" from "dockerPull" when
    absent, and returns the resulting image reference string.
    """

    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name[:tag]".  NOTE(review): a reference containing a registry port
    # ("host:5000/image") would be split at the wrong colon -- confirm only
    # plain "name[:tag]" references reach this point.
    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    # Look the image up in Arvados.  The positional 3 is presumably the
    # num_retries argument of list_images_in_arv -- verify against the SDK.
    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Not in Keep yet: pull locally (if pull_image allows), then upload
        # into the target project via the arv-keepdocker command-line entry.
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]
62
63
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections.

    Paths of the form "keep:<portable data hash>/<path>" are resolved against
    Keep collections; anything else falls through to local filesystem access.
    """

    def __init__(self, basedir):
        self.collections = {}  # cache: portable data hash -> CollectionReader
        self.basedir = basedir

    def get_collection(self, path):
        """Split path into (CollectionReader, path-within-collection).

        Returns (None, path) unchanged when the path is not a Keep reference.
        Readers are cached per portable data hash.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob pattern segments against a collection tree.

        Returns a list of "keep:..." paths for every file that matches.
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift past the
            # "./".  Fix: handle this before iterating the collection -- the
            # previous code recursed once per file inside the loop below,
            # which returned every match duplicated len(collection) times.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        # NOTE(review): assumes pattern is a keep: reference; a non-keep
        # pattern makes get_collection return None and the manifest_locator()
        # call below would fail -- confirm callers only glob keep paths.
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
121
class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        # NOTE(review): self.command_line, self.generatefiles, self.environment,
        # self.stdin, self.stdout, self.builder, self.pathmapper, self.name and
        # self.output_callback are set on this object by cwltool's job-setup
        # machinery before run() is called; they are not assigned in this file.
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding the
            # literal contents / Keep references the tool asked to be staged.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # A file reference: copy it in from its source collection.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # A literal: write the string content directly.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            # No Docker requirement: fall back to the stock arvados/jobs image.
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        # Filters used for job reuse (find_or_create): only consider past jobs
        # that ran crunchrunner at/after a known-good version, and -- unless
        # --ignore-docker-for-reuse was given -- with the same Docker image.
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            # Record this job as a component of the pipeline instance so it
            # shows up in Workbench.
            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # A reused job may come back already finished; report immediately.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        # Push the latest job record into the pipeline instance's components.
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a finished job record: collect outputs, invoke the callback,
        and stop tracking the job."""
        try:
            self.update_pipeline_component(record)
        except:
            # Best-effort: failing to update the pipeline display must not
            # prevent output collection below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails and
                        # the job restarts on a different node these values
                        # will differ between runs, and we need to know about
                        # the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    # Point the builder at the job's actual outdir/keep mount so
                    # collect_outputs can resolve the tool's output globs.
                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Stop tracking this job so the executor's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
301
302
class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # In submit mode there is no local pipeline instance to update.
        pass

    def upload_docker(self, tool):
        """Recursively ensure every Docker image the workflow uses is in Keep."""
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Upload the workflow document and its inputs to Keep, then submit a
        job that runs the cwl-runner crunch script against them."""
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # adjustFiles visitor: record each referenced path, leave it as-is.
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        # Collect every file the workflow document references ($import/run
        # sub-documents, $include/$schemas/path leaves) so they can be
        # uploaded alongside it.
        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        # Upload the workflow files and the job-order input files to Keep.
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite input paths in the job order to their uploaded Keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        # Tell the cwl-runner crunch script which uploaded workflow to execute.
        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        # A reused job may come back already finished; report immediately.
        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Handle completion of the runner job: read cwl.output.json from its
        output collection and pass it to the runner's output callback."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                # Best-effort: still invoke the callback (with outputs=None).
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
402
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        # Seed with files uploaded earlier in this session so the same local
        # file is never uploaded twice (see ArvCwlRunner.add_uploaded).
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        # Matches "keep:<portable data hash>/<path inside collection>".
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            # basestring: this file targets Python 2.
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a Keep reference: map straight through (no upload).
                # The fall-through below then skips it via the _pathmap check.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                # Strip any fragment identifier before stat/upload.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    # Local file not yet in Keep: queue for batch upload below.
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    # Already resolvable to a Keep location.
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # Batch-upload all queued files.  NOTE(review): the loop below
            # reads st.fn for the Keep path, so uploadfiles presumably rewrites
            # each UploadFile.fn in place -- verify against
            # arvados.commands.run.uploadfiles.
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Set later (by ArvadosJob.done) to the job's $(task.keep) mount point;
        # consulted by reversemap() below.
        self.keepdir = None

    def reversemap(self, target):
        """Map an output path back to a keep: reference where possible."""
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
452
453
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        # Run each tool invocation as an Arvados Crunch job.
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        # Map input files into the job's $(task.keep) mount.
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)
469
470
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}  # job uuid -> ArvadosJob/RunnerJob still in flight
        self.lock = threading.Lock()
        # cond wraps self.lock: on_message (websocket thread) and arvExecutor
        # (main thread) coordinate job completion through it.
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}  # cache of uploaded files: src -> (abspath, keep fn)
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory for cwltool: wrap CommandLineTools to run as Crunch jobs."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the final workflow output and mark the pipeline instance
        Complete or Failed."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        """Websocket event handler: track job state transitions for jobs we
        submitted, and wake the executor when one finishes."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor, which waits on self.cond.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        """Return a copy of the uploaded-files cache (safe to mutate)."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Record that src was uploaded as (abspath, keep fn)."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Top-level executor handed to cwltool.main: run the workflow and
        return its final output object (or None in no-wait submit mode)."""
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                # Fire-and-forget: submit the runner job and return.
                runnerjob.run()
                return

        # NOTE(review): subscribes with a fresh arvados.api('v1') client rather
        # than self.api -- presumably the websocket needs its own connection;
        # confirm this is intentional.
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        # (Removed a redundant second `self.debug = args.debug` assignment;
        # it is already set at the top of this method.)
        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                # Submit mode: the only "job" is the runner job itself.
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # Tool has pending jobs but nothing runnable yet; wait
                        # for a state change (with timeout, to re-check).
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                # Bare except is deliberate: KeyboardInterrupt must also mark
                # the pipeline Failed before propagating out via finally.
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output
623
def versionstring():
    """Print version string of key packages for provenance and debugging."""

    labelled = [
        (sys.argv[0], pkg_resources.require("arvados-cwl-runner")),
        ("arvados-python-client", pkg_resources.require("arvados-python-client")),
        ("cwltool", pkg_resources.require("cwltool")),
    ]
    return ", ".join("%s %s" % (label, dists[0].version)
                     for label, dists in labelled)
634
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse arguments and hand off to cwltool.main.

    Returns cwltool's exit status (0 on success), or 1 if the Arvados API
    client cannot be created.
    """
    # Outputs stay in Keep; tell cwltool not to copy them out.
    # (Note: mutates the caller's args list in place.)
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    # Fix: these two options previously had empty help="" strings, so they
    # were undocumented in --help output.
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Do not reuse past jobs; always run new ones")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)