1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import logging
import os
import re
import uuid
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module-level logger, obtained by the fixed name 'arvados.cwl-runner'.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    The object is modified in place; nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:" are left untouched.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
# "keep:" + 32 hex digits + "+" + decimal digits, followed by a non-empty "/path".
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Same "keep:<32 hex>+<digits>" prefix with an optional "/path"; group 1 is the
# "<32 hex>+<digits>" part, group 2 the path.  Note: not anchored at the end.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:" + "xxxxx-4zz18-<15 alphanumerics>" with an optional "/path"; group 1
# is the identifier, group 2 the path.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Store the mapping configuration, then run the base initializer.

        collection_pattern is %-formatted with a single value (the part of a
        keep reference after "keep:"); file_pattern is %-formatted with a
        (portable data hash, relative path) pair.

        All attributes are assigned before the base initializer is called.
        NOTE(review): presumably because PathMapper.__init__ invokes setup(),
        which reads them -- confirm against cwltool.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Maps a portable data hash to the collection UUID supplied alongside it.
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record how one File/Directory object (and its children) is mapped.

        Depending on the scheme of srcobj["location"]:

        * "keep:" with a portable data hash: mapped directly using
          collection_pattern; a "keep:" reference that matches neither the
          PDH nor the UUID pattern raises WorkflowException.
        * "file:": stat'ed; files that need uploading are added to the
          uploadfiles set as (location, absolute path, stat result), files
          already in Keep are mapped immediately.
        * "_:" literals: only validated here (a File needs `contents`, a
          Directory needs `listing`).
        * "http:"/"https:": fetched with http_to_keep; a failure is logged as
          a warning and the location is left unmapped.
        * anything else: mapped to itself.

        Finally recurses into "secondaryFiles" and "listing".
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment; only the part before '#' is mapped.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    # Key is the bare PDH: strip "keep:" and any "/path".
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): the final argument of this call was not
                # visible in the reviewed source; raiseOSError=True is
                # assumed -- confirm against arvados.commands.run.statfile.
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Deliberately best-effort: a failed download is reported
                    # but does not abort the mapping of the other inputs.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place obj into collection c under path and note where it went.

        Already-mapped objects are copied from their source collection,
        unmapped Directories are descended into, and "_:" literals with
        `contents` are written out as new files.  Each placed object appends
        (original location, path inside c) to remap.

        Raises a WorkflowException (built via SourceLine.makeError) for an
        object that fits none of those cases.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty path refers to the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        prefix is the location prefix the object is expected to live under;
        when empty it is derived from srcobj's own location (everything up to
        and including the last "/").
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no backing collection at all.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for referenced_files.

        Visits every object, uploads the local files that visit() queued,
        then creates new collections for unmapped Directories and for Files
        that carry secondaryFiles or literal contents (unless
        needs_new_collection() says the existing layout already works).
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): the api, dry_run, name and packed arguments were not
        # visible in the reviewed source and are assumed -- confirm against
        # arvados.commands.run.uploadfiles.
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Extra entry for the collection as a whole, under a
                    # freshly generated "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # stripped path is used for both the resolved reference
                    # and the target so the two can never disagree.
                    rel = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (c.portable_data_hash(), rel)
                    # NOTE(review): every remapped entry is recorded with type
                    # "Directory", including files -- confirm this is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), rel),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to its source.

        Uses the base class result when there is one.  Otherwise a "keep:"
        target maps to itself, a target under self.keepdir is rewritten to a
        "keep:" reference, and anything else yields None.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the path separator that follows keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into the listing of every Directory; when
    # false, only into literal ("_:") directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the set of claimed targets, then run the base initializer.

        self.targets is created first.
        NOTE(review): presumably because PathMapper.__init__ invokes setup(),
        which ends up in visit() -- confirm against cwltool.
        """
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign obj a staging target and record it in the path map.

        The target is stagedir (or obj["dirname"] when set) joined with
        obj["basename"].  If that target is already claimed by something
        else, a numeric suffix ("_2", "_3", ...) is inserted before the
        extension until the name is free.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # Non-literal object whose target is claimed by a different source.
            if tgt not in self.targets or "contents" in obj:
                return False
            # A target claimed by a file literal is keyed by the literal's
            # location, not by tgt, so there may be no entry to compare with;
            # that is a collision as well (indexing directly would raise
            # KeyError here).
            claimed = self._pathmap.get(tgt)
            return claimed is None or claimed.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by location, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the map entry for source identifier src, or None if unmapped.

        When src carries a "#fragment", the entry is looked up by the part
        before the "#" and the fragment is appended to the returned target.
        """
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(src):
            for k,v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if u"#" in src:
            i = src.index(u"#")
            # Look up the identifier *before* the fragment; the fragment
            # itself is never a key or a resolved value in the map.
            v = getMapperEnt(src[:i])
            if v is None:
                # Same "not found" result as the fragment-less path below.
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites "keep:" sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage referenced_files under self.stagedir, then rewrite Keep refs."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced, so the dict does not
        # change size while it is being iterated.
        for path, (ab, tgt, kind, staged) in viewitems(self._pathmap):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Read by StagingPathMapper.visit(): with this false, only literal ("_:")
    # Directory objects have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced file under self.stagedir."""
        self.visitlisting(referenced_files, self.stagedir, basedir)