Merge branch 'master' into 7658-websockets-reconnect-on-close
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.events
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.collection
11 import arvados.util
12 import cwltool.draft2tool
13 import cwltool.workflow
14 import cwltool.main
15 from cwltool.process import shortname
16 from cwltool.errors import WorkflowException
17 import threading
18 import cwltool.docker
19 import fnmatch
20 import logging
21 import re
22 import os
23 import sys
24 import functools
25 import json
26 import pkg_resources  # part of setuptools
27
28 from cwltool.process import get_feature, adjustFiles, scandeps
29 from arvados.api import OrderedJsonModel
30
31 logger = logging.getLogger('arvados.cwl-runner')
32 logger.setLevel(logging.INFO)
33
34 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
35 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
36 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
37
38
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.

    :param api_client: Arvados API client used to query keepdocker image links.
    :param dockerRequirement: CWL DockerRequirement dict; "dockerImageId" is
        filled in from "dockerPull" when absent, and is mutated in place.
    :param pull_image: passed to cwltool.docker.get_image; allow pulling the
        image locally before uploading.
    :param project_uuid: project that will own the uploaded image collection.
    :returns: the "dockerImageId" string (name or name:tag).
    """

    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name[:tag]" on the last colon, and only when that colon follows
    # the last slash, so a registry host with a port ("myreg:5000/img") is
    # not mistaken for a tag separator.
    image_name = dockerRequirement["dockerImageId"]
    image_tag = None
    colon = image_name.rfind(":")
    if colon > image_name.rfind("/"):
        image_tag = image_name[colon+1:]
        image_name = image_name[:colon]

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Image not found in Keep: fetch it via the local Docker daemon and
        # upload it with arv-keepdocker (its output goes to stderr so it does
        # not pollute this tool's stdout).
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]
62
63
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections.

    Paths of the form "keep:PDH/..." are resolved inside Arvados
    collections; all other paths fall through to the plain-filesystem
    behavior of StdFsAccess.
    """

    def __init__(self, basedir):
        # Cache of CollectionReader objects keyed by portable data hash,
        # so repeated accesses to the same collection reuse one reader.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split "keep:PDH/rest" into (CollectionReader, "rest").

        Returns (None, path) unchanged when path is not a Keep locator.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        # Recursively match glob pattern segments against the collection
        # tree, returning full "keep:..." paths for every match.
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a Keep path: fall back to filesystem globbing instead of
            # raising AttributeError on None.manifest_locator() below
            # (open() and exists() already have this non-Keep fallback).
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
121
class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        # runner: the ArvCwlRunner that owns the shared API client, the
        # pipeline instance, and the uuid -> job table.
        self.arvrunner = runner
        # Flipped to True by ArvCwlRunner.on_message when the websocket
        # reports the job entering the Running state.
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Build crunchrunner script parameters and submit the Crunch job.

        Attributes such as self.command_line, self.generatefiles,
        self.environment, self.stdin/self.stdout, self.builder and
        self.pathmapper are set by cwltool's CommandLineTool machinery
        before this is called.  On submission failure the output callback
        is invoked with "permanentFail" instead of raising.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Stage InitialWorkDir content into a new "virtual working
            # directory" collection that crunchrunner will materialize.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to an existing Keep file: copy it into vwd.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Literal file contents: write them into vwd.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            # Map each staged name to its location under $(task.keep).
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            # Default image when the tool declares no DockerRequirement.
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        # Job-reuse filters (used with find_or_create): only reuse jobs run
        # by a crunchrunner at least as new as this pinned commit.
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            # Register so on_message can route websocket events to us.
            self.arvrunner.jobs[response["uuid"]] = self

            # Record this job as a component of the pipeline instance so it
            # shows up in Workbench.
            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                # find_or_create may return an already-finished job; handle
                # it immediately since no further events will arrive.
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        # Refresh this job's entry in the pipeline instance's components
        # so Workbench reflects the latest job state.
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a finished job record: collect outputs, invoke the output
        callback, and remove this job from the runner's table."""
        try:
            self.update_pipeline_component(record)
        except:
            # Best effort: failing to update the pipeline display must not
            # prevent output collection below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    # Scan the job log for the paths crunchrunner printed
                    # on stderr ($(task.tmpdir)/$(task.outdir)/$(task.keep)).
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning because if the node fails and
                        # the job restarts on a different node these values
                        # will differ between runs, and we need to know about
                        # the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    # Point cwltool's output collection machinery at the
                    # job's outdir/keep mount so collect_outputs can resolve
                    # paths recorded in the job log.
                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Always deregister so ArvCwlRunner stops waiting on this job.
            del self.arvrunner.jobs[record["uuid"]]
301
302
class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        # runner: the ArvCwlRunner coordinator.
        # tool: the CWL tool or workflow to submit for remote execution.
        # job_order: the workflow input object (mutated during run()).
        # enable_reuse: passed as find_or_create on job submission.
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # The runner job is not a component of a pipeline instance, so
        # there is nothing to update (required by ArvCwlRunner.on_message).
        pass

    def upload_docker(self, tool):
        """Recursively ensure every Docker image used by the tool (and by
        each step of a workflow) is available in Keep."""
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Upload the workflow document and its inputs to Keep, then submit
        a Crunch job running crunch_scripts/cwl-runner on the cluster."""
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Collect every referenced path; return it unchanged so
            # adjustFiles leaves the document untouched at this stage.
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        # Scan the workflow document for its file dependencies
        # ($import/$include directives, schemas, nested "run" steps).
        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        # Upload workflow files and input files to Keep; the mappers record
        # where each local path ended up.
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite input paths in the job order to their Keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        # "cwl:tool" tells crunch_scripts/cwl-runner which uploaded
        # workflow document to execute.
        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "owner_uuid": self.arvrunner.project_uuid,
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            # A reused job may already be finished; no further events will
            # arrive for it, so handle it now.
            self.done(response)

    def done(self, record):
        """Handle the finished runner job: read cwl.output.json from its
        output collection and report it through the runner's callback."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                # The remote cwl-runner writes the final output object as
                # cwl.output.json in the job's output collection.
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                # Best effort: report the failure but still fire the
                # callback (outputs stays None).
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always deregister so the runner stops waiting on this job.
            del self.arvrunner.jobs[record["uuid"]]
403
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        """Resolve every referenced file to a Keep location, uploading
        local files as needed.

        collection_pattern / file_pattern are %-format templates for the
        mapped path of a whole collection and of a file within one (e.g.
        "$(task.keep)/%s" and "$(task.keep)/%s/%s").
        """
        # Seed the map with files uploaded earlier in this run so they are
        # not uploaded twice.
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        # Matches "keep:<portable data hash>/<path>" references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a Keep reference: map it directly.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                # Strip document fragment before treating it as a file path.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                # statfile classifies the path: UploadFile (local, needs
                # uploading) or ArvFile (already resolvable in Keep).
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    # Conformance tests run locally; keep the local path.
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # Batch-upload all local files in one collection; uploadfiles
            # rewrites each st.fn to the resulting Keep path.
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # Record the upload in the runner's cache for later mappers.
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Set by ArvadosJob.done once the job's Keep mount point is known;
        # used by reversemap below.
        self.keepdir = None

    def reversemap(self, target):
        """Map a path inside the container back to a keep: reference."""
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
453
454
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        """Remember the coordinating runner; defer the rest to cwltool."""
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Run this tool as a Crunch job instead of a local process."""
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        """Map file references to $(task.keep) paths inside the job."""
        return ArvPathMapper(
            self.arvrunner, reffiles, input_basedir,
            "$(task.keep)/%s",
            "$(task.keep)/%s/%s",
            **kwargs)
470
471
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        # Active jobs keyed by uuid; on_message routes events to them.
        self.jobs = {}
        # cond guards self.jobs and self.final_output: arvExecutor waits on
        # it while on_message (websocket thread) completes jobs.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        # Cache of files already uploaded to Keep in this run (see
        # get_uploaded/add_uploaded, used by ArvPathMapper).
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        # Factory passed to cwltool: CommandLineTools become Arvados jobs,
        # everything else (workflows, expression tools) uses the default.
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output and mark the pipeline
        instance Complete or Failed accordingly."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out


    def on_message(self, event):
        """Websocket event handler: track job state transitions.

        Runs on the event-client thread, so job completion is handled
        under self.cond and the waiting arvExecutor thread is notified.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        # done() fires output callbacks and removes the job
                        # from self.jobs.
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor, which waits on self.cond.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        # Return a copy so callers can extend it without mutating the cache.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Main executor entry point (passed to cwltool.main.main).

        Either submits a single RunnerJob (--submit) or iterates the
        workflow locally, submitting one Crunch job per step, and returns
        the final output object.
        """
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                # Fire-and-forget submission: no events needed, return now.
                runnerjob.run()
                return

        # NOTE(review): subscribes with a fresh arvados.api('v1') client
        # rather than reusing self.api -- confirm this is intentional
        # (e.g. to keep the websocket connection separate).
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        # NOTE(review): self.debug was already set above; this repeat is
        # harmless but redundant.
        self.debug = args.debug
        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        # crunchrunner substitutes these placeholders at task runtime.
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            # Conformance tests bypass Arvados and run locally via cwltool.
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                # Single job: the RunnerJob executes the whole workflow
                # remotely.
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    # Running inside a crunch job: show this runner as a
                    # component of the pipeline instance.
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # jobiter yields None while waiting on dependencies;
                        # wait (briefly, releasing the lock) for a job event.
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                # All steps submitted; wait for the remaining jobs to finish.
                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output
624
def versionstring():
    """Print version string of key packages for provenance and debugging."""

    # Pair each displayed name with the installed version of its package.
    components = [
        (sys.argv[0], pkg_resources.require("arvados-cwl-runner")[0].version),
        ("arvados-python-client",
         pkg_resources.require("arvados-python-client")[0].version),
        ("cwltool", pkg_resources.require("cwltool")[0].version),
    ]
    return ", ".join("%s %s" % (label, ver) for label, ver in components)
635
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: build the argument parser, create the
    runner, and hand control to cwltool's main loop."""
    # Outputs stay in Keep; never have cwltool move them locally.
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()

    # Job reuse on/off (mutually exclusive, reuse enabled by default).
    reuse_group = parser.add_mutually_exclusive_group()
    reuse_group.add_argument(
        "--enable-reuse", action="store_true",
        default=True, dest="enable_reuse", help="")
    reuse_group.add_argument(
        "--disable-reuse", action="store_false",
        default=True, dest="enable_reuse", help="")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument(
        "--ignore-docker-for-reuse", action="store_true", default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # Remote submission vs. local orchestration (submit by default).
    submit_group = parser.add_mutually_exclusive_group()
    submit_group.add_argument(
        "--submit", action="store_true", default=True, dest="submit",
        help="Submit workflow to run on Arvados.")
    submit_group.add_argument(
        "--local", action="store_false", default=True, dest="submit",
        help="Run workflow on local host (submits jobs to Arvados).")

    # Whether to wait for a submitted runner job (wait by default).
    wait_group = parser.add_mutually_exclusive_group()
    wait_group.add_argument(
        "--wait", action="store_true", default=True, dest="wait",
        help="After submitting workflow runner job, wait for completion.")
    wait_group.add_argument(
        "--no-wait", action="store_false", default=True, dest="wait",
        help="Submit workflow runner job and exit.")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(
        args,
        stdout=stdout,
        stderr=stderr,
        executor=runner.arvExecutor,
        makeTool=runner.arvMakeTool,
        parser=parser,
        versionfunc=versionstring)