#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados,
# using either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
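    # A minimal usage sketch (hypothetical caller code, not part of this
    # module; assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured):
    #
    #     api = arvados.api('v1', model=OrderedJsonModel())
    #     runner = ArvCwlRunner(api, work_api="containers")
    #     # tool and job_order come from cwltool document loading, as in main().
    #     output = runner.arv_executor(tool, job_order, basedir=".", submit=False)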

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Pick the work submission API by probing the server's discovery
        # document for a resource with a usable "create" method.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        # Singularize for logging: "jobs" -> "Job", "containers" -> "Container".
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

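    # poll_states() below synthesizes events in the same shape as Arvados
    # websocket log events, so on_message() can consume either source:
    #
    #     {"object_uuid": "...", "event_type": "update",
    #      "properties": {"new_attributes": {"state": "Running", ...}}}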
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

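    # make_output_collection() walks the CWL output object, copies every file
    # it references out of its source collection into a single new collection,
    # then rewrites each "location" to point into that collection.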
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

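            # Anything else must be a keep URI, e.g.
            # "keep:99999999999999999999999999999999+99/foo.txt" (hash
            # illustrative): sp[0][5:] strips the "keep:" prefix to recover
            # the source collection locator.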
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress': 1.0
                                   }).execute(num_retries=self.num_retries)

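    # arv_executor() is the executor entry point invoked by cwltool.main():
    # depending on the arguments it registers a workflow or pipeline template
    # (--create-workflow/--update-workflow), submits a runner job or container
    # (--submit), or iterates the job graph and runs each step itself.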
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")  # normalize a missing key to None
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

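        # ($(task.outdir) and $(task.tmpdir) are crunch job task substitutions
        # expanded on the compute node; the containers API mounts fixed paths
        # inside the container instead.)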
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)
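
# versionstring() output has the form (values illustrative):
#     <argv[0]> <module version> <package version>, arvados-python-client <version>, cwltool <version>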


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (the default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection, separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (still submits individual jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
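
# Example invocation (hypothetical workflow and input file names):
#
#     arvados-cwl-runner --api=containers --output-name "my analysis" \
#         my-workflow.cwl my-inputs.yml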

def add_arv_hints():
    # Load the Arvados CWL extension schema and register its names alongside
    # the standard CWL v1.0 names so Arvados-specific hints validate.
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        # Infer the target API from the UUID infix: '-7fd4e-' is a workflow
        # object (containers API), '-p5p6p-' is a pipeline template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))
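
# Typical entry-point wiring (the installed arvados-cwl-runner console script
# does the equivalent; shown here only as a sketch):
#
#     if __name__ == "__main__":
#         sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))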