X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/607fe087f6167061714a524dd53cbbc21b974973..0eb72b526bf8bbb011551ecf019f604e17a534f1:/sdk/cwl/arvados_cwl/pathmapper.py diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 5e2ee46a87..e39c7d23ce 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + import re import logging import uuid @@ -36,13 +40,14 @@ class ArvPathMapper(PathMapper): pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$') def __init__(self, arvrunner, referenced_files, input_basedir, - collection_pattern, file_pattern, name=None, **kwargs): + collection_pattern, file_pattern, name=None, single_collection=False, **kwargs): self.arvrunner = arvrunner self.input_basedir = input_basedir self.collection_pattern = collection_pattern self.file_pattern = file_pattern self.name = name self.referenced_files = [r["location"] for r in referenced_files] + self.single_collection = single_collection super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None) def visit(self, srcobj, uploadfiles): @@ -60,7 +65,8 @@ class ArvPathMapper(PathMapper): ab = abspath(src, self.input_basedir) st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s", - dirPattern="keep:%s/%s") + dirPattern="keep:%s/%s", + raiseOSError=True) with SourceLine(srcobj, "location", WorkflowException): if isinstance(st, arvados.commands.run.UploadFile): uploadfiles.add((src, ab, st)) @@ -105,46 +111,61 @@ class ArvPathMapper(PathMapper): # type: (List[Any], unicode) -> None uploadfiles = set() + collection = None + if self.single_collection: + collection = arvados.collection.Collection(api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client, + num_retries=self.arvrunner.num_retries) + already_uploaded = self.arvrunner.get_uploaded() + copied_files = set() for k in referenced_files: loc = k["location"] if loc in already_uploaded: v = already_uploaded[loc] - self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File", True) + self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True) + if self.single_collection: + basename = k["basename"] + if basename not in collection: + self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", []) + copied_files.add((loc, basename, v.type)) for srcobj in referenced_files: self.visit(srcobj, uploadfiles) - if uploadfiles: - arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], - self.arvrunner.api, - dry_run=False, - num_retries=self.arvrunner.num_retries, - fnPattern="keep:%s/%s", - name=self.name, - project=self.arvrunner.project_uuid) + arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], + self.arvrunner.api, + dry_run=False, + num_retries=self.arvrunner.num_retries, + fnPattern="keep:%s/%s", + name=self.name, + project=self.arvrunner.project_uuid, + collection=collection) for src, ab, st in uploadfiles: self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:], "Directory" if os.path.isdir(ab) else "File", True) self.arvrunner.add_uploaded(src, self._pathmap[src]) + for loc, basename, cls in copied_files: + fn = "keep:%s/%s" % (collection.portable_data_hash(), basename) + self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True) + for srcobj in referenced_files: subdirs = [] - if srcobj["class"] == "Directory": - if srcobj["location"] not in self._pathmap: - c = arvados.collection.Collection(api_client=self.arvrunner.api, - keep_client=self.arvrunner.keep_client, - num_retries=self.arvrunner.num_retries) - for l in srcobj.get("listing", []): - self.addentry(l, c, ".", subdirs) - - check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries) - if not check["items"]: - c.save_new(owner_uuid=self.arvrunner.project_uuid) + if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap: + c = arvados.collection.Collection(api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client, + num_retries=self.arvrunner.num_retries) + for l in srcobj.get("listing", []): + self.addentry(l, c, ".", subdirs) - ab = self.collection_pattern % c.portable_data_hash() - self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True) + check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries) + if not check["items"]: + c.save_new(owner_uuid=self.arvrunner.project_uuid) + + ab = self.collection_pattern % c.portable_data_hash() + self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True) elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or (srcobj["location"].startswith("_:") and "contents" in srcobj)):