1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues
18 from collections import OrderedDict
20 import ruamel.yaml as yaml
22 import cwltool.stdfsaccess
23 from cwltool.pathmapper import abspath
24 import cwltool.resolver
27 import arvados.collection
28 import arvados.arvfile
31 from googleapiclient.errors import HttpError
33 from schema_salad.ref_resolver import DefaultFetcher
35 logger = logging.getLogger('arvados.cwl-runner')
37 pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
39 class CollectionCache(object):
40 def __init__(self, api_client, keep_client, num_retries,
43 self.api_client = api_client
44 self.keep_client = keep_client
45 self.num_retries = num_retries
46 self.collections = OrderedDict()
47 self.lock = threading.Lock()
50 self.min_entries = min_entries
52 def set_cap(self, cap):
55 def cap_cache(self, required):
56 # ordered dict iterates from oldest to newest
57 for pdh, v in list(self.collections.items()):
58 available = self.cap - self.total
59 if available >= required or len(self.collections) < self.min_entries:
62 logger.debug("Evicting collection reader %s from cache (cap %s total %s required %s)", pdh, self.cap, self.total, required)
63 del self.collections[pdh]
66 def get(self, locator):
68 if locator not in self.collections:
69 m = pdh_size.match(locator)
71 self.cap_cache(int(m.group(2)) * 128)
72 logger.debug("Creating collection reader for %s", locator)
74 cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
75 keep_client=self.keep_client,
76 num_retries=self.num_retries)
77 except arvados.errors.ApiError as ap:
78 raise IOError(errno.ENOENT, "Could not access collection '%s': %s" % (locator, str(ap._get_reason())))
79 sz = len(cr.manifest_text()) * 128
80 self.collections[locator] = (cr, sz)
83 cr, sz = self.collections[locator]
85 del self.collections[locator]
86 self.collections[locator] = (cr, sz)
90 class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
91 """Implement the cwltool FsAccess interface for Arvados Collections."""
93 def __init__(self, basedir, collection_cache=None):
94 super(CollectionFsAccess, self).__init__(basedir)
95 self.collection_cache = collection_cache
97 def get_collection(self, path):
98 sp = path.split("/", 1)
100 if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
101 arvados.util.collection_uuid_pattern.match(p[5:])):
103 return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
107 def _match(self, collection, patternsegments, parent):
108 if not patternsegments:
111 if not isinstance(collection, arvados.collection.RichCollectionBase):
115 # iterate over the files and subcollections in 'collection'
116 for filename in collection:
117 if patternsegments[0] == '.':
118 # Pattern contains something like "./foo" so just shift
120 ret.extend(self._match(collection, patternsegments[1:], parent))
121 elif fnmatch.fnmatch(filename, patternsegments[0]):
122 cur = os.path.join(parent, filename)
123 if len(patternsegments) == 1:
126 ret.extend(self._match(collection[filename], patternsegments[1:], cur))
129 def glob(self, pattern):
130 collection, rest = self.get_collection(pattern)
131 if collection is not None and not rest:
133 patternsegments = rest.split("/")
134 return sorted(self._match(collection, patternsegments, "keep:" + collection.manifest_locator()))
136 def open(self, fn, mode, encoding=None):
137 collection, rest = self.get_collection(fn)
138 if collection is not None:
139 return collection.open(rest, mode, encoding=encoding)
141 return super(CollectionFsAccess, self).open(self._abs(fn), mode)
143 def exists(self, fn):
145 collection, rest = self.get_collection(fn)
146 except HttpError as err:
147 if err.resp.status == 404:
151 if collection is not None:
153 return collection.exists(rest)
157 return super(CollectionFsAccess, self).exists(fn)
159 def size(self, fn): # type: (unicode) -> bool
160 collection, rest = self.get_collection(fn)
161 if collection is not None:
163 arvfile = collection.find(rest)
164 if isinstance(arvfile, arvados.arvfile.ArvadosFile):
165 return arvfile.size()
166 raise IOError(errno.EINVAL, "Not a path to a file %s" % (fn))
168 return super(CollectionFsAccess, self).size(fn)
170 def isfile(self, fn): # type: (unicode) -> bool
171 collection, rest = self.get_collection(fn)
172 if collection is not None:
174 return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
178 return super(CollectionFsAccess, self).isfile(fn)
180 def isdir(self, fn): # type: (unicode) -> bool
181 collection, rest = self.get_collection(fn)
182 if collection is not None:
184 return isinstance(collection.find(rest), arvados.collection.RichCollectionBase)
188 return super(CollectionFsAccess, self).isdir(fn)
190 def listdir(self, fn): # type: (unicode) -> List[unicode]
191 collection, rest = self.get_collection(fn)
192 if collection is not None:
194 dir = collection.find(rest)
198 raise IOError(errno.ENOENT, "Directory '%s' in '%s' not found" % (rest, collection.portable_data_hash()))
199 if not isinstance(dir, arvados.collection.RichCollectionBase):
200 raise IOError(errno.ENOENT, "Path '%s' in '%s' is not a Directory" % (rest, collection.portable_data_hash()))
201 return [abspath(l, fn) for l in list(dir.keys())]
203 return super(CollectionFsAccess, self).listdir(fn)
205 def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
206 if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
208 return os.path.join(path, *paths)
210 def realpath(self, path):
211 if path.startswith("$(task.tmpdir)") or path.startswith("$(task.outdir)"):
213 collection, rest = self.get_collection(path)
214 if collection is not None:
217 return os.path.realpath(path)
219 class CollectionFetcher(DefaultFetcher):
220 def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
221 super(CollectionFetcher, self).__init__(cache, session)
222 self.api_client = api_client
223 self.fsaccess = fs_access
224 self.num_retries = num_retries
226 def fetch_text(self, url):
227 if url.startswith("keep:"):
228 with self.fsaccess.open(url, "r", encoding="utf-8") as f:
230 if url.startswith("arvwf:"):
231 record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
232 definition = yaml.round_trip_load(record["definition"])
233 definition["label"] = record["name"]
234 return yaml.round_trip_dump(definition)
235 return super(CollectionFetcher, self).fetch_text(url)
237 def check_exists(self, url):
239 if url.startswith("http://arvados.org/cwl"):
241 if url.startswith("keep:"):
242 return self.fsaccess.exists(url)
243 if url.startswith("arvwf:"):
244 if self.fetch_text(url):
246 except arvados.errors.NotFoundError:
249 logger.exception("Got unexpected exception checking if file exists")
251 return super(CollectionFetcher, self).check_exists(url)
253 def urljoin(self, base_url, url):
257 urlsp = urllib.parse.urlsplit(url)
258 if urlsp.scheme or not base_url:
261 basesp = urllib.parse.urlsplit(base_url)
262 if basesp.scheme in ("keep", "arvwf"):
264 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
266 baseparts = basesp.path.split("/")
267 urlparts = urlsp.path.split("/") if urlsp.path else []
269 locator = baseparts.pop(0)
271 if (basesp.scheme == "keep" and
272 (not arvados.util.keep_locator_pattern.match(locator)) and
273 (not arvados.util.collection_uuid_pattern.match(locator))):
274 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
276 if urlsp.path.startswith("/"):
280 if baseparts and urlsp.path:
283 path = "/".join([locator] + baseparts + urlparts)
284 return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
286 return super(CollectionFetcher, self).urljoin(base_url, url)
288 schemes = [u"file", u"http", u"https", u"mailto", u"keep", u"arvwf"]
290 def supported_schemes(self): # type: () -> List[Text]
294 workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
295 pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
297 def collectionResolver(api_client, document_loader, uri, num_retries=4):
298 if uri.startswith("keep:") or uri.startswith("arvwf:"):
301 if workflow_uuid_pattern.match(uri):
302 return u"arvwf:%s#main" % (uri)
304 if pipeline_template_uuid_pattern.match(uri):
305 pt = api_client.pipeline_templates().get(uuid=uri).execute(num_retries=num_retries)
306 return u"keep:" + viewvalues(pt["components"])[0]["script_parameters"]["cwl:tool"]
309 if arvados.util.keep_locator_pattern.match(p[0]):
310 return u"keep:%s" % (uri)
312 if arvados.util.collection_uuid_pattern.match(p[0]):
313 return u"keep:%s%s" % (api_client.collections().
314 get(uuid=p[0]).execute()["portable_data_hash"],
317 return cwltool.resolver.tool_resolver(document_loader, uri)