# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring

import logging
import os
import re
import uuid
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module-level logger, registered under the 'arvados.cwl-runner' name.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or when
    using the RunInSingleContainer feature), so the 'listing' field is
    dropped whenever the object's location is a ``keep:`` reference.

    :param obj: a CWL File/Directory object (dict); modified in place.
    :returns: None.
    """
    # ``or ""`` also covers an explicit ``"location": None``.
    location = obj.get("location") or ""
    if location.startswith("keep:"):
        # pop() with a default is a no-op when there is no listing.
        obj.pop("listing", None)
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory is given a resolved ``keep:`` location:
    existing Keep references are mapped as-is, local ``file:`` paths are
    uploaded, ``http:``/``https:`` URLs are converted with ``http_to_keep``,
    and objects whose secondaryFiles, listings or literal contents cannot be
    used in place are copied into a new intermediate collection.
    """

    # A path inside a collection: "keep:<pdh>/<path>".
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # A whole collection, or a path inside it: "keep:<pdh>[/<path>]".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """
        :param arvrunner: runner object; its ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl`` attributes are used.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving relative
            ``file:`` locations.
        :param collection_pattern: ``%``-format string turning
            ``"<pdh>[/<path>]"`` into a container path.
        :param file_pattern: ``%``-format string turning ``(pdh, path)``
            into a container path.
        :param name: name passed to the upload step for local files.
        :param single_collection: if true, one shared Collection is handed
            to the upload step so all local files land in it.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Register ``srcobj`` and, recursively, its secondaryFiles and listing.

        Local files that must be uploaded are collected in ``uploadfiles`` as
        ``(location, absolute_path, statfile_result)`` tuples; setup() uploads
        and maps them afterwards.

        :raises WorkflowException: for invalid local paths and for File/
            Directory literals missing ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any fragment; only the document location is mapped.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): confirm raiseOSError=True against
                # arvados.commands.run.statfile().
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        # Uploaded and mapped later by setup().
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        # Already in Keep: st.fn is a "keep:..." reference.
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialised later via setup()/addentry().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for secondary in srcobj.get("secondaryFiles", []):
                self.visit(secondary, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for entry in srcobj.get("listing", []):
                self.visit(entry, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create ``obj`` in collection ``c`` under directory ``path``.

        Every placed entry is appended to ``remap`` as
        ``(original_location, path_in_collection)``.

        :raises WorkflowException: (built via SourceLine) when ``obj`` is
            neither mapped, a Directory, nor a File literal with contents.
        """
        target = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            # NOTE(review): presumably an empty path denotes the collection
            # root, which copy() expects as "." -- confirm.
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, target, source_collection=src, overwrite=True)
            remap.append((obj["location"], target))
            # secondaryFiles are placed next to the file, not inside it.
            for secondary in obj.get("secondaryFiles", []):
                self.addentry(secondary, c, path, remap)
        elif obj["class"] == "Directory":
            for entry in obj.get("listing", []):
                self.addentry(entry, c, target, remap)
            remap.append((obj["location"], target))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # Write explicit UTF-8 bytes through a binary handle; passing
            # bytes to a text-mode ("w") handle raises TypeError on SDKs
            # whose open() wraps non-"b" modes in a text layer.
            with c.open(target, "wb") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], target))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param prefix: location an entry is expected to sit under; empty for
            the top-level object, whose own directory is used instead.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals never exist in Keep yet.
            return True
        if prefix:
            # Nested entry: it must already be exactly where staging puts it.
            if loc != prefix + srcobj["basename"]:
                return True
        else:
            # Top-level entry: siblings are expected in its own directory.
            # NOTE(review): a location without "/" is treated as a directory
            # (collection root) itself -- confirm.
            i = loc.rfind("/")
            prefix = loc[:i + 1] if i > -1 else loc + "/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        if any(self.needs_new_collection(s, prefix) for s in srcobj.get("secondaryFiles", [])):
            return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            if any(self.needs_new_collection(l, prefix) for l in srcobj["listing"]):
                return True
        return False

    def _new_collection(self):
        """Return a new Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for ``referenced_files``.

        Visits every object, uploads local files, then builds intermediate
        collections for Directories that are not mapped yet and for Files
        whose secondaryFiles/literal contents need co-locating.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): confirm dry_run/name/packed against
        # arvados.commands.run.uploadfiles().
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for entry in srcobj.get("listing", []):
                    self.addentry(entry, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Anonymous entry for the whole collection, presumably so
                    # the co-located secondaryFiles are reachable as well.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when ``c`` was created above.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off (for both the
                    # resolved location and the target path).
                    rel = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (pdh, rel)
                    # NOTE(review): entries are recorded as "Directory" even
                    # for files -- confirm this is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   ab, "Directory", True)

        # Read by reversemap(); unset unless assigned by the caller.
        self.keepdir = None

    def reversemap(self, target):
        """Map container path ``target`` back to a ``(location, resolved)`` pair.

        Falls back to treating ``keep:`` references, and paths under
        ``self.keepdir`` (when set), as their own Keep locations; returns
        None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        if target.startswith("keep:"):
            return (target, target)
        keepdir = getattr(self, "keepdir", None)
        if keepdir and target.startswith(keepdir):
            kp = "keep:" + target[len(keepdir) + 1:]
            return (kp, kp)
        return None
class StagingPathMapper(PathMapper):
    """Assign every File/Directory a unique target path under a staging dir.

    When two different sources would land on the same target, the later one
    gets ``_2``, ``_3``, ... appended to the target's stem (before the
    extension).
    """

    # Whether to descend into the listing of non-literal Directories;
    # literal ("_:") Directories are always expanded (see visit()).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths handed out so far.  Initialised before the base
        # constructor in case it already visits entries (presumably via
        # setup() -- confirm against cwltool's PathMapper).
        self.targets = set()  # type: Set[Text]
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map ``obj`` to a unique target under ``stagedir`` (recursively)."""
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # reversemap() can return None for a target that was reserved
            # without a path-map entry (a File already mapped elsewhere, see
            # the early return below); treat that as a clash instead of
            # failing with TypeError on ``None[0]``.
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            kind = "WritableDirectory" if obj.get("writable") else "Directory"
            self._pathmap[loc] = MapperEnt(loc, tgt, kind, staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                kind = "WritableFile" if (copy or obj.get("writable")) else "File"
                self._pathmap[loc] = MapperEnt(loc, tgt, kind, staged)
                # NOTE(review): secondaryFiles are visited only for
                # non-literal Files; confirm literal Files never carry them.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep-backed sources to ``$(task.keep)`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Snapshot the items: entries are reassigned while iterating.
        for path, (ab, tgt, kind, staged) in list(self._pathmap.items()):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                # "keep:<pdh>/<path>" -> "$(task.keep)/<pdh>/<path>"
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # Read by StagingPathMapper.visit(): only literal ("_:") Directories
    # have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)