1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
27 from .http import http_to_keep
29 logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that is a Keep reference.

    A Directory whose location is a Keep reference does not need to carry
    its fully enumerated contents: passing that enumeration between
    instances of cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The object is modified in place; nothing is returned.

    Objects whose "location" does not start with "keep:" (or that have no
    "location" at all) are left untouched.
    """

    location = obj.get("location", "")
    if location.startswith("keep:"):
        # pop() with a default is a no-op when there is no listing.
        obj.pop("listing", None)
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory location is recorded in
    ``self._pathmap`` as a ``MapperEnt(resolved, target, type, staged)``.
    Targets are produced from ``collection_pattern`` (one ``%s``: the
    collection-relative reference) or ``file_pattern`` (two ``%s``: the
    portable data hash and the path inside the collection).
    """

    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Store the mapping configuration and hand over to PathMapper.

        :param arvrunner: runner object supplying ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: File/Directory objects to map.
        :param input_basedir: base directory for resolving ``file:`` refs.
        :param collection_pattern: ``%``-pattern for directory targets.
        :param file_pattern: ``%``-pattern for file targets.
        :param name: name passed on to the upload of local files.
        :param single_collection: when true, uploads share one collection.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # NOTE(review): the attributes above are assigned first, presumably
        # because the base constructor drives setup() -- confirm in cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and recurse into its children.

        Keep references are mapped immediately; local ``file:`` references
        that need uploading are added to the ``uploadfiles`` set as
        ``(src, abspath, statresult)`` tuples; ``http(s):`` references are
        fetched into Keep through ``http_to_keep``.

        :raises WorkflowException: for an invalid local path or for a
            ``_:`` literal missing its ``contents`` / ``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Ignore everything from the first '#' (fragment) onwards.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they are materialized
                # into a collection later, in setup().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Unknown scheme: map the location to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` (recursively) to collection ``c`` underneath ``path``.

        Already-mapped objects are copied from their source collection,
        Directory objects are walked through their ``listing``, and ``_:``
        File literals are written from their ``contents``.  Every object
        placed is appended to ``remap`` as ``(location, path_in_c)``.

        :raises WorkflowException: when ``obj`` fits none of those cases.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty path means the whole source collection.
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                # Secondary files are siblings of the primary file.
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: File/Directory object to inspect.
        :param prefix: location prefix that ``srcobj`` must live under;
            empty for the top-level object, in which case it is derived
            from the object's own location.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals never exist in a collection yet.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Top-level object: its parent path is the reference prefix
            # for all secondary files.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the complete path map for ``referenced_files``.

        Visits every object, uploads the local files that were found,
        then creates new collections for Directory literals and for Files
        whose secondary files are not already laid out next to them.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Extra entry, under a fresh synthetic key, for the
                    # collection as a whole.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when one of the branches above
                # created and saved ``c`` during this iteration.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # normalized path is used for both the resolved
                    # reference and the target so they cannot disagree.
                    rel = sub[2:] if sub.startswith("./") else sub
                    # NOTE(review): the entry type is "Directory" even for
                    # files placed by addentry(); kept as in the original.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   self.file_pattern % (pdh, rel),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(location, resolved)`` pair.

        Falls back to treating ``keep:`` references, and paths under
        ``self.keepdir`` when it is set, as their own reverse mapping.
        Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    """Assign each File/Directory object a unique target under a staging dir.

    ``self.targets`` holds every target path handed out so far; a
    colliding basename is disambiguated with a ``_<n>`` suffix inserted
    before the file extension.
    """

    # When true, visit() descends into the listing of every Directory;
    # otherwise only into ``_:`` literal directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # NOTE(review): created before calling the base constructor,
        # presumably because that constructor drives setup()/visit(),
        # which read self.targets -- confirm against cwltool.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Record the staging target for ``obj`` and recurse into children.

        :param obj: File or Directory object with "location", "basename"
            and "class" keys.
        :param stagedir: directory the object is staged into.
        :param basedir: passed through unchanged to ``visitlisting``.
        :param copy: map files as "WritableFile" even when not flagged
            ``writable``.
        :param staged: stored verbatim in the resulting MapperEnt.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        if tgt in self.targets:
            # A target can be reserved without any map entry pointing at
            # it (a File already in the map returns early below, after the
            # reservation), in which case reversemap() finds no owner.
            # Treat that like a collision instead of indexing into None.
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                basetgt, baseext = os.path.splitext(tgt)
                n = 1
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # Already mapped: keep the first mapping.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the "resolved" slot carries the contents.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to ``$(task.keep)`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced objects, then rewrite ``keep:`` sources."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only existing keys are reassigned, so the size of the dict does
        # not change while it is being iterated.
        for key, (resolved, target, kind, staged) in viewitems(self._pathmap):
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # Replace the "keep:" prefix (5 characters) with the
            # $(task.keep) placeholder.
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Read by StagingPathMapper.visit(): with this off, only ``_:``
    # literal Directory objects have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object directly under ``self.stagedir``."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)