# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import re
import logging
import uuid
import os
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module logger.  It uses the fixed 'arvados.cwl-runner' logger name rather
# than __name__, so it is configured together with that named logger.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    ``obj`` is modified in place and nothing is returned.  Objects whose
    location is missing, empty/None, or not a "keep:" reference are left
    untouched.
    """
    # Treat a missing or None location the same as a non-keep location.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
# "keep:<pdh>/<path>": a path inside a collection named by its portable data
# hash (32 lowercase hex digits, "+", a decimal size).  The path is required.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>" with an optional "/<path>"; group(1) is the PDH, group(2) the
# path (or None).
# NOTE(review): unlike the other two patterns this one has no trailing "$",
# so with .match() any text following the PDH is accepted -- confirm that is
# intended before tightening it, callers may rely on it.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>" (cluster prefix, "4zz18" type infix, 15 chars)
# with an optional "/<path>"; group(1) is the UUID, group(2) the path.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    ``setup`` (called from cwltool's ``PathMapper.__init__``) visits every
    referenced File/Directory object and records a ``MapperEnt`` for it:

    * ``keep:`` portable-data-hash references are mapped directly;
    * local ``file:`` paths are uploaded to Keep, unless ``statfile``
      reports they already live in a Keep collection;
    * ``http:``/``https:`` URLs are copied into Keep via ``http_to_keep``;
    * Directories that are not yet mapped, file literals, and Files with
      secondaryFiles are assembled into new intermediate collections when
      ``needs_new_collection`` says so;
    * any other location is mapped to itself.

    ``collection_pattern`` is a ``%``-template applied to a
    ``"<pdh>[/<path>]"`` string, and ``file_pattern`` a ``%``-template
    applied to a ``(pdh, path)`` tuple; both produce container-local paths.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        # Every attribute setup() reads must exist before calling the base
        # constructor, because cwltool's PathMapper.__init__ invokes setup().
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection UUID, filled in by visit() when an
        # input object records the collection UUID it came from.
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map ``srcobj`` (and, recursively, its secondaryFiles and listing).

        Local files that still need uploading are not mapped here; instead a
        ``(location, abspath, UploadFile)`` tuple is added to the
        ``uploadfiles`` set for ``setup`` to upload.

        Raises WorkflowException for malformed keep references, invalid
        local paths, and File/Directory literals missing ``contents`` /
        ``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Map the file itself; the fragment is not part of the mapping.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are staged later by setup(); only validate here.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` to collection ``c`` under directory ``path``.

        Already-mapped objects are copied from their source collection (with
        their secondaryFiles), Directories are recursed into, and file
        literals are written from ``contents``.  Each ``(location,
        staged path)`` pair is appended to ``remap``.

        Raises WorkflowException for any other kind of object.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # A bare collection reference means "the whole collection".
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # secondaryFiles sit next to the primary file, not under it.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places, and True is returned.

        ``prefix`` is the location prefix (ending in "/") that ``srcobj``
        is expected to sit under.  On the top-level call it is empty and
        is derived from ``srcobj``'s own location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no existing collection to reuse.
            return True
        if prefix:
            if loc != prefix + srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc + "/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's
        project and return its portable data hash."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map for ``referenced_files``.

        Visits every object, uploads local files that are not yet in Keep,
        then builds intermediate collections for objects that need one.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                pdh = self._save_intermediate_collection(c)

                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                pdh = self._save_intermediate_collection(c)

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also record the whole new collection, under a unique
                    # "_:" key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran,
            # so pdh is always bound here.
            for loc, sub in remap:
                # addentry() builds paths under ".", so they start with "./";
                # strip it off, and use the same sub-path for both fields.
                subpath = sub[2:] if sub.startswith("./") else sub
                # NOTE(review): every remapped entry is typed "Directory",
                # including Files -- confirm this is intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, subpath),
                                               self.file_pattern % (pdh, subpath),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container path back to a ``(location, location)`` pair.

        Falls back, after the base class lookup, to treating ``keep:``
        references as themselves and paths under ``self.keepdir`` (when
        set) as keep references.  Returns None if nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # Whether visit() recurses into the listing of non-literal Directories
    # (literal "_:" Directories are always expanded).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already handed out by visit().  It must exist before
        # the base constructor runs, because that constructor calls setup().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target and record it in the path map.

        The target is ``obj["dirname"]`` (or ``stagedir``) joined with the
        basename.  If that target is already used by a different source, or
        by a file literal, a "_<n>" suffix (n = 2, 3, ...) is inserted before
        the extension until a free target is found.  Directory listings and
        File secondaryFiles are visited recursively.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A non-literal collides unless the entry at tgt points at the
            # same source.  A target can be in self.targets without being a
            # _pathmap key (file literals are keyed by location), so it is
            # treated as taken instead of raising KeyError.
            return (tgt in self.targets and "contents" not in obj and
                    (tgt not in self._pathmap or self._pathmap[tgt].resolved != loc))

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        # Returns None when no entry matches.
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location without its fragment, then carry the
            # fragment over to the target (same contract as cwltool's
            # PathMapper.mapper).  Previously src[i:] -- the fragment
            # itself -- was looked up, which never matches.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed File/Directory entries are rewritten
    from "keep:<path>" to "$(task.keep)/<path>"."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced, so the dict's size is
        # unchanged while iterating over it.
        for key, (resolved, target, kind, is_staged) in viewitems(self._pathmap):
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, is_staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not recurse into the listings of non-literal
    Directories."""

    # Read by StagingPathMapper.visit(): only literal ("_:") Directories have
    # their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        target_root = self.stagedir
        self.visitlisting(referenced_files, target_root, basedir)