Merge branch 'master' into 10576-cwl-keep-fetcher
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

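        # Work out which work-submission API to use: honor the caller's
        # choice if one was given, otherwise fall back to the first of
        # "jobs" then "containers" that the API server advertises.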
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

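    # Handle a state-change event for a tracked job or container; in this
    # file events are delivered by the poll_states() thread below.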
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        # Capitalize and singularize: "jobs" -> "Job", "containers" -> "Container"
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

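    # Recursively scan a CWL object and reject InitialWorkDir entries
    # marked "writable: true", which this runner does not support.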
    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

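    # Gather the output files named in outputObj into a single new
    # collection, then rewrite their locations to point into it.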
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

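            # Anything that is not a literal must be a reference of the
            # form "keep:<locator>/<path>"; strip the "keep:" prefix to
            # find the source collection.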
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

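    # If this runner is itself executing inside a crunch container or job,
    # record the final output collection on that container/job record.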
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

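    # Executor entry point handed to cwltool.main (see main() below).
    # Depending on the arguments this saves a workflow or pipeline
    # template, submits a runner job/container, or schedules and
    # supervises the workflow's jobs directly from this host.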
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

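        # The two APIs expect different work directories: containers use
        # fixed paths inside the container, while crunch jobs use
        # $(task.outdir)/$(task.tmpdir) placeholders that are substituted
        # at runtime.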
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

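# Load the Arvados extension schema shipped with this package and register
# its names alongside the core CWL v1.0 schema, so documents can use
# Arvados-specific hints.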
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

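    # The type infix of an Arvados UUID (right after the five-character
    # cluster prefix) tells us what kind of object it names: "7fd4e" is a
    # workflow (containers API), "p5p6p" a pipeline template (jobs API).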
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api
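
    # When creating or updating a workflow definition with no input file
    # on the command line, hand cwltool an empty job order so it does not
    # try to load one.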
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

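    # These settings are forced for Arvados runs, regardless of the
    # command line.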
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))