1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
21 from .http import http_to_keep
# Module-level logger shared by the path mappers below; it uses the fixed
# 'arvados.cwl-runner' logger name rather than this module's __name__.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove the 'listing' field from Directory objects that are Keep references.

    When a Directory object refers to a Keep location, carrying its fully
    enumerated 'listing' between instances of cwl-runner (e.g. when
    submitting a job, or when using the RunInSingleContainer feature) is
    redundant and potentially very expensive, so the field is dropped.

    The object is modified in place and nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:", and objects that
    have no "listing" key, are left untouched.
    """
    # A missing "location" is treated as an empty string, i.e. "not a Keep ref".
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Entries are stored in ``self._pathmap`` as ``MapperEnt`` tuples of
    (resolved, target, type, staged), keyed by the object's "location".

    NOTE(review): this code targets Python 2 (``basestring``,
    ``urllib.unquote``/``urllib.quote``, ``unicode``).
    """

    # "keep:<32 hex digits>+<size>/<path>" -- a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<32 hex digits>+<size>" optionally followed by "/<path>".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Store the mapping configuration, then run the base-class constructor.

        arvrunner -- runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``
            and ``intermediate_output_ttl`` attributes.
        referenced_files -- list of File/Directory objects (dicts with at
            least "location").
        input_basedir -- base directory used to resolve ``file:`` locations.
        collection_pattern -- %-format string taking one argument
            (a collection reference).
        file_pattern -- %-format string taking two arguments
            (portable data hash, path within the collection).
        name -- passed to the uploader as the name for uploaded data.
        single_collection -- when true, uploads go into one shared collection.
        **kwargs -- accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Attributes must be set first: the base constructor is expected to
        # call setup(), which reads them -- TODO confirm against cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Register ``srcobj`` and, recursively, its secondaryFiles and listing.

        Local files that need uploading are not mapped here; instead a
        ``(src, abspath, statfile_result)`` tuple is added to the
        ``uploadfiles`` set for setup() to process.

        Raises WorkflowException for invalid local paths and for literals
        ("_:" locations) lacking `contents` / `listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment part of the location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a Keep reference; src[5:] strips the "keep:" prefix.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literal objects are only validated here; they are
                # materialized later by setup()/addentry().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Unknown scheme: map the location to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` to collection ``c`` under ``path``.

        Every object placed in the collection is also recorded in ``remap``
        as an ``(original location, path inside c)`` pair.

        Raises a WorkflowException (with source position) when the object is
        neither already mapped, a Directory, nor a File literal with contents.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            # Already in Keep: copy from the collection it resolves to.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty path means the whole source collection.
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return a new, unsaved Collection using the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save ``c`` as a new "Intermediate collection" in the runner's project."""
        trash_time, props = self.__get_collection_attributes()
        c.save_new(name="Intermediate collection",
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=trash_time,
                   properties=props)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for all of ``referenced_files``.

        Steps: visit every object, upload the local files that visit()
        collected, then build intermediate collections for Directory objects
        that are still unmapped and for Files that are literals or carry
        secondaryFiles.  ``basedir`` is not used by this implementation.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # The uploader is expected to have rewritten each st.fn to a
        # "keep:..." reference (hence the [5:] slice) -- TODO confirm.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            c = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the enclosing collection under a fresh
                    # synthetic "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # stripped path is used for both the resolved reference
                    # and the target so the two can never disagree.
                    relpath = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (pdh, relpath)
                    # NOTE(review): every remapped entry is typed
                    # "Directory", including files; kept as in the original.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(resolved, target)`` pair, or None.

        Falls back, in order, to: the base-class lookup, treating a "keep:"
        target as mapping to itself, and translating a path under
        ``self.keepdir`` into a "keep:" reference.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the path separator following keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None

    def __get_collection_attributes(self):
        """Return ``(trash_time, properties)`` for an intermediate collection.

        trash_time is None unless the runner's ``intermediate_output_ttl`` is
        positive.  properties records the type "Intermediate" and the uuid of
        the current container (None when it cannot be determined).
        """
        trash_time = None
        if self.arvrunner.intermediate_output_ttl > 0:
            # Use UTC rather than local wall-clock time.
            # NOTE(review): the SDK's save_new() is understood to treat
            # trash_at as a UTC datetime -- confirm against the SDK docs.
            trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)

        current_container_uuid = None
        try:
            current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
            current_container_uuid = current_container['uuid']
        except ApiError as e:
            # Status code 404 just means we're not running in a container.
            if e.resp.status != 404:
                logger.info("Getting current container: %s", e)
        properties = {"type": "Intermediate",
                      "container": current_container_uuid}

        return (trash_time, properties)
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging dir.

    ``self.targets`` holds every target path handed out so far, so that two
    different locations with the same basename do not collide.
    """

    # When true, visit() descends into the listing of every Directory;
    # when false, only into Directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # targets must exist before the base constructor runs, because the
        # base class is expected to call setup()/visit() -- TODO confirm.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map ``obj`` to ``stagedir/<basename>``, de-duplicating the target.

        If the target is already taken by a different location, a numeric
        suffix ("_2", "_3", ...) is inserted before the extension until the
        target is unused.  Files are typed "CreateFile" (literal with
        contents), "WritableFile" (``copy`` or obj["writable"]) or "File";
        directories "WritableDirectory" or "Directory".
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # reversemap() may return None when the target was reserved but
            # never mapped (a File whose location was already in the map);
            # treat that as a conflict instead of failing on None[0].
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # First mapping wins; do not remap an already-known file.
                return
            if "contents" in obj and loc.startswith("_:"):
                # For file literals the "resolved" slot carries the contents.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite resolved "keep:" entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for location, (resolved, target, kind, staged) in self._pathmap.items():
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # resolved[5:] drops the leading "keep:" prefix.
            self._pathmap[location] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into Directory listings.

    With ``_follow_dirs`` false, the inherited visit() only recurses into
    the listing of Directory literals ("_:" locations).
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)