8654: Make --submit --wait the default mode.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

import argparse
import arvados
import arvados.events
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.collection
import arvados.util
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
from cwltool.process import shortname
from cwltool.errors import WorkflowException
import threading
import cwltool.docker
import fnmatch
import logging
import re
import os
import sys
import functools
import json
import pkg_resources  # part of setuptools

from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"

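# crunchrunner prints the resolved $(task.tmpdir), $(task.outdir) and
# $(task.keep) paths to stderr; these patterns recover them from the job
# log so outputs can be mapped back to Keep (see ArvadosJob.done).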
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
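    """Ensure the Docker image named by dockerRequirement is in Arvados.

    If the image is not already registered, pull it locally and upload it
    into the given project with arv-keepdocker.  Returns the
    dockerImageId string used in the job's runtime_constraints.
    """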
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
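    """File access layer that understands "keep:" references.

    Paths of the form "keep:<portable data hash>/..." are opened, globbed
    and tested against Arvados collections; anything else falls through
    to plain local file access.
    """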
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))

class ArvadosJob(object):
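    """Run a single workflow step as an Arvados crunch job and report its
    outcome back to cwltool."""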
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
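        """Translate this CWL step into a crunchrunner job and submit it."""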
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "master",
                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                     "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
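        """Process a completed job record.

        Scan the job log for the final tmpdir/outdir/keep paths, register
        the output as a named collection in the project if needed, and
        pass the collected CWL outputs to the output callback.
        """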
        try:
            self.update_pipeline_component(record)
        except:
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will change between runs, and we need the
                        # values from the final run that actually produced
                        # output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerJob(object):
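    """Submit the cwl-runner itself as an Arvados job (--submit mode).

    Uploads the workflow document and input files to Keep, then creates
    a job that re-runs cwl-runner on the cluster so the workflow can
    proceed unattended.
    """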
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
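        # Upload Docker images for each step, scan the workflow and job
        # order for file dependencies, stage those files into Keep, then
        # submit a "cwl-runner" job whose script_parameters are the job
        # order plus a cwl:tool reference to the uploaded workflow.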
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "script": "cwl-runner",
            "script_version": "8654-arv-jobs-cwl-runner",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]

class ArvPathMapper(cwltool.pathmapper.PathMapper):
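    """Map CWL input paths to Keep references.

    Paths already of the form "keep:<pdh>/..." pass through via
    collection_pattern; local files are uploaded in one batch with
    arv-run's uploadfiles() and rewritten via file_pattern.
    """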
    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
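    """CommandLineTool that runs as an ArvadosJob, with paths mapped
    through $(task.keep)."""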
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)


class ArvCwlRunner(object):
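    """Execute a CWL workflow on Arvados, one crunch job per step.

    Active jobs are tracked in self.jobs; websocket events delivered to
    on_message() drive them to completion and wake the main loop in
    arvExecutor() through a condition variable.
    """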
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out


    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

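        # Make sure the crunchrunner binary pinned by crunchrunner_pdh is
        # present in Keep; if the lookup fails, download the binary and CA
        # certificate bundle and save them as a new collection.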
        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError as e:
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            resp2, content2 = h.request(certs_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                with col.open("ca-certificates.crt", "w") as f:
                    f.write(content2)

                col.save_new("crunchrunner binary", ensure_unique_name=True)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                runnerjob.run()
                return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output

def versionstring():
    cwlpkg = pkg_resources.require("cwltool")
    arvpkg = pkg_resources.require("arvados-python-client")
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")

    return "%s %s, %s %s, %s %s" % (sys.argv[0],
                                    "arvados-cwl-runner", arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

def main(args, stdout, stderr, api_client=None):
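    """Entry point: wrap cwltool's argument parser and main loop with the
    Arvados executor.

    Per this branch, --submit --wait is the default mode: the runner job
    is submitted to the cluster and this process waits for it to finish,
    unless --local or --no-wait is given.
    """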
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse (always run new jobs)")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.",
                        default=True, dest="submit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.",
                        default=True, dest="wait")

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)