1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados.commands.run
12 import arvados.collection
14 from schema_salad.sourceline import SourceLine
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
19 logger = logging.getLogger('arvados.cwl-runner')
21 def trim_listing(obj):
22 """Remove 'listing' field from Directory objects that are keep references.
24 When Directory objects represent Keep references, it is redundant and
25 potentially very expensive to pass fully enumerated Directory objects
26 between instances of cwl-runner (e.g. a submitting a job, or using the
27 RunInSingleContainer feature), so delete the 'listing' field when it is
32 if obj.get("location", "").startswith("keep:") and "listing" in obj:
36 class ArvPathMapper(PathMapper):
37 """Convert container-local paths to and from Keep collection ids."""
39 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
40 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
42 def __init__(self, arvrunner, referenced_files, input_basedir,
43 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
44 self.arvrunner = arvrunner
45 self.input_basedir = input_basedir
46 self.collection_pattern = collection_pattern
47 self.file_pattern = file_pattern
49 self.referenced_files = [r["location"] for r in referenced_files]
50 self.single_collection = single_collection
51 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
53 def visit(self, srcobj, uploadfiles):
54 src = srcobj["location"]
56 src = src[:src.index("#")]
58 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
59 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
61 if src not in self._pathmap:
62 if src.startswith("file:"):
63 # Local FS ref, may need to be uploaded or may be on keep
65 ab = abspath(src, self.input_basedir)
66 st = arvados.commands.run.statfile("", ab,
67 fnPattern="keep:%s/%s",
68 dirPattern="keep:%s/%s",
70 with SourceLine(srcobj, "location", WorkflowException):
71 if isinstance(st, arvados.commands.run.UploadFile):
72 uploadfiles.add((src, ab, st))
73 elif isinstance(st, arvados.commands.run.ArvFile):
74 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
76 raise WorkflowException("Input file path '%s' is invalid" % st)
77 elif src.startswith("_:"):
78 if srcobj["class"] == "File" and "contents" not in srcobj:
79 raise WorkflowException("File literal '%s' is missing `contents`" % src)
80 if srcobj["class"] == "Directory" and "listing" not in srcobj:
81 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
83 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
85 with SourceLine(srcobj, "secondaryFiles", WorkflowException):
86 for l in srcobj.get("secondaryFiles", []):
87 self.visit(l, uploadfiles)
88 with SourceLine(srcobj, "listing", WorkflowException):
89 for l in srcobj.get("listing", []):
90 self.visit(l, uploadfiles)
92 def addentry(self, obj, c, path, subdirs):
93 if obj["location"] in self._pathmap:
94 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
97 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
98 for l in obj.get("secondaryFiles", []):
99 self.addentry(l, c, path, subdirs)
100 elif obj["class"] == "Directory":
101 for l in obj.get("listing", []):
102 self.addentry(l, c, path + "/" + obj["basename"], subdirs)
103 subdirs.append((obj["location"], path + "/" + obj["basename"]))
104 elif obj["location"].startswith("_:") and "contents" in obj:
105 with c.open(path + "/" + obj["basename"], "w") as f:
106 f.write(obj["contents"].encode("utf-8"))
108 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
110 def setup(self, referenced_files, basedir):
111 # type: (List[Any], unicode) -> None
115 if self.single_collection:
116 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
117 keep_client=self.arvrunner.keep_client,
118 num_retries=self.arvrunner.num_retries)
120 already_uploaded = self.arvrunner.get_uploaded()
122 for k in referenced_files:
124 if loc in already_uploaded:
125 v = already_uploaded[loc]
126 self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
127 if self.single_collection:
128 basename = k["basename"]
129 if basename not in collection:
130 self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
131 copied_files.add((loc, basename, v.type))
133 for srcobj in referenced_files:
134 self.visit(srcobj, uploadfiles)
136 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
139 num_retries=self.arvrunner.num_retries,
140 fnPattern="keep:%s/%s",
142 project=self.arvrunner.project_uuid,
143 collection=collection)
145 for src, ab, st in uploadfiles:
146 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
147 "Directory" if os.path.isdir(ab) else "File", True)
148 self.arvrunner.add_uploaded(src, self._pathmap[src])
150 for loc, basename, cls in copied_files:
151 fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
152 self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
154 for srcobj in referenced_files:
156 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
157 c = arvados.collection.Collection(api_client=self.arvrunner.api,
158 keep_client=self.arvrunner.keep_client,
159 num_retries=self.arvrunner.num_retries)
160 for l in srcobj.get("listing", []):
161 self.addentry(l, c, ".", subdirs)
163 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
164 if not check["items"]:
165 c.save_new(owner_uuid=self.arvrunner.project_uuid)
167 ab = self.collection_pattern % c.portable_data_hash()
168 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
169 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
170 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
172 c = arvados.collection.Collection(api_client=self.arvrunner.api,
173 keep_client=self.arvrunner.keep_client,
174 num_retries=self.arvrunner.num_retries )
175 self.addentry(srcobj, c, ".", subdirs)
177 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
178 if not check["items"]:
179 c.save_new(owner_uuid=self.arvrunner.project_uuid)
181 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
182 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
184 if srcobj.get("secondaryFiles"):
185 ab = self.collection_pattern % c.portable_data_hash()
186 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
189 for loc, sub in subdirs:
190 # subdirs will all start with "./", strip it off
191 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
192 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
193 ab, "Directory", True)
197 def reversemap(self, target):
198 if target.startswith("keep:"):
199 return (target, target)
200 elif self.keepdir and target.startswith(self.keepdir):
201 return (target, "keep:" + target[len(self.keepdir)+1:])
203 return super(ArvPathMapper, self).reversemap(target)
205 class StagingPathMapper(PathMapper):
208 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
210 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
212 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
213 # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
214 loc = obj["location"]
215 tgt = os.path.join(stagedir, obj["basename"])
216 basetgt, baseext = os.path.splitext(tgt)
218 while tgt in self.targets:
220 tgt = "%s_%i%s" % (basetgt, n, baseext)
221 self.targets.add(tgt)
222 if obj["class"] == "Directory":
223 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
224 if loc.startswith("_:") or self._follow_dirs:
225 self.visitlisting(obj.get("listing", []), tgt, basedir)
226 elif obj["class"] == "File":
227 if loc in self._pathmap:
229 if "contents" in obj and loc.startswith("_:"):
230 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
233 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
235 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
236 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
239 class VwdPathMapper(StagingPathMapper):
240 def setup(self, referenced_files, basedir):
241 # type: (List[Any], unicode) -> None
243 # Go through each file and set the target to its own directory along
244 # with any secondary files.
245 self.visitlisting(referenced_files, self.stagedir, basedir)
247 for path, (ab, tgt, type, staged) in self._pathmap.items():
248 if type in ("File", "Directory") and ab.startswith("keep:"):
249 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
252 class NoFollowPathMapper(StagingPathMapper):
254 def setup(self, referenced_files, basedir):
255 # type: (List[Any], unicode) -> None
256 self.visitlisting(referenced_files, self.stagedir, basedir)