1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
29 from .http import http_to_keep
31 logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove the 'listing' field from Directory objects that are Keep references.

    When a Directory object refers to Keep, its contents can be enumerated
    again from the reference itself, so carrying a fully enumerated
    'listing' between instances of cwl-runner (e.g. when submitting a job,
    or when using the RunInSingleContainer feature) is redundant and
    potentially very expensive.

    The object is modified in place and nothing is returned.  Objects whose
    "location" does not start with "keep:", or that have no "listing" key,
    are left untouched.

    :param obj: a dict-like CWL File/Directory object.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
# Building blocks shared by the Keep reference patterns below.
# A portable data hash is 32 lowercase hex digits, "+", and a decimal number.
_pdh_re = r'[0-9a-f]{32}\+\d+'
# A collection UUID: 5-character prefix, the "4zz18" infix, 15-character suffix.
_collection_uuid_re = r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}'

# "keep:<pdh>/<path>" -- a reference to something *inside* a collection
# (at least one character must follow the slash).
collection_pdh_path = re.compile(r'^keep:' + _pdh_re + r'/.+$')
# "keep:<pdh>" with an optional "/<path>"; group 1 is the pdh, group 2 the
# path (or None).  Note this pattern is not anchored at the end.
collection_pdh_pattern = re.compile(r'^keep:(' + _pdh_re + r')(/.*)?')
# "keep:<collection uuid>" with an optional "/<path>"; same groups as above.
collection_uuid_pattern = re.compile(r'^keep:(' + _collection_uuid_re + r')(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # NOTE(review): the reviewed excerpt of this class had lost its
    # indentation and a number of lines.  Lines that had to be restored by
    # inference (rather than being forced by the visible syntax) are marked
    # "NOTE(review)" below and should be confirmed against the canonical file.

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Create the mapper and map ``referenced_files``.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``
            and ``intermediate_output_ttl`` attributes.
        :param referenced_files: list of CWL File/Directory objects.
        :param input_basedir: base directory used to resolve ``file:``
            locations.
        :param collection_pattern: %-format string taking one value (the
            part of a Keep reference after ``keep:``) and producing the
            target path.
        :param file_pattern: %-format string taking two values (portable
            data hash, path inside the collection).
        :param name: name handed to the upload step in ``setup``.
        :param single_collection: when true, ``setup`` creates one
            Collection object and passes it to the upload step.
        """
        # All attributes are assigned before the base constructor runs --
        # presumably because the base constructor triggers setup()/visit(),
        # which read them (TODO confirm against cwltool's PathMapper).
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        # NOTE(review): restored by inference; ``name`` is otherwise unused.
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Filled in by visit(): portable data hash -> collection UUID.
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record the mapping for one File/Directory object and its children.

        :param srcobj: CWL File/Directory object; "location" and "class" are
            read, plus optional "secondaryFiles" and "listing".
        :param uploadfiles: set that receives ``(src, abspath, statresult)``
            tuples for local files that still have to be uploaded.
        :raises WorkflowException: for a malformed ``keep:`` reference, an
            invalid local path, or a literal missing its payload.
        """
        src = srcobj["location"]
        if "#" in src:
            # Ignore any fragment identifier when mapping the location.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this pdh came from.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): the last argument of this call was not
                # visible in the excerpt; ``raiseOSError=True`` is restored
                # from the upstream Arvados source -- confirm.
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they get mapped in setup().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Deliberately best-effort: a failed download is logged
                    # and the location is simply left unmapped.
                    logger.warning(str(e))
            else:
                # Anything else is passed through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create ``obj`` under ``path`` inside collection ``c``.

        Every entry that is placed appends ``(location, path_in_collection)``
        to ``remap``.

        :raises WorkflowException: when ``obj`` is neither already mapped, a
            Directory, nor a file literal with "contents".
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            # NOTE(review): two lines were missing here; restored from the
            # upstream source (an empty path means the collection root).
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: CWL File/Directory object.
        :param prefix: location prefix that ``srcobj`` is expected to live
            under; empty for the top-level call.
        :returns: True when a new collection is required.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals never exist in a collection yet.
            return True

        # NOTE(review): the lines between the literal check and the
        # comparison below were not visible in the excerpt.  Deriving the
        # prefix from the object's own location on the top-level call is
        # restored from the upstream source -- confirm.
        if not prefix:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"

        if loc != prefix+srcobj["basename"]:
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit, upload and map every object in ``referenced_files``.

        Local files found by visit() are uploaded; Directory objects that
        are still unmapped, and File objects for which
        needs_new_collection() is true, are assembled into newly saved
        collections.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): the api, dry_run, name and packed arguments were not
        # visible in the excerpt; restored from the upstream source -- confirm.
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            # (location, path inside the new collection) pairs from addentry().
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                # NOTE(review): the trailing arguments of this MapperEnt were
                # not visible; (ab, "File", True) restored by analogy with
                # the other entries -- confirm.
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also expose the whole new collection under a fresh
                    # synthetic "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # ``remap`` is only non-empty when one of the branches above ran,
            # so ``c`` is always bound inside this loop.
            for loc, sub in remap:
                # subdirs start with "./", strip it off
                if sub.startswith("./"):
                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                else:
                    ab = self.file_pattern % (c.portable_data_hash(), sub)
                # NOTE(review): kept as in the original -- ``sub[2:]`` is
                # applied unconditionally here and every remapped entry is
                # typed "Directory".
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               ab, "Directory", True)

        # NOTE(review): restored by inference; reversemap() reads
        # self.keepdir, and no visible line assigns it.
        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(source, resolved)`` pair.

        Falls back, in order, to: the base-class result when it is truthy;
        ``keep:`` references, which map to themselves; paths under
        ``self.keepdir``, which are turned into ``keep:`` references.
        Returns None when nothing applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # Whether visit() descends into the listing of non-literal directories.
    # NOTE(review): this attribute's definition was not visible in the
    # reviewed excerpt (visit() reads it); default restored by inference.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already handed out; read by visit(), so it must exist
        # before the base constructor runs (presumably the base constructor
        # triggers setup()/visit() -- TODO confirm against cwltool).
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign a unique target under ``stagedir`` to ``obj`` and record it.

        The staging directory is overridden by ``obj["dirname"]`` when
        present.  If the natural target is already taken by a different
        source (or ``obj`` is a literal), a ``_<n>`` suffix is inserted
        before the extension until the name is free.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        # NOTE(review): the counter's initialisation and increment were not
        # visible in the excerpt; restored from the upstream source.
        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target; nothing to do.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by source, contents stored in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        """Return the MapperEnt for source identifier ``src``, or None.

        If ``src`` carries a ``#fragment``, the entry is looked up by the
        part before the ``#`` and the fragment is appended to the returned
        entry's target.
        """
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(ident):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == ident) or (v.type == "CreateFile" and k == ident):
                    return v
            return None

        if "#" in src:
            i = src.index("#")
            # Look up the base identifier (before the fragment).  The
            # original looked up the fragment itself, which can never match
            # a stored identifier and then failed on ``None.resolved``.
            v = getMapperEnt(src[:i])
            if v is None:
                # Same "not found" result as the fragment-less path below.
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites ``keep:`` sources to ``$(task.keep)/`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` and rewrite Keep-backed entries."""
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are replaced inside the
        # loop, and use names that do not shadow the ``type`` builtin.
        for path, (resolved, target, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # NOTE(review): this override was not visible in the reviewed excerpt;
    # it is restored because StagingPathMapper.visit() consults
    # ``self._follow_dirs`` and, without it, this class would behave exactly
    # like its base -- confirm against the canonical file.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every object in ``referenced_files`` under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)