1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados.commands.run
12 import arvados.collection
14 from schema_salad.sourceline import SourceLine
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
# Module-level logger, registered under the 'arvados.cwl-runner' name.
19 logger = logging.getLogger('arvados.cwl-runner')
21 def trim_listing(obj):
22 """Remove 'listing' field from Directory objects that are keep references.
24 When Directory objects represent Keep references, it is redundant and
25 potentially very expensive to pass fully enumerated Directory objects
26 between instances of cwl-runner (e.g. submitting a job, or using the
27 RunInSingleContainer feature), so delete the 'listing' field when it is
# Only objects whose "location" is a keep: reference and which still carry a
# "listing" field qualify; "location" defaults to "" so objects without one
# are left untouched.
32 if obj.get("location", "").startswith("keep:") and "listing" in obj:
36 class ArvPathMapper(PathMapper):
37 """Convert container-local paths to and from Keep collection ids."""
# pdh_path matches "keep:<32 lowercase hex digits>+<digits>/<non-empty path>",
# i.e. a path inside a collection named by its portable data hash.
39 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# pdh_dirpath uses the same prefix but makes the "/..." suffix optional, so it
# also matches a bare collection reference such as "keep:<hash>+<size>".
40 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# Parameters:
#   arvrunner: runner object; this class reads its api, keep_client,
#     num_retries, project_uuid and fs_access attributes and calls its
#     get_uploaded() / add_uploaded() methods.
#   referenced_files: File/Directory objects (dicts with a "location" key).
#   input_basedir: base directory passed to abspath() for file: locations.
#   collection_pattern: %-format string applied to "<pdh>[/path]" to build
#     the container-side target path.
#   file_pattern: %-format string applied to (pdh, relative path).
#   single_collection: when true, setup() builds one Collection that
#     already-uploaded inputs are copied into and that is handed to
#     uploadfiles().
42 def __init__(self, arvrunner, referenced_files, input_basedir,
43 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
44 self.arvrunner = arvrunner
45 self.input_basedir = input_basedir
46 self.collection_pattern = collection_pattern
47 self.file_pattern = file_pattern
# Only the "location" strings of the referenced objects are kept here.
49 self.referenced_files = [r["location"] for r in referenced_files]
50 self.single_collection = single_collection
# The base class receives the full objects (not just locations) and None as
# its third argument.
51 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# Walk one File/Directory object (plus its secondaryFiles and listing) and
# either record a _pathmap entry for it or queue it for upload by adding a
# (location, absolute path, statfile result) tuple to the `uploadfiles` set.
53 def visit(self, srcobj, uploadfiles):
54 src = srcobj["location"]
# Drop any "#fragment" suffix from the location.
56 src = src[:src.index("#")]
# keep: references (a collection, or a path inside one) are mapped directly;
# src[5:] strips the "keep:" prefix before it is URL-unquoted into the target.
58 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
59 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
# Passed to the SourceLine contexts below; true when this module's logger is
# enabled for DEBUG.
61 debug = logger.isEnabledFor(logging.DEBUG)
63 if src not in self._pathmap:
64 if src.startswith("file:"):
65 # Local FS ref, may need to be uploaded or may be on keep
67 ab = abspath(src, self.input_basedir)
68 st = arvados.commands.run.statfile("", ab,
69 fnPattern="keep:%s/%s",
70 dirPattern="keep:%s/%s",
# The statfile() result decides the handling: an UploadFile is queued for
# upload, an ArvFile (already in Keep) is mapped directly, and otherwise the
# path is reported as invalid.
72 with SourceLine(srcobj, "location", WorkflowException, debug):
73 if isinstance(st, arvados.commands.run.UploadFile):
74 uploadfiles.add((src, ab, st))
75 elif isinstance(st, arvados.commands.run.ArvFile):
76 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
78 raise WorkflowException("Input file path '%s' is invalid" % st)
79 elif src.startswith("_:"):
# "_:" literals must be self-contained: a File needs inline `contents` and a
# Directory needs an inline `listing`.
80 if srcobj["class"] == "File" and "contents" not in srcobj:
81 raise WorkflowException("File literal '%s' is missing `contents`" % src)
82 if srcobj["class"] == "Directory" and "listing" not in srcobj:
83 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
# Map the location to itself (resolved path and target are both `src`).
85 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
# Recurse into secondaryFiles and listing entries; the SourceLine wrappers
# presumably attribute any WorkflowException to the corresponding field.
87 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
88 for l in srcobj.get("secondaryFiles", []):
89 self.visit(l, uploadfiles)
90 with SourceLine(srcobj, "listing", WorkflowException, debug):
91 for l in srcobj.get("listing", []):
92 self.visit(l, uploadfiles)
# Place `obj` into collection `c` at `path` + "/" + basename:
#   - locations already in _pathmap are copied (overwrite=True) from the
#     collection returned by fs_access.get_collection(), followed by their
#     secondaryFiles at the same `path`;
#   - other Directories are expanded recursively via their listing, and the
#     (location, directory path) pair is then appended to `subdirs`;
#   - "_:" literals with `contents` are written as UTF-8 encoded bytes.
# Anything else raises the error built by SourceLine(...).makeError().
94 def addentry(self, obj, c, path, subdirs):
95 if obj["location"] in self._pathmap:
96 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
99 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
100 for l in obj.get("secondaryFiles", []):
101 self.addentry(l, c, path, subdirs)
102 elif obj["class"] == "Directory":
103 for l in obj.get("listing", []):
104 self.addentry(l, c, path + "/" + obj["basename"], subdirs)
105 subdirs.append((obj["location"], path + "/" + obj["basename"]))
106 elif obj["location"].startswith("_:") and "contents" in obj:
107 with c.open(path + "/" + obj["basename"], "w") as f:
108 f.write(obj["contents"].encode("utf-8"))
110 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
112 def setup(self, referenced_files, basedir):
113 # type: (List[Any], unicode) -> None
# Resolve every referenced object to a Keep location:
#   1. reuse entries previously recorded via arvrunner.get_uploaded();
#   2. visit() each object, then upload queued local files in one
#      uploadfiles() call and record the results;
#   3. build and save new collections for Directories still unmapped and for
#      Files that have secondaryFiles or are "_:" literals with contents.
117 if self.single_collection:
118 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
119 keep_client=self.arvrunner.keep_client,
120 num_retries=self.arvrunner.num_retries)
122 already_uploaded = self.arvrunner.get_uploaded()
124 for k in referenced_files:
126 if loc in already_uploaded:
127 v = already_uploaded[loc]
128 self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
# In single-collection mode, copy the existing entry into the shared
# collection (skipping basenames already present) and remember
# (loc, basename, type) so its mapping can be repointed at that collection.
129 if self.single_collection:
130 basename = k["basename"]
131 if basename not in collection:
132 self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
133 copied_files.add((loc, basename, v.type))
135 for srcobj in referenced_files:
136 self.visit(srcobj, uploadfiles)
# Upload every queued local file in a single call; the statfile results are
# the third element of each queued tuple.
138 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
141 num_retries=self.arvrunner.num_retries,
142 fnPattern="keep:%s/%s",
144 project=self.arvrunner.project_uuid,
145 collection=collection)
# Record each upload: st.fn is URL-quoted (keeping "/:+@" literal) for the
# resolved value but used as-is for the target. The entry is a Directory
# when the local path is a directory. It is also reported back through
# arvrunner.add_uploaded(), the same store get_uploaded() reads above.
147 for src, ab, st in uploadfiles:
148 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
149 "Directory" if os.path.isdir(ab) else "File", True)
150 self.arvrunner.add_uploaded(src, self._pathmap[src])
# Repoint single-collection copies at the shared collection's portable data
# hash, using the same quoting convention as above.
152 for loc, basename, cls in copied_files:
153 fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
154 self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
156 for srcobj in referenced_files:
# Directory still unmapped after visit(): assemble a new collection from its
# listing and map the Directory to the collection root.
158 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
159 c = arvados.collection.Collection(api_client=self.arvrunner.api,
160 keep_client=self.arvrunner.keep_client,
161 num_retries=self.arvrunner.num_retries)
162 for l in srcobj.get("listing", []):
163 self.addentry(l, c, ".", subdirs)
# Save into arvrunner.project_uuid only when the API returns no collection
# with the same portable data hash.
165 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
166 if not check["items"]:
167 c.save_new(owner_uuid=self.arvrunner.project_uuid)
169 ab = self.collection_pattern % c.portable_data_hash()
170 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Files with secondaryFiles, or "_:" literals with contents, are likewise
# packed into their own collection and mapped to <pdh>/<basename>.
171 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
172 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
174 c = arvados.collection.Collection(api_client=self.arvrunner.api,
175 keep_client=self.arvrunner.keep_client,
176 num_retries=self.arvrunner.num_retries )
177 self.addentry(srcobj, c, ".", subdirs)
179 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
180 if not check["items"]:
181 c.save_new(owner_uuid=self.arvrunner.project_uuid)
183 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
184 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
# NOTE(review): presumably this extra "_:<uuid4>" Directory entry makes the
# whole collection (primary file plus secondaryFiles) available as a unit;
# confirm with the consumers of _pathmap.
186 if srcobj.get("secondaryFiles"):
187 ab = self.collection_pattern % c.portable_data_hash()
188 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Map each subdirectory recorded by addentry() to its path inside `c`, the
# collection built in the branch above.
191 for loc, sub in subdirs:
192 # subdirs will all start with "./", strip it off
193 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
194 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
195 ab, "Directory", True)
# Translate a target path back to a source location, trying the base class
# mapping first. The keep: case below shows the returned (location, target)
# tuple shape.
199 def reversemap(self, target):
200 p = super(ArvPathMapper, self).reversemap(target)
# keep: references are already locations and map to themselves.
203 elif target.startswith("keep:"):
204 return (target, target)
# Paths under self.keepdir become keep: references by dropping the keepdir
# prefix and the one character after it (presumably the "/" separator).
205 elif self.keepdir and target.startswith(self.keepdir):
206 kp = "keep:" + target[len(self.keepdir)+1:]
# PathMapper that gives each input a unique target path under a staging
# directory. Its _pathmap entries are tagged "Directory", "File",
# "WritableFile" or "CreateFile"; a CreateFile entry holds the literal file
# contents as its resolved value.
211 class StagingPathMapper(PathMapper):
214 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
216 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# Map `obj` to a target under `stagedir`, then visit its listing (Directories)
# or secondaryFiles (Files). `staged` is stored on every MapperEnt created here.
218 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
219 # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
220 loc = obj["location"]
221 tgt = os.path.join(stagedir, obj["basename"])
222 basetgt, baseext = os.path.splitext(tgt)
# While the target name is already taken in self.targets, insert "_<n>"
# before the extension; the final name is then reserved.
224 while tgt in self.targets:
226 tgt = "%s_%i%s" % (basetgt, n, baseext)
227 self.targets.add(tgt)
228 if obj["class"] == "Directory":
229 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
# Directory contents are visited only for "_:" literals or when
# self._follow_dirs is set.
230 if loc.startswith("_:") or self._follow_dirs:
231 self.visitlisting(obj.get("listing", []), tgt, basedir)
232 elif obj["class"] == "File":
233 if loc in self._pathmap:
# "_:" literals with inline contents become CreateFile entries.
235 if "contents" in obj and loc.startswith("_:"):
236 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
239 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
241 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
# secondaryFiles are staged into the same `stagedir` as the file itself.
242 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# StagingPathMapper variant whose setup() stages the inputs and then rewrites
# the resolved value of every keep: File/Directory entry to
# "$(task.keep)/<reference without the keep: prefix>".
245 class VwdPathMapper(StagingPathMapper):
246 def setup(self, referenced_files, basedir):
247 # type: (List[Any], unicode) -> None
249 # Go through each file and set the target to its own directory along
250 # with any secondary files.
251 self.visitlisting(referenced_files, self.stagedir, basedir)
# NOTE(review): "$(task.keep)" looks like a placeholder substituted later
# with the Keep mount point of the task; confirm against its consumer.
# Only existing keys are reassigned in this loop; `type` shadows the builtin
# here.
253 for path, (ab, tgt, type, staged) in self._pathmap.items():
254 if type in ("File", "Directory") and ab.startswith("keep:"):
255 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
# StagingPathMapper whose setup() stages the referenced files directly into
# self.stagedir.
# NOTE(review): the class name suggests it disables following Directory
# listings (the _follow_dirs check in StagingPathMapper.visit); confirm where
# _follow_dirs is set for this class.
258 class NoFollowPathMapper(StagingPathMapper):
260 def setup(self, referenced_files, basedir):
261 # type: (List[Any], unicode) -> None
262 self.visitlisting(referenced_files, self.stagedir, basedir)