from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
self.work_api = None
expected_api = ["jobs", "containers"]
for api in expected_api:
kwargs["work_api"] = self.work_api
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
- keep_client=self.keep_client)
+ fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+ num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
keep_client=self.keep_client,
num_retries=self.num_retries)
- srccollections = {}
for k,v in generatemapper.items():
if k.startswith("_:"):
if v.type == "Directory":
raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
- if srccollection not in srccollections:
- try:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- reader = srccollections[srccollection]
+ reader = self.collection_cache.get(srccollection)
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
self.project_uuid = kwargs.get("project_uuid")
self.pipeline = None
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- api_client=self.api,
- keep_client=self.keep_client)
+ collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
if not kwargs.get("name"):
"http://commonwl.org/cwltool#LoadListingRequirement"
])
-
def main(args, stdout, stderr, api_client=None, keep_client=None):
parser = arg_parser()
arvargs.relax_path_checks = True
arvargs.validate = None
+ make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=partial(CollectionFsAccess,
- api_client=api_client,
- keep_client=keep_client),
+ make_fs_access=make_fs_access,
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
- keep_client=keep_client,
+ fs_access=make_fs_access(""),
num_retries=runner.num_retries),
resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler,
logger = logging.getLogger('arvados.cwl-runner')
class CollectionCache(object):
    """Cache of CollectionReader objects keyed by the identifier passed to get().

    A single cache instance can be shared by several consumers so that each
    collection is only opened once.
    """

    def __init__(self, api_client, keep_client, num_retries):
        """Remember the clients and retry count used to build readers.

        api_client  -- passed to CollectionReader as api_client
        keep_client -- passed to CollectionReader as keep_client
        num_retries -- passed to CollectionReader as num_retries
        """
        self.api_client = api_client
        self.keep_client = keep_client
        # Previously accepted but dropped; keep it so readers created by the
        # cache honour the caller's retry setting.
        self.num_retries = num_retries
        self.collections = {}

    def get(self, pdh):
        """Return the cached reader for pdh, creating it on first use.

        Any exception raised while constructing the CollectionReader
        propagates to the caller and nothing is cached for that key.
        """
        if pdh not in self.collections:
            logger.debug("Creating collection reader for %s", pdh)
            self.collections[pdh] = arvados.collection.CollectionReader(
                pdh,
                api_client=self.api_client,
                keep_client=self.keep_client,
                num_retries=self.num_retries)
        return self.collections[pdh]
+
+
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
- def __init__(self, basedir, api_client=None, keep_client=None):
+ def __init__(self, basedir, collection_cache=None):
super(CollectionFsAccess, self).__init__(basedir)
- self.api_client = api_client
- self.keep_client = keep_client
- self.collections = {}
+ self.collection_cache = collection_cache
def get_collection(self, path):
sp = path.split("/", 1)
p = sp[0]
if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
pdh = p[5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client)
- return (self.collections[pdh], sp[1] if len(sp) == 2 else None)
+ return (self.collection_cache.get(pdh), sp[1] if len(sp) == 2 else None)
else:
return (None, path)
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
- def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
+ def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
super(CollectionFetcher, self).__init__(cache, session)
self.api_client = api_client
- self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+ self.fsaccess = fs_access
self.num_retries = num_retries
def fetch_text(self, url):
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
"stdin": "/keep/99999999999999999999999999999996+99/file.txt",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
}],
"baseCommand": "ls"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=document_loader,
makeTool=runner.arv_make_tool, metadata=metadata)