#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import arvados.util
import copy
import cwltool.docker
from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.pathmapper  # referenced explicitly below (PathMapper, abspath)
import cwltool.process  # referenced explicitly below (StdFsAccess)
import cwltool.workflow
import fnmatch
from functools import partial
import json
import logging
import os
import pkg_resources  # part of setuptools
import re
import sys
import threading
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse
from .arvcontainer import ArvadosContainer
from .arvjob import ArvadosJob
from .arvdocker import arv_docker_get_image

from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir):
        super(CollectionFsAccess, self).__init__(basedir)
        self.collections = {}

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)
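    # Example (hypothetical locator): get_collection(
    #   "keep:99999999999999999999999999999999+99/dir/file.txt")
    # caches a CollectionReader for the portable data hash and returns
    # (reader, "dir/file.txt"); paths without a "keep:" prefix come back
    # as (None, path) and fall through to ordinary filesystem access.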

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: path; fall back to the base class's local
            # filesystem glob instead of dereferencing None below.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
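
    # cwltool routes a tool's glob/open/exists calls through this
    # fs_access object, so output globs can address Keep directly; e.g.
    # (with a hypothetical data hash) fs_access.glob("keep:<pdh>/*.txt")
    # yields "keep:<pdh>/..." URIs rather than local paths.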


class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados job specification for this workflow.

        The returned dict can be used to create a job (i.e., passed as
        the +body+ argument to jobs().create()), or as a component in
        a pipeline template or pipeline instance.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])

        def loadref(b, u):
            return document_loader.fetch(urlparse.urljoin(b, u))

        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
        return {
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }

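    # jobs().create() with find_or_create=True asks the API server to
    # reuse an existing job with the same script, inputs, and docker
    # image instead of queueing a new one; --disable-reuse turns this off.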
    def run(self, *args, **kwargs):
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        response = self.arvrunner.api.jobs().create(
            body=job_spec,
            find_or_create=self.enable_reuse
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.jobs[self.uuid] = self

        logger.info("Submitted job %s", response["uuid"])

        if kwargs.get("submit"):
            self.pipeline = self.arvrunner.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "name": shortname(self.tool.tool["id"]),
                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]}}},
                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(path):
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    else:
                        # Already a keep: reference; return it unchanged
                        # instead of None so adjustFiles doesn't clobber it.
                        return path
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

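    # For example, a CWL input declared as {"id": "#wf/threshold",
    # "type": ["null", "float"]} becomes the pipeline parameter
    # {"dataclass": "number", "required": False, ...} via the mapping
    # above (an illustrative input, not one from a real workflow).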
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)

    def pipeline_component_spec(self):
        """Return a component that Workbench and a-r-p-i will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
        spec = self.job.arvados_job_spec()

        # Most of the component spec is exactly the same as the job
        # spec (script, script_version, etc.).
        # spec['script_parameters'] isn't right, though. A component
        # spec's script_parameters hash is a translation of
        # self.tool.tool['inputs'] with defaults/overrides taken from
        # the job order. So we move the job parameters out of the way
        # and build a new spec['script_parameters'].
        job_params = spec['script_parameters']
        spec['script_parameters'] = {}

        for param in self.tool.tool['inputs']:
            param = copy.deepcopy(param)

            # Data type and "required" flag...
            types = param['type']
            if not isinstance(types, list):
                types = [types]
            param['required'] = 'null' not in types
            non_null_types = set(types) - set(['null'])
            if len(non_null_types) == 1:
                the_type = next(iter(non_null_types))
                dataclass = self.type_to_dataclass.get(the_type)
                if dataclass:
                    param['dataclass'] = dataclass
            # Note: if we can't figure out a single appropriate
            # dataclass, we just leave that attribute out.  We leave
            # the "type" attribute there in any case, which might help
            # downstream.

            # Title and description...
            title = param.pop('label', '')
            descr = param.pop('description', '').rstrip('\n')
            if title:
                param['title'] = title
            if descr:
                param['description'] = descr

            # Fill in the value from the current job order, if any.
            param_id = shortname(param.pop('id'))
            value = job_params.get(param_id)
            if value is None:
                pass
            elif not isinstance(value, dict):
                param['value'] = value
            elif param.get('dataclass') == 'File' and value.get('path'):
                param['value'] = value['path']

            spec['script_parameters'][param_id] = param
        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
        return spec

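    # A resulting entry might look like (illustrative values):
    #   spec['script_parameters']['threshold'] == {
    #       'type': ['null', 'float'], 'required': False,
    #       'dataclass': 'number', 'value': 0.5}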
    def save(self):
        job_spec = self.pipeline_component_spec()
        response = self.runner.api.pipeline_templates().create(body={
            "components": {
                self.job.name: job_spec,
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)


class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

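        # Matches references that already point into Keep, e.g.
        # (hypothetical) "keep:99999999999999999999999999999999+99/a.txt":
        # a 32-hex-digit portable data hash, "+<size>", then a file path.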
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, input_basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner
        self.crunch2 = crunch2

    def makeJobRunner(self):
        if self.crunch2:
            return ArvadosContainer(self.arvrunner)
        else:
            return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, **kwargs):
        if self.crunch2:
            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                                 "/keep/%s",
                                 "/keep/%s/%s",
                                 **kwargs)
        else:
            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                                 "$(task.keep)/%s",
                                 "$(task.keep)/%s/%s",
                                 **kwargs)
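    # The pattern pairs above become the in-container paths recorded in
    # the path map: under crunch2 (Containers API) Keep is mounted at
    # /keep inside the container, so "/keep/<pdh>/<file>" is usable
    # directly; under crunch1 (Jobs API) "$(task.keep)" is substituted
    # with the job's Keep mount point at runtime.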


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client, crunch2):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.crunch2 = crunch2

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

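    # Websocket events arrive on a separate thread.  "Running" updates
    # only need self.lock, but completion events also notify self.cond
    # so the arvExecutor loop below wakes from cond.wait() to collect
    # output callbacks.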
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("submit"):
            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        if self.crunch2:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        else:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
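        # Normalize enable_reuse so the key always exists in kwargs
        # (None when the caller didn't pass it) for downstream code.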
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        if self.crunch2:
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        else:
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
        else:
            if kwargs.get("submit"):
                jobiter = iter((runnerjob,))
            else:
                if "cwl_runner_job" in kwargs:
                    self.uuid = kwargs.get("cwl_runner_job").get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory; defaults to the current directory.")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (the default): reuse results of equivalent past jobs instead of re-running them.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse: always run new jobs.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (it still submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                        default=False, dest="crunch2",
                        help="Use Crunch v1 Jobs API")
    exgroup.add_argument("--crunch2", action="store_true",
                        default=False, dest="crunch2",
                        help="Use Crunch v2 Containers API")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser

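# Typical invocations (assuming the console script is installed as
# "arvados-cwl-runner"):
#   arvados-cwl-runner --crunch2 workflow.cwl job.json   # submit and wait
#   arvados-cwl-runner --local workflow.cwl job.json     # run the runner locally
#   arvados-cwl-runner --create-template workflow.cwl    # save a pipeline template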
def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
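

# A minimal sketch of a direct entry point, assuming this module is run
# as a script rather than through the installed console script:
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))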