3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 import cwltool.process
22 from schema_salad.sourceline import SourceLine
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import FinalOutputPathMapper
37 from ._version import __version__
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
# Module-level loggers: "arvados.cwl-runner" for normal progress messages,
# ".metrics" for timing data emitted via the Perf context manager.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default log line format on the shared Arvados handler (main() may replace
# this depending on --log-timestamps).
# NOTE(review): the closing arguments/parenthesis of this call are elided
# from this excerpt -- confirm against the full source.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        # cond/lock coordinate the state-polling thread, on_message(), and
        # the main wait loop in arv_executor().
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None       # final CWL output object, set via output_callback()
        self.final_status = None       # "success" or failure string, set via output_callback()
        self.num_retries = num_retries
        self.stop_polling = threading.Event()  # signals poll_states() to exit
        self.final_output_collection = None
        self.output_name = output_name   # requested output collection name, or None
        self.output_tags = output_tags   # comma-separated tag string, or None
        self.project_uuid = None
        if keep_client is not None:
            self.keep_client = keep_client
            # NOTE(review): an "else:" line appears to be elided before this
            # assignment in this excerpt -- confirm against the full source.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Choose the work submission API: honor an explicit work_api request
        # if the server offers it, otherwise take the first supported one.
        # NOTE(review): the try/except around the capability probe and the
        # "no usable API" checks are partially elided from this excerpt.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                raise Exception("No supported APIs")
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory passed to cwltool: route CommandLineTool and Workflow
        nodes to the Arvados-specific classes, otherwise defer to cwltool's
        default factory."""
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output/status and, when running under a
        pipeline instance, mirror the overall state to the API server."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
            logger.warn("Overall process status is %s", processStatus)
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Handle a state-change event for a tracked job/container: log the
        transition and notify the process object when it starts Running or
        reaches a terminal state."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])

    def label(self, obj):
        # Log prefix such as "[job foo]" / "[container foo]" -- the slice
        # drops the trailing "s" from the work_api name ("jobs"/"containers").
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                keys = self.processes.keys()

                # Pick the table matching the active work API.
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    # Transient API failure: log and retry on the next cycle.
                    logger.warn("Error checking states on API server: %s", e)

                # Feed each polled record through the same path as a
                # websocket event.
                for p in proc_states["items"]:
                        "object_uuid": p["uuid"],
                        "event_type": "update",
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.processes.clear()
            self.stop_polling.set()

    def get_uploaded(self):
        # Return a copy so callers cannot mutate the internal upload cache.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record that local path `src` was uploaded as `pair`.
        self.uploaded[src] = pair

    def check_features(self, obj):
        """Recursively scan a tool document and raise UnsupportedRequirement
        for CWL features the selected work API cannot run."""
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    # NOTE(review): an 'if obj.get("stdin"):' guard appears to
                    # be elided before this raise -- confirm against full source.
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        """Assemble the workflow's output files into a new Keep collection.

        Copies every referenced keep: source (and writes CreateFile literals)
        into a fresh collection, saves it under `name` in the target project,
        tags it with the comma-separated `tagsString`, and rewrites the output
        object's locations to point into the new collection.

        Returns (rewritten outputObj, Collection).
        """
        # Work on a copy; the caller's object keeps its original locations
        # until we rewrite them below.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (not in Keep): directories need no copy; file
                # literals are written out directly.
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # k looks like "keep:<pdh>/<path>"; strip the "keep:" prefix to
            # get the source collection identifier.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                    # Cache one CollectionReader per source collection.
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
                logger.warn("While preparing output collection: %s", e)

        # Point each output object at its path inside the new collection and
        # drop fields that are no longer valid for the relocated file.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach each requested tag as a link record on the collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        # Finally, make locations absolute keep: URIs into the saved
        # collection.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        """When running inside a Crunch container or job task, record the
        final output collection on our own container/job-task record."""
        if self.work_api == "containers":
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: failing to record the output is not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        """Main entry point handed to cwltool.main as the executor.

        Validates features, uploads dependencies, then either registers a
        workflow/template, submits a runner job, or runs the workflow
        directly, waiting for completion and returning
        (final_output, final_status).
        """
        self.debug = kwargs.get("debug")

        # Fail early on CWL features the chosen API cannot run.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Work directories differ by API: fixed paths inside the container
        # image vs. crunch-job task substitution variables.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submit: start the runner and return immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # Background thread that polls job/container states and feeds
        # on_message().
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

            jobiter = iter((runnerjob,))
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:

                if self.stop_polling.is_set():

                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # All jobs issued; wait for the remaining ones to finish.
            while self.processes:

        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the runner container request on failure.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            # Local run: package the outputs into a new collection ourselves.
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
    # NOTE(review): the enclosing "def versionstring():" line is not visible
    # in this excerpt; these lines are its body.
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    # e.g. "arvados-cwl-runner 1.0 1.0.x, arvados-python-client a.b, cwltool c.d"
    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",

    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Verbosity options are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse defaults to on; --disable-reuse turns it off.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Run mode: submit to Arvados (default), run locally, or just register
    # a workflow/template record.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): the enclosing "def" line for this schema-hints loader is
    # not visible in this excerpt; these lines are its body.
    # Load the bundled Arvados CWL extension schema and merge its names into
    # cwltool's v1.0 schema so Arvados-specific hints validate.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    and hand control to cwltool.main with the Arvados executor hooks."""
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

        print versionstring()

    if arvargs.update_workflow:
        # Infer the API from the UUID's object-type infix:
        # "-7fd4e-" => workflow record (containers API),
        # "-p5p6p-" => pipeline template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # Registering a workflow needs no job order; use an empty one.
        job_order_object = ({}, "")

        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:

        logger.setLevel(logging.DEBUG)

        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Flags cwltool.main expects but which we fix for Arvados runs.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)