# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import re
import logging
import uuid
import os
import urllib.request
import urllib.parse
import urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module logger; all messages from the path mappers go to the
# "arvados.cwl-runner" logger.
logger = logging.getLogger("arvados.cwl-runner")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    :param obj: a CWL File/Directory object (dict); modified in place.
    :returns: None.
    """
    # ``or ""`` also covers an explicit ``"location": None``, which would
    # otherwise fail on ``.startswith``.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
# "keep:<pdh>/<path>": a non-empty path inside a collection identified by
# portable data hash (32 hex digits, "+", size).
collection_pdh_path = re.compile(r"^keep:[0-9a-f]{32}\+\d+/.+$")

# "keep:<pdh>[/<path>]": group(1) is the portable data hash, group(2) the
# optional "/path".  Not anchored with "$", so any string that merely
# starts with a valid "keep:<pdh>" prefix also matches.
collection_pdh_pattern = re.compile(r"^keep:([0-9a-f]{32}\+\d+)(/.*)?")

# "keep:<collection uuid>[/<path>]": group(1) is the collection UUID
# (infix "4zz18"), group(2) the optional "/path".
collection_uuid_pattern = re.compile(r"^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$")
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory ends up in ``self._pathmap`` keyed by its
    location (with any ``#fragment`` removed):

    * ``keep:<pdh>`` references are mapped directly;
    * local ``file:`` paths are uploaded to Keep, unless statfile() reports
      they are already Keep files;
    * ``http:``/``https:`` URLs are fetched into Keep with http_to_keep();
    * top-level Directory objects not already mapped, and Files carrying
      secondaryFiles or literal contents whose layout does not already match
      staging, are copied into a new intermediate collection;
    * anything else (including ``keep:<collection uuid>`` references) is
      passed through unchanged.

    ``collection_pattern`` is a %-format string taking one argument (a
    portable data hash, optionally followed by ``/path``); ``file_pattern``
    takes two (a portable data hash and a relative path).  Both produce the
    container-side target path.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        # Passed as ``name=`` to uploadfiles() in setup().
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        # When true, setup() creates one Collection up front and hands it to
        # uploadfiles().
        self.single_collection = single_collection
        # Portable data hash -> collection UUID, filled in by visit().
        self.pdh_to_uuid = {}
        # NOTE(review): the cwltool base constructor is expected to call
        # self.setup(), which reads the attributes above, so this call must
        # stay last -- confirm against the cwltool version in use.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record the Keep mapping for ``srcobj`` and, recursively, for its
        secondaryFiles and listing.

        Local files that still need uploading are not mapped here; instead a
        ``(location, absolute path, statfile result)`` tuple is added to the
        ``uploadfiles`` set for setup() to process.

        Raises WorkflowException for malformed ``keep:`` references, invalid
        local paths, and File/Directory literals missing ``contents`` /
        ``listing``.
        """
        src = srcobj["location"]
        # The path map is keyed by location without a fragment identifier.
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this PDH was resolved from.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; setup() writes them into
                # a new collection via addentry().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` at ``<path>/<basename>`` inside collection ``c``.

        Objects already in the path map are copied from their source
        collection (followed by their secondaryFiles, placed next to them);
        Directories are rebuilt entry by entry from their listing; File
        literals are written from ``contents``.  For each placed object a
        ``(location, new path)`` pair is appended to ``remap`` so the caller
        can repoint the path map once ``c`` has been saved.

        Raises WorkflowException for anything else.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Reference to the root of the source collection.
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # secondaryFiles sit alongside the primary file, so reuse ``path``.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        ``prefix`` is the location directory the object is expected to live
        in; on the top-level call it is derived from ``srcobj``'s own
        location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be written somewhere.
            return True
        if prefix:
            if loc != prefix + srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc + "/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, unsaved Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection; return its portable data hash."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for ``referenced_files``.

        1. visit() every object, collecting local files that need uploading;
        2. upload those with arvados.commands.run.uploadfiles() and map them;
        3. for each top-level Directory not already mapped, and each File with
           secondaryFiles or literal contents that needs_new_collection()
           says cannot be used in place, copy/create its entries in a new
           intermediate collection and point the path map at it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # NOTE(review): uploadfiles() is expected to have rewritten st.fn to a
        # "keep:..." reference (it is sliced with [5:] below) -- confirm.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                pdh = self._save_intermediate_collection(c)

                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                pdh = self._save_intermediate_collection(c)

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the whole collection under a fresh
                    # synthetic "_:" key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran
            # addentry(), which also bound ``pdh``.
            for loc, sub in remap:
                # addentry() paths are rooted at "."; strip the "./" once so the
                # Keep reference and the target path agree.
                if sub.startswith("./"):
                    sub = sub[2:]
                # NOTE(review): every remapped entry -- Files included, and the
                # primary File mapped just above -- is recorded with type
                # "Directory"; confirm this is intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                               self.file_pattern % (pdh, sub),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container-side ``target`` back to a ``(location, location)`` pair.

        Tries the base-class lookup first, then treats ``keep:`` references as
        their own location, then paths under ``self.keepdir``.  Returns None
        when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        if target.startswith("keep:"):
            return (target, target)
        # setup() resets keepdir to None, so this only applies if a caller
        # assigns it afterwards.
        if self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() also stages the listing of non-literal Directory
    # objects; subclasses may override this.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already assigned by visit(); used to detect collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a target under ``stagedir`` (or ``obj["dirname"]``).

        When ``<stagedir>/<basename>`` is already taken by a different source,
        or is taken at all and ``obj`` carries ``contents``, a numeric suffix
        is inserted before the extension (``name_2.ext``, ``name_3.ext``, ...).
        Directories recurse into their listing (literals always, others only
        when ``_follow_dirs``); non-literal Files recurse into secondaryFiles.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # Target already taken by a different source.  A target claimed by
            # a file literal is keyed by its "_:" location rather than by the
            # target, so it may be missing from _pathmap; count that as taken
            # instead of raising KeyError.
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Already staged here from the same source.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by location, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location ``src``, or None if unknown.

        A ``#fragment`` on ``src`` is stripped for the lookup and re-appended
        to the returned target.
        """
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location itself (text before '#'), not the fragment.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed Files/Directories are read via $(task.keep).

    After the normal staging pass, every "File" or "Directory" entry whose
    source starts with "keep:" is re-pointed at "$(task.keep)/<rest>"; all
    other entries are left untouched.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Walk a snapshot of the items, since values are replaced in place.
        for key, entry in list(viewitems(self._pathmap)):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # Consulted by StagingPathMapper.visit(); literal ("_:") directories are
    # still expanded.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage each referenced object directly under ``self.stagedir``."""
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)