1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring

import logging
import os
import re
import uuid
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException

from .http import http_to_keep
28 logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    The object is modified in place; nothing is returned.  Objects without
    a "location", or whose location is not a "keep:" reference, are left
    untouched.

    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    ``collection_pattern`` is %-formatted with a single Keep path (the part
    of a ``keep:`` reference after the scheme); ``file_pattern`` is
    %-formatted with a ``(portable data hash, path)`` pair.
    """

    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Record the mapping configuration and build the path map.

        :param arvrunner: runner object supplying ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: File/Directory objects to map.
        :param input_basedir: base directory for resolving ``file:`` refs.
        :param collection_pattern: format string taking one Keep path.
        :param file_pattern: format string taking ``(pdh, path)``.
        :param name: name handed to the upload helper for new collections.
        :param single_collection: when true, uploads share one collection.
        """
        # NOTE(review): attributes are assigned before the base constructor
        # runs because cwltool's PathMapper.__init__ is expected to call
        # setup(), which reads them -- confirm against cwltool.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map one File/Directory object and, recursively, its children.

        Local files that must be uploaded are added to the ``uploadfiles``
        set as ``(location, absolute path, stat result)`` tuples; everything
        else is entered into ``self._pathmap`` directly.

        :raises WorkflowException: for an invalid local path, or a literal
            File/Directory lacking ``contents``/``listing``.
        """
        src = srcobj["location"]
        # Drop any "#fragment" suffix; only index when one is present, since
        # str.index raises ValueError otherwise.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): the final keyword argument line of this call
                # was not visible in the reviewed text; raiseOSError=True
                # follows the Arvados SDK signature -- confirm.
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literal objects carry their payload inline; only validate.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is passed through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` (and its children) to collection ``c`` under ``path``.

        Every object placed in the collection is also appended to ``remap``
        as a ``(location, path inside the collection)`` pair.

        :raises WorkflowException: when ``obj`` is neither already mapped, a
            Directory, nor a File literal with ``contents``.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            # NOTE(review): these two lines were not visible in the reviewed
            # text; an empty sub-path presumably denotes the collection
            # root, addressed as "." for copy() -- confirm.
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate locations.

        ``prefix`` is the location prefix children are expected to share;
        on the outermost call it is derived from ``srcobj`` itself.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no existing home, so they must be written out.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Outermost object: siblings must live next to it.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every referenced object.

        Visits all objects, uploads the local files collected on the way,
        then builds intermediate collections for Directory objects that are
        still unmapped and for Files that carry secondary files or literal
        contents.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): the api client, dry_run, name and packed argument
        # lines were not visible in the reviewed text; they follow the
        # Arvados SDK uploadfiles() signature -- confirm.
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            # (location, path in new collection) pairs filled by addentry().
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also expose the enclosing collection itself, under a
                    # freshly generated literal id.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # collection c.
            for loc, sub in remap:
                # subdirs start with "./", strip it off
                if sub.startswith("./"):
                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                else:
                    ab = self.file_pattern % (c.portable_data_hash(), sub)
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               ab, "Directory", True)

        # Read by reversemap(); no Keep mount directory is configured here.
        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back from the base-class lookup to ``keep:`` references
        (returned as-is) and to paths under ``self.keepdir``; returns None
        when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a target under a staging dir.

    Target paths are kept unique: a basename that collides with a target
    already assigned to a different location gets a ``_<n>`` suffix before
    its extension.
    """

    # Whether visit() descends into the listing of non-literal directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the set of assigned targets, then build the map."""
        # Must exist before the base constructor runs, because visit()
        # reads it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a staging target to ``obj`` and visit its children.

        The entry type recorded in ``self._pathmap`` is one of "Directory",
        "WritableDirectory", "File", "WritableFile" or "CreateFile" (for
        literals, whose ``contents`` are stored as the resolved value).
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        # Only rename when the existing target belongs to another location.
        if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # Already mapped; keep the first assignment.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve via ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage the referenced objects, then rewrite ``keep:`` sources."""
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are replaced in the loop.
        for location, entry in list(self._pathmap.items()):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # Swap the "keep:" scheme for the task's Keep mount placeholder.
            self._pathmap[location] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Read by StagingPathMapper.visit(): with this off, only literal ("_:")
    # directories have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Assign staging targets to every referenced object."""
        self.visitlisting(referenced_files, self.stagedir, basedir)