#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
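        # Probe the API server's discovery document and pick the first
        # supported work API ("jobs" before "containers"), unless the caller
        # asked for a specific one via work_api.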
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
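        """Construct the Arvados-specific tool wrapper for a parsed CWL object."""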
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
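        """Record the final output and status, and update the pipeline instance (if any)."""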
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
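        """Handle a state-change event for a job or container we are tracking."""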
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
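                        # Singularize the API name for logging:
                        # "jobs" -> "Job", "containers" -> "Container".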
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
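        """Copy the output files into a single new collection, write
        cwl.output.json, apply tags, and rewrite the output locations to
        point at the new collection."""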
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
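            # Source location looks like "keep:<locator>/path/to/file";
            # strip the "keep:" prefix to get the collection locator.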
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
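        """Record the final output collection on the current container or job
        task, so Crunch reports it as this step's output."""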
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress': 1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
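        """Run or submit the workflow and wait for completion.

        This is the executor entry point handed to cwltool.main.main().
        """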
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable reuse of past jobs or containers with the same inputs (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Always run new jobs or containers instead of reusing past work")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to your home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                        dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
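    """Load the Arvados extension schema (arv-cwl-schema.yml) and register its
    names alongside the core CWL v1.0 schema names."""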
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
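    """Parse arguments, construct an ArvCwlRunner, and hand off to cwltool.main.main()."""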
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
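        # The type infix at offset 5 of an Arvados UUID identifies the object:
        # '7fd4e' is a workflow (containers API), 'p5p6p' is a pipeline
        # template (jobs API).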
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))