1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
import logging
import os
import re
import urllib
import uuid

import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
# Module-level logger, registered under the "arvados.cwl-runner" name.
logger = logging.getLogger("arvados.cwl-runner")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    present on such an object.

    The dict is modified in place and nothing is returned.  Objects whose
    "location" is missing, None, or does not start with "keep:" are left
    untouched.
    """
    # `or ""` keeps an explicit `location: None` from raising AttributeError.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory object is recorded in ``self._pathmap``
    as ``MapperEnt(resolved, target, type, staged)``.  ``resolved`` is a
    ``keep:`` reference wherever one can be produced (local files are
    uploaded, literals are written into new collections) and ``target`` is
    built from ``collection_pattern`` / ``file_pattern``.
    """

    # "keep:" + 32 hex digits + "+" + digits, followed by a non-empty path.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same prefix, with an optional "/path" suffix (so a bare collection
    # reference matches too).
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Store the mapping configuration and hand off to PathMapper.

        arvrunner -- runner object; this class uses its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access``, ``get_uploaded()`` and ``add_uploaded()``.
        referenced_files -- list of File/Directory dicts with "location".
        input_basedir -- base directory for resolving "file:" locations.
        collection_pattern -- %-format taking one value (collection path).
        file_pattern -- %-format taking (portable data hash, path).
        name -- passed as ``name`` to the upload helper.
        single_collection -- if true, uploads and previously uploaded
            files are gathered into one shared collection.
        **kwargs -- accepted and ignored.
        """
        # NOTE(review): all attributes are assigned before calling the base
        # constructor, presumably because PathMapper.__init__ invokes
        # setup() -- confirm against cwltool.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object, then recurse into its children.

        Keep references and locations with an unrecognized scheme are added
        to ``self._pathmap`` directly.  Local ("file:") paths that need
        uploading are added to the ``uploadfiles`` set as
        ``(src, absolute_path, stat_result)`` tuples.

        Raises WorkflowException for an invalid local path or for a "_:"
        literal lacking `contents` (File) / `listing` (Directory).
        """
        src = srcobj["location"]
        # Drop any "#fragment"; index() would raise ValueError without the
        # membership check.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # src[5:] strips the leading "keep:".
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; setup() writes them
                # into a collection later.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                # Any other location maps to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for entry in srcobj.get("secondaryFiles", []):
                self.visit(entry, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for entry in srcobj.get("listing", []):
                self.visit(entry, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place `obj` under `path` inside collection `c`.

        Already-mapped objects are copied from their source collection,
        unmapped Directories are expanded recursively, and "_:" File
        literals are written from their `contents`.  Each placed object
        appends ``(location, path_in_collection)`` to the `remap` list.

        Raises a WorkflowException (via SourceLine.makeError) for anything
        else.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            # An empty source path is replaced by "." before copying.
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # Secondary files are added at `path`, next to the primary file.
            for entry in obj.get("secondaryFiles", []):
                self.addentry(entry, c, path, remap)
        elif obj["class"] == "Directory":
            for entry in obj.get("listing", []):
                self.addentry(entry, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return a new Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save `c` into the runner's project unless the API server already
        lists a collection with the same portable data hash."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every object in `referenced_files`.

        Steps, in order: reuse mappings the runner has already uploaded;
        visit every object to classify it; upload the local files that need
        it; then build collections for Directories that are still unmapped
        and for Files that have secondaryFiles or literal contents.
        `basedir` is not used here.
        """
        uploadfiles = set()

        # Shared destination collection, only in single-collection mode.
        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        # NOTE(review): st.fn is read after the upload call, so it is
        # presumably rewritten to a "keep:..." reference by uploadfiles()
        # -- confirm.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        # Files copied into the shared collection are re-pointed at it.
        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for entry in srcobj.get("listing", []):
                    self.addentry(entry, c, ".", remap)

                self._save_if_new(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_if_new(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # The whole collection is also registered under a fresh
                    # random "_:" key.
                    # NOTE(review): presumably so the collection holding the
                    # secondary files is staged as a unit -- confirm.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # `remap` is non-empty only if one of the branches above ran,
            # so `c` is bound whenever this loop body executes.
            for loc, sub in remap:
                # subdirs start with "./", strip it off
                if sub.startswith("./"):
                    sub = sub[2:]
                ab = self.file_pattern % (c.portable_data_hash(), sub)
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub),
                                               ab, "Directory", True)

        # reversemap() reads this attribute; nothing in this class gives it
        # another value.
        self.keepdir = None

    def reversemap(self, target):
        """Map `target` back to a 2-tuple, or None when it is not recognized.

        Returns the base class's result when it is truthy.  Otherwise a
        "keep:" target maps to ``(target, target)``, a target under
        ``self.keepdir`` maps to the equivalent ``("keep:...", "keep:...")``
        pair, and anything else yields None.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 also skips the path separator following keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    """Map File/Directory objects to unique target paths under a staging dir.

    Entries are stored in ``self._pathmap`` as
    ``MapperEnt(resolved, target, type, staged)`` with type one of
    "File", "WritableFile", "CreateFile", "Directory", "WritableDirectory".
    """

    # When true, visit() descends into every Directory's listing; when
    # false, only into literal ("_:") directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialize the set of claimed targets, then defer to PathMapper."""
        # NOTE(review): created before the base constructor runs, presumably
        # because PathMapper.__init__ triggers setup()/visit() -- confirm.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign `obj` a target under `stagedir` and record it in the map.

        If the natural target (stagedir/basename) is already claimed by a
        different location, a numeric suffix is inserted before the
        extension until an unclaimed name is found.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
            # First alternative tried is "<base>_2<ext>".
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            # A File location that is already mapped keeps its first entry.
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the entry carries the contents themselves.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries point at $(task.keep)."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object, then rewrite "keep:" sources.

        After staging, each "File" or "Directory" entry whose resolved path
        starts with "keep:" has that prefix replaced by "$(task.keep)/".
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # resolved[5:] drops the leading "keep:".
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into Directory listings.

    With ``_follow_dirs`` false, StagingPathMapper.visit() only expands
    literal ("_:") directories; every other Directory is mapped as a single
    entry without visiting its contents.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)