1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados.commands.run
12 import arvados.collection
14 from schema_salad.sourceline import SourceLine
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
19 from .http import http_to_keep
# Module-level logger used by the path mappers in this file, registered under
# the 'arvados.cwl-runner' name.
# NOTE(review): no `import logging` is visible among the imports shown in this
# listing -- confirm it is present in the full file.
21 logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove the 'listing' field from a Directory object that is a Keep reference.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so the 'listing' field is deleted whenever
    the object's "location" starts with "keep:".

    The object is modified in place and nothing is returned.  Objects without
    a "location", with a non-Keep location, or without a "listing" are left
    untouched.
    """
    # A missing "location" is treated as the empty string, which never matches.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
38 class ArvPathMapper(PathMapper):
39 """Convert container-local paths to and from Keep collection ids."""
# NOTE(review): as shown here the class has no indentation and some interior
# lines are not visible (for example, several names used below are never
# assigned in view); the comments added here describe only the visible code.
#
# pdh_path matches "keep:" + 32 lowercase hex digits + "+" + digits, followed
# by "/" and at least one more character (a path inside a collection).
41 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# pdh_dirpath matches the same prefix, but the "/..." suffix is optional, so a
# bare collection reference matches too.
42 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# Store the runner, the input base directory and the two %-format patterns,
# remember the "location" of every referenced file, then delegate to
# PathMapper.__init__ with None as its third argument.
# NOTE(review): `name` and `**kwargs` are accepted but no use of them is
# visible in this listing -- confirm against the full file.
44 def __init__(self, arvrunner, referenced_files, input_basedir,
45 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
46 self.arvrunner = arvrunner
47 self.input_basedir = input_basedir
48 self.collection_pattern = collection_pattern
49 self.file_pattern = file_pattern
51 self.referenced_files = [r["location"] for r in referenced_files]
52 self.single_collection = single_collection
53 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# Record a mapping for srcobj["location"] in self._pathmap, or queue the file
# in `uploadfiles` when it still has to be uploaded, then recurse into the
# object's "secondaryFiles" and "listing" entries.
55 def visit(self, srcobj, uploadfiles):
56 src = srcobj["location"]
# Cut src at the first "#" (the condition guarding this line is not visible in
# this listing).
58 src = src[:src.index("#")]
# A reference matching pdh_dirpath maps straight to collection_pattern filled
# with the URL-unquoted text after the 5-character "keep:" prefix.
60 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
61 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
63 debug = logger.isEnabledFor(logging.DEBUG)
65 if src not in self._pathmap:
66 if src.startswith("file:"):
67 # Local FS ref, may need to be uploaded or may be on keep
69 ab = abspath(src, self.input_basedir)
70 st = arvados.commands.run.statfile("", ab,
71 fnPattern="keep:%s/%s",
72 dirPattern="keep:%s/%s",
# The statfile result decides the outcome: an UploadFile is queued for upload,
# an ArvFile is mapped directly from its `fn` attribute, and the
# WorkflowException below reports an invalid path (presumably in an `else:`
# branch that is not visible in this listing).
74 with SourceLine(srcobj, "location", WorkflowException, debug):
75 if isinstance(st, arvados.commands.run.UploadFile):
76 uploadfiles.add((src, ab, st))
77 elif isinstance(st, arvados.commands.run.ArvFile):
78 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
80 raise WorkflowException("Input file path '%s' is invalid" % st)
# "_:" locations are literals: a File must carry "contents" and a Directory
# must carry "listing"; no _pathmap entry is added for them here.
81 elif src.startswith("_:"):
82 if srcobj["class"] == "File" and "contents" not in srcobj:
83 raise WorkflowException("File literal '%s' is missing `contents`" % src)
84 if srcobj["class"] == "Directory" and "listing" not in srcobj:
85 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
# http(s) URLs are converted by http_to_keep; its return value is used as both
# the resolved and the target field of the entry.
86 elif src.startswith("http:") or src.startswith("https:"):
87 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
88 logger.info("%s is %s", src, keepref)
89 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
# NOTE(review): presumably the fallback branch (its `else:` is not visible in
# this listing): the location maps to itself.
91 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
# Visit secondary files and directory listing entries recursively.
93 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
94 for l in srcobj.get("secondaryFiles", []):
95 self.visit(l, uploadfiles)
96 with SourceLine(srcobj, "listing", WorkflowException, debug):
97 for l in srcobj.get("listing", []):
98 self.visit(l, uploadfiles)
# Add `obj` to collection `c` under `path` and append (location, destination)
# pairs to `remap`:
#   - location already in self._pathmap: copy it from its source collection,
#     then add its secondaryFiles alongside it;
#   - Directory: add every "listing" entry beneath path/basename;
#   - "_:" literal with "contents": write the UTF-8 encoded contents;
#   - the raise at the end reports any other object (presumably under an
#     `else:` that is not visible in this listing).
100 def addentry(self, obj, c, path, remap):
101 if obj["location"] in self._pathmap:
102 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
105 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
106 remap.append((obj["location"], path + "/" + obj["basename"]))
107 for l in obj.get("secondaryFiles", []):
108 self.addentry(l, c, path, remap)
109 elif obj["class"] == "Directory":
110 for l in obj.get("listing", []):
111 self.addentry(l, c, path + "/" + obj["basename"], remap)
112 remap.append((obj["location"], path + "/" + obj["basename"]))
113 elif obj["location"].startswith("_:") and "contents" in obj:
114 with c.open(path + "/" + obj["basename"], "w") as f:
115 f.write(obj["contents"].encode("utf-8"))
116 remap.append((obj["location"], path + "/" + obj["basename"]))
118 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# Build self._pathmap for referenced_files.  Visible steps:
#   1. when single_collection is set, create one Collection to hold inputs;
#   2. reuse entries the runner reports as already uploaded (copying them
#      into the single collection when enabled);
#   3. visit() every object, upload the queued local files with
#      arvados.commands.run.uploadfiles and record them via add_uploaded;
#   4. for Directory objects still unmapped, and for File objects that have
#      secondaryFiles or are "_:" literals with "contents", assemble a new
#      Collection, save it only if the API lists no collection with the same
#      portable data hash, and map the entries into it.
# NOTE(review): `uploadfiles`, `copied_files`, `loc` and `remap` are used but
# their assignments are not visible in this listing; `collection` is only
# assigned under `if self.single_collection` in view -- confirm it is
# initialised otherwise.
120 def setup(self, referenced_files, basedir):
121 # type: (List[Any], unicode) -> None
125 if self.single_collection:
126 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
127 keep_client=self.arvrunner.keep_client,
128 num_retries=self.arvrunner.num_retries)
130 already_uploaded = self.arvrunner.get_uploaded()
132 for k in referenced_files:
134 if loc in already_uploaded:
135 v = already_uploaded[loc]
136 self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
137 if self.single_collection:
138 basename = k["basename"]
139 if basename not in collection:
140 self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
141 copied_files.add((loc, basename, v.type))
143 for srcobj in referenced_files:
144 self.visit(srcobj, uploadfiles)
# Upload everything visit() queued; each queued item is (src, ab, st) and only
# the third element is handed to uploadfiles.
146 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
149 num_retries=self.arvrunner.num_retries,
150 fnPattern="keep:%s/%s",
152 project=self.arvrunner.project_uuid,
153 collection=collection)
# After the upload, st.fn is used as the Keep reference: the resolved field is
# its URL-quoted form (keeping "/:+@" unescaped) and the target is
# collection_pattern filled with the text after "keep:".
155 for src, ab, st in uploadfiles:
156 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
157 "Directory" if os.path.isdir(ab) else "File", True)
158 self.arvrunner.add_uploaded(src, self._pathmap[src])
# Entries copied into the single collection are re-pointed at that
# collection's portable data hash.
160 for loc, basename, cls in copied_files:
161 fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
162 self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
164 for srcobj in referenced_files:
166 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
167 c = arvados.collection.Collection(api_client=self.arvrunner.api,
168 keep_client=self.arvrunner.keep_client,
169 num_retries=self.arvrunner.num_retries)
170 for l in srcobj.get("listing", []):
171 self.addentry(l, c, ".", remap)
# Only save the collection when the API lists none with this portable data hash.
173 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
174 if not check["items"]:
175 c.save_new(owner_uuid=self.arvrunner.project_uuid)
177 ab = self.collection_pattern % c.portable_data_hash()
178 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
179 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
180 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
182 c = arvados.collection.Collection(api_client=self.arvrunner.api,
183 keep_client=self.arvrunner.keep_client,
184 num_retries=self.arvrunner.num_retries )
185 self.addentry(srcobj, c, ".", remap)
187 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
188 if not check["items"]:
189 c.save_new(owner_uuid=self.arvrunner.project_uuid)
191 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
192 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
# When the file has secondaryFiles, the whole collection is additionally
# registered as a Directory under a freshly generated "_:<uuid4>" key.
194 if srcobj.get("secondaryFiles"):
195 ab = self.collection_pattern % c.portable_data_hash()
196 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Map every (location, destination) pair collected by addentry into the
# collection `c` built above.
# NOTE(review): the resolved field below uses sub[2:] unconditionally, unlike
# `ab`, which strips the prefix only when sub starts with "./"; the entry type
# is also always "Directory" -- confirm both are intended.
199 for loc, sub in remap:
200 # subdirs start with "./", strip it off
201 if sub.startswith("./"):
202 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
204 ab = self.file_pattern % (c.portable_data_hash(), sub)
205 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
206 ab, "Directory", True)
# Map a target path back to its source: PathMapper.reversemap is consulted
# first; a "keep:" target is returned as (target, target); a target beneath
# self.keepdir becomes "keep:" plus the text after keepdir and one separator
# character.
# NOTE(review): the handling of `p` and of `kp`, and any assignment of
# self.keepdir, are not visible in this listing.
210 def reversemap(self, target):
211 p = super(ArvPathMapper, self).reversemap(target)
214 elif target.startswith("keep:"):
215 return (target, target)
216 elif self.keepdir and target.startswith(self.keepdir):
217 kp = "keep:" + target[len(self.keepdir)+1:]
# PathMapper subclass whose visit() assigns each object a target path under a
# staging directory, keeping targets unique.
# NOTE(review): self.targets and self._follow_dirs are read in visit() but
# their assignments are not visible in this listing.
222 class StagingPathMapper(PathMapper):
225 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
227 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# Choose a staging target for `obj` and record it in self._pathmap under the
# object's "location".
# NOTE(review): the type comment below lists four argument types, but the
# signature has five parameters after self.
229 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
230 # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
231 loc = obj["location"]
232 tgt = os.path.join(stagedir, obj["basename"])
233 basetgt, baseext = os.path.splitext(tgt)
# If the target is already taken by a different location, insert "_<n>"
# between the base name and the extension until the name is unused (the
# initialisation and increment of `n` are not visible in this listing).
235 if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
236 while tgt in self.targets:
238 tgt = "%s_%i%s" % (basetgt, n, baseext)
239 self.targets.add(tgt)
# Directories are recorded as "WritableDirectory" when obj has a truthy
# "writable" value, otherwise as "Directory" (presumably under an `else:` not
# visible here); their listing is visited for "_:" literals or when
# self._follow_dirs is truthy.
240 if obj["class"] == "Directory":
241 if obj.get("writable"):
242 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
244 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
245 if loc.startswith("_:") or self._follow_dirs:
246 self.visitlisting(obj.get("listing", []), tgt, basedir)
# Files: a "_:" literal with "contents" becomes "CreateFile" with the contents
# as the resolved field; `copy` or a truthy "writable" gives "WritableFile";
# the plain "File" entry is presumably the remaining case.  Secondary files
# are staged into the same stagedir.
# NOTE(review): the body of `if loc in self._pathmap:` is not visible here.
247 elif obj["class"] == "File":
248 if loc in self._pathmap:
250 if "contents" in obj and loc.startswith("_:"):
251 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
253 if copy or obj.get("writable"):
254 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
256 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
257 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging path mapper that rewrites Keep sources as "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage *referenced_files* and point Keep-backed entries at $(task.keep).

        Every referenced object is visited with self.stagedir as its staging
        directory.  Afterwards, each "File" or "Directory" entry whose
        resolved value starts with "keep:" has that value replaced by
        "$(task.keep)/" followed by the text after the "keep:" prefix; the
        target, type and staged fields are kept as they were.
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # `kind` rather than `type`, so the builtin is not shadowed.
        for path, (resolved, target, kind, staged) in self._pathmap.items():
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                # resolved[5:] drops the 5-character "keep:" prefix.
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
# StagingPathMapper variant whose setup() only visits the referenced files,
# staging them under self.stagedir.
# NOTE(review): the name suggests that directory listings are not followed
# (see the self._follow_dirs check in StagingPathMapper.visit), but no
# assignment of _follow_dirs is visible in this listing -- confirm.
273 class NoFollowPathMapper(StagingPathMapper):
275 def setup(self, referenced_files, basedir):
276 # type: (List[Any], unicode) -> None
277 self.visitlisting(referenced_files, self.stagedir, basedir)