Merge branch '10681-cwl-name-flag' closes #10681
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
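#
# Example invocation (illustrative only; the flags are defined in arg_parser()
# below, and the script name assumes the usual console entry point):
#
#   arvados-cwl-runner --api=containers --name "my analysis run" workflow.cwl inputs.yml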

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
48     """Execute a CWL tool or workflow, submit work (using either jobs or
49     containers API), wait for them to complete, and report output.
50
51     """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
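        # Probe the API server's discovery document and pick the first
        # supported work API; "jobs" wins over "containers" unless the
        # caller asked for a specific one via work_api.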
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
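                        # Derive a display label from the work API name,
                        # e.g. "jobs" -> "Job", "containers" -> "Container".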
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

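                # Re-dispatch each polled record through on_message() as a
                # synthetic "update" event.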
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

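        # Several passes over the output object: capture every File and
        # Directory, copy each one into a single new collection, then
        # rewrite locations to point at the new collection.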
        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
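        # Sources prefixed with "_:" are literals produced by the workflow
        # rather than references into Keep; CreateFile literals are written
        # directly into the new collection.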
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
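            # k looks like "keep:<collection>/<path>"; strip the "keep:"
            # scheme to get the source collection identifier.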
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

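        # Paths inside the work environment differ by API: containers use
        # fixed mount points, while crunch jobs use task-relative
        # substitution variables.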
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

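            # Time each pull from the job iterator: __enter__/__exit__ are
            # called by hand so the Perf timer brackets only jobiter's own
            # work, not the body of the loop.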
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
470     """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
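    # Merge the Arvados extension names into the cwltool schema so that
    # documents using http://arvados.org/cwl# hints validate.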
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
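        # The second dash-separated field of an Arvados UUID encodes the
        # object type: "7fd4e" is a workflow (containers API), "p5p6p" a
        # pipeline template (jobs API).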
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))