# Implement cwl-runner interface for submitting and running work on Arvados,
# using either the Crunch jobs API or Crunch containers API.
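#
# A minimal invocation sketch (cluster credentials come from the standard
# ARVADOS_API_HOST/ARVADOS_API_TOKEN environment variables; workflow and
# input file names are illustrative):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml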

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process

from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
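
        # For example, with work_api=None on a cluster exposing only the
        # containers API, the loop above selects "containers"; requesting
        # work_api="jobs" on that same cluster falls through to the
        # "Unsupported API" error instead.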

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
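
    # poll_states() feeds on_message() synthetic update events shaped like
    # websocket events, e.g. (field values illustrative):
    #
    #   {"object_uuid": "zzzzz-dz642-xxxxxxxxxxxxxxx",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Complete", ...}}}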

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
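
    # For example, an InitialWorkDir listing entry with "writable: true", or a
    # DockerRequirement carrying "dockerOutputDirectory", is rejected here
    # with an UnsupportedRequirement annotated with its source line.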

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            reader = self.collection_cache.get(srccollection)
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
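
    # Illustration of the rewriting above (names and hashes made up): an
    # output recorded as "keep:abc123.../foo.txt" is copied into the new
    # collection, rewrite() points its location at the in-collection path,
    # and finalcollection() re-prefixes it as "keep:<final pdh>/foo.txt".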

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # Once the container record carries the output, the separately
                # named output collection is redundant, so mark it as trashed.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
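
    # That is: when this runner is itself an Arvados container (containers
    # API), the output PDH is recorded on the runner's own container record;
    # under crunch1 the job task named by $TASK_UUID is updated instead.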

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
516 """Print version string of key packages for provenance and debugging."""
518 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
519 arvpkg = pkg_resources.require("arvados-python-client")
520 cwlpkg = pkg_resources.require("cwltool")
522 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
523 "arvados-python-client", arvpkg[0].version,
524 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's Home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
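
# A quick parsing sketch (file names illustrative):
#
#   arg_parser().parse_args(["--api=containers", "wf.cwl", "inputs.yml"])
#
# yields a Namespace with work_api="containers", workflow="wf.cwl" and
# job_order=["inputs.yml"].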


def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
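

# Sketch of direct invocation (the installed console-script entry point
# normally calls main() this way; the stream arguments shown are the obvious
# choices, not taken from the packaging metadata):
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))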