10576: Add resolver to execute from keep references and arvados workflow
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

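    # Minimal usage sketch (illustrative only; main() below is the
    # supported entry point):
    #
    #     api = arvados.api('v1', model=OrderedJsonModel())
    #     runner = ArvCwlRunner(api, work_api="containers")
    #     # then hand runner.arv_executor and runner.arv_make_tool to
    #     # cwltool.main.main(), as main() does below.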
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
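        # Consult the API server's discovery document to find which work
        # APIs this cluster supports, and pick the one requested via
        # work_api (or, when work_api is None, the first supported one in
        # expected_api order).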
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

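    # on_message() receives state-change events either from the websocket
    # event stream or from the poll_states() fallback thread below, which
    # synthesizes equivalent "update" events from list() results.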
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

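    # Build the final output collection: map each output file back to its
    # source Keep collection, copy those files into a fresh collection,
    # write a cwl.output.json manifest alongside them, then save and tag
    # the result.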
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

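    # When this runner is itself executing inside a Crunch container or
    # job task (the --submit case), record the final output collection on
    # our own container/job record so Crunch reports it as the output.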
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

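        # --create-workflow / --update-workflow: register the workflow on
        # the cluster instead of running it.  The jobs API stores it as a
        # pipeline template; the containers API stores it as a workflow
        # object.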
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

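        # With --submit, the workflow runner itself is dispatched to the
        # cluster as a container request or job; otherwise this process
        # drives execution directly and only the individual steps run on
        # Arvados.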
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (the default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Do not reuse past jobs or containers")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (individual steps are still submitted to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
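
# Example invocations (a sketch; 'zzzzz-7fd4e-xxxxxxxxxxxxxxx' is a
# placeholder UUID and the .cwl/.yml paths are hypothetical):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml
#   arvados-cwl-runner --create-workflow my-workflow.cwl
#   arvados-cwl-runner --update-workflow zzzzz-7fd4e-xxxxxxxxxxxxxxx my-workflow.cwl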

def add_arv_hints():
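    # Load the Arvados CWL extension schema (arv-cwl-schema.yml) bundled
    # with this package and register its names with cwltool's CWL v1.0
    # schema, so documents using http://arvados.org/cwl# hints validate.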
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

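    # Arvados UUIDs embed the object type as a 5-character infix starting
    # at position 5 (e.g. zzzzz-7fd4e-xxxxxxxxxxxxxxx): '7fd4e' marks a
    # workflow object (containers API), 'p5p6p' a pipeline template (jobs
    # API).  Use that to infer which work API --update-workflow refers to.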
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

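    # Hand off to cwltool's main loop with the Arvados-specific hooks:
    # arv_executor drives submission and scheduling, arv_make_tool builds
    # Arvados-aware tool objects, CollectionFsAccess/CollectionFetcher do
    # file access via Keep, and collectionResolver lets the workflow
    # argument be a keep: reference or a stored Arvados workflow/pipeline
    # template, in addition to a local path or URL.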
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))