Merge branch '10681-cwl-wf-name-flag' closes #10681
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

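# Example invocation (a sketch; the workflow/input filenames and output name
# are hypothetical, but the flags are the ones defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers --output-name "my run output" \
#       my-workflow.cwl my-inputs.yml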
import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

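        # Pick the work API by inspecting the API server's discovery document:
        # take the first of "jobs" or "containers" whose resource advertises a
        # usable "create" method, constrained by an explicit work_api argument.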
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

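    # on_message() consumes events shaped like Arvados log/websocket events:
    # {"object_uuid": ..., "event_type": "update",
    #  "properties": {"new_attributes": <current record>}}
    # poll_states() below synthesizes the same shape from periodic list() calls.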
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
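        """Copy the workflow outputs into a single new collection.

        Walks the output object, copies each keep: reference into a fresh
        collection (writing CreateFile literals directly), saves the
        collection under `name`, applies any tags, and rewrites the output
        object's locations to point at the new collection.
        """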
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

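            # Everything else must be a keep reference of the form
            # "keep:<portable data hash>/<path within collection>".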
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
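        """If running inside a crunch container or job task, record the final
        output collection on our own container or job task so it is visible
        in Arvados.
        """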
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
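        """Top-level executor passed to cwltool.main.main().

        Depending on kwargs this either saves the workflow definition
        (--create-workflow/--update-workflow), submits a runner job or
        container (--submit), or runs the workflow from the local host,
        then waits for completion and returns the final output object.
        """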
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

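        # Output/tmp paths inside the work environment: the containers API
        # uses fixed in-container paths, while the jobs API uses crunch task
        # substitution variables that are expanded at runtime.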
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

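    # A sketch of the output (version numbers here are hypothetical):
    #   arvados-cwl-runner 1.0.x 1.0.y, arvados-python-client 0.1.z, cwltool 1.0.w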
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

514     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
515     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
516     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
517     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
518                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
519                         default=False)
520
521     exgroup = parser.add_mutually_exclusive_group()
522     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
523                         default=True, dest="submit")
524     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
525                         default=True, dest="submit")
526     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
527                          dest="create_workflow")
528     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
529     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
530
531     exgroup = parser.add_mutually_exclusive_group()
532     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
533                         default=True, dest="wait")
534     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
535                         default=True, dest="wait")
536
537     parser.add_argument("--api", type=str,
538                         default=None, dest="work_api",
539                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
540
541     parser.add_argument("--compute-checksum", action="store_true", default=False,
542                         help="Compute checksum of contents while collecting outputs",
543                         dest="compute_checksum")
544
545     parser.add_argument("--submit-runner-ram", type=int,
546                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
547                         default=1024)
548
549     parser.add_argument("--name", type=str,
550                         help="Name to use for workflow execution instance.",
551                         default=None)
552
553     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
554     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
555
556     return parser
557
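# add_arv_hints() merges the Arvados CWL extension schema
# (http://arvados.org/cwl) into the standard CWL v1.0 schema names, so that
# arv: hints in workflow documents validate instead of producing warnings.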
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
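    """Command-line entry point.

    Parses arguments, constructs an ArvCwlRunner, and hands control to
    cwltool.main.main() with our executor and tool-maker plugged in.
    """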
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

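    # Arvados UUIDs embed the object type as their second field: "-7fd4e-"
    # marks a workflow (containers API) and "-p5p6p-" a pipeline template
    # (jobs API), so the target of --update-workflow implies which API to use.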
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))