# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import object
from builtins import str
from future.utils import viewvalues

import fnmatch
import os
import errno
import urllib.parse
import re
import json
import logging
import threading
from collections import OrderedDict

import ruamel.yaml as yaml

import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
import cwltool.resolver

import arvados.util
import arvados.collection
import arvados.arvfile
import arvados.errors

from googleapiclient.errors import HttpError

from schema_salad.ref_resolver import DefaultFetcher
# Logger used throughout this module.
logger = logging.getLogger("arvados.cwl-runner")

# Matches a portable-data-hash locator such as
# "d41d8cd98f00b204e9800998ecf8427e+0", optionally followed by "+..." hints.
# Used with .match(), so it is anchored at the start of the string only.
pdh_size = re.compile(
    r"([0-9a-f]{32})"  # group 1: 32 hex digit hash
    r"\+(\d+)"         # group 2: the decimal size field
    r"(\+\S+)*"        # group 3: optional "+..." hint suffix
)
class CollectionCache(object):
    """LRU cache of CollectionReader objects bounded by an estimated cost.

    Each cached reader is charged ``128 * len(manifest_text)`` units against
    ``cap``.  Before a new reader is added, the least recently used readers
    are evicted until the new one fits, although the pass stops once fewer
    than ``min_entries`` readers remain.  ``get`` holds ``self.lock`` for its
    whole body.
    """

    def __init__(self, api_client, keep_client, num_retries,
                 cap=256*1024*1024,
                 min_entries=2):
        self.api_client = api_client
        self.keep_client = keep_client
        self.num_retries = num_retries
        # locator -> (CollectionReader, estimated cost).  Insertion order is
        # recency order: oldest first, most recently used last.
        self.collections = OrderedDict()
        self.lock = threading.Lock()
        # Sum of the estimated costs of everything in self.collections.
        self.total = 0
        self.cap = cap
        self.min_entries = min_entries

    def set_cap(self, cap):
        """Change the cost budget.

        Nothing is evicted immediately; the new cap applies on the next
        eviction pass.
        """
        self.cap = cap

    def cap_cache(self, required):
        """Evict least-recently-used readers until ``required`` more units fit.

        Stops as soon as ``cap - total >= required``, or when fewer than
        ``min_entries`` readers are cached.  Callers in this class invoke it
        while holding ``self.lock``.
        """
        # ordered dict iterates from oldest to newest
        for pdh, v in list(self.collections.items()):
            available = self.cap - self.total
            if available >= required or len(self.collections) < self.min_entries:
                return
            # cut it loose
            logger.debug("Evicting collection reader %s from cache (cap %s total %s required %s)", pdh, self.cap, self.total, required)
            del self.collections[pdh]
            self.total -= v[1]

    def get(self, locator):
        """Return a CollectionReader for ``locator``, creating and caching it if needed.

        :param locator: a portable data hash or a collection UUID.
        :raises IOError: (ENOENT) if the API server refuses or fails the lookup.
        """
        with self.lock:
            if locator not in self.collections:
                m = pdh_size.match(locator)
                if m:
                    # A PDH carries the manifest size, so room can be made
                    # before the (possibly large) manifest is fetched.
                    self.cap_cache(int(m.group(2)) * 128)
                logger.debug("Creating collection reader for %s", locator)
                try:
                    cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
                                                             keep_client=self.keep_client,
                                                             num_retries=self.num_retries)
                except arvados.errors.ApiError as ap:
                    raise IOError(errno.ENOENT, "Could not access collection '%s': %s" % (locator, str(ap._get_reason())))
                sz = len(cr.manifest_text()) * 128
                if not m:
                    # The size was not known up front (e.g. a collection
                    # UUID).  Evict now so that such entries are also kept
                    # within the cap; otherwise they would accumulate
                    # without bound.
                    self.cap_cache(sz)
                self.collections[locator] = (cr, sz)
                self.total += sz
            else:
                cr, sz = self.collections[locator]
                # bump it to the back (most recently used)
                del self.collections[locator]
                self.collections[locator] = (cr, sz)
            return cr
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections.

    Paths of the form ``keep:<locator>[/<path>]`` are served from the
    collection named by ``<locator>``, which is a portable data hash or a
    collection UUID.  Every other path is delegated to
    ``cwltool.stdfsaccess.StdFsAccess``.
    """

    def __init__(self, basedir, collection_cache=None):
        super(CollectionFsAccess, self).__init__(basedir)
        # Must provide get(locator) -> collection reader (e.g. CollectionCache).
        self.collection_cache = collection_cache

    def get_collection(self, path):
        """Split ``path`` into ``(collection, path_within_collection)``.

        For a ``keep:`` path whose first segment is a valid locator, returns
        the cached reader and the URL-unquoted remainder.  The remainder is
        None when the path names only the collection.  For any other path,
        returns ``(None, path)``.
        """
        sp = path.split("/", 1)
        p = sp[0]
        if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
                                      arvados.util.collection_uuid_pattern.match(p[5:])):
            locator = p[5:]
            return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Return paths under ``collection`` matching the glob ``patternsegments``.

        ``parent`` is the path prefix joined onto every result.
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo", so just shift past
            # the "./".  This does not depend on the collection's entries,
            # so do it once.  Doing it inside the loop below would return
            # every match once per entry.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Return the sorted list of paths matching ``pattern``."""
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: path.  Previously this fell through to
            # collection.manifest_locator() and raised AttributeError.
            return super(CollectionFsAccess, self).glob(pattern)
        if not rest:
            # Pattern names the collection itself.
            return [pattern]
        patternsegments = rest.split("/")
        return sorted(self._match(collection, patternsegments, "keep:" + collection.manifest_locator()))

    def open(self, fn, mode, encoding=None):
        """Open ``fn``, either inside a collection or on the local filesystem."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            return collection.open(rest, mode, encoding=encoding)
        else:
            return super(CollectionFsAccess, self).open(self._abs(fn), mode)

    def exists(self, fn):
        """Return True if ``fn`` exists.

        A keep: path naming only the collection exists whenever the
        collection is accessible.
        """
        try:
            collection, rest = self.get_collection(fn)
        except HttpError as err:
            if err.resp.status == 404:
                return False
            else:
                raise
        except IOError as err:
            # CollectionCache.get reports a collection it could not access as
            # IOError(ENOENT); that means "does not exist" here.
            if err.errno == errno.ENOENT:
                return False
            else:
                raise
        if collection is not None:
            if rest:
                return collection.exists(rest)
            else:
                return True
        else:
            return super(CollectionFsAccess, self).exists(fn)

    def size(self, fn):  # type: (unicode) -> int
        """Return the size of the file ``fn``.

        :raises IOError: (EINVAL) if a keep: path does not name a file.
        """
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                arvfile = collection.find(rest)
                if isinstance(arvfile, arvados.arvfile.ArvadosFile):
                    return arvfile.size()
            # The bare collection, a subdirectory or a missing path.
            raise IOError(errno.EINVAL, "Not a path to a file %s" % (fn))
        else:
            return super(CollectionFsAccess, self).size(fn)

    def isfile(self, fn):  # type: (unicode) -> bool
        """Return True if ``fn`` names a file."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
            else:
                # The collection root is a directory, not a file.
                return False
        else:
            return super(CollectionFsAccess, self).isfile(fn)

    def isdir(self, fn):  # type: (unicode) -> bool
        """Return True if ``fn`` names a directory (the collection root counts)."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                return isinstance(collection.find(rest), arvados.collection.RichCollectionBase)
            else:
                return True
        else:
            return super(CollectionFsAccess, self).isdir(fn)

    def listdir(self, fn):  # type: (unicode) -> List[unicode]
        """Return the entries of directory ``fn`` as paths joined onto ``fn``.

        :raises IOError: (ENOENT) if a keep: path is missing or not a directory.
        """
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                target = collection.find(rest)
            else:
                target = collection
            if target is None:
                raise IOError(errno.ENOENT, "Directory '%s' in '%s' not found" % (rest, collection.portable_data_hash()))
            if not isinstance(target, arvados.collection.RichCollectionBase):
                raise IOError(errno.ENOENT, "Path '%s' in '%s' is not a Directory" % (rest, collection.portable_data_hash()))
            return [abspath(entry, fn) for entry in target.keys()]
        else:
            return super(CollectionFsAccess, self).listdir(fn)

    def join(self, path, *paths):  # type: (unicode, *unicode) -> unicode
        """Join path components.

        A trailing absolute ``keep:<pdh>`` component replaces everything
        before it.
        """
        if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
            return paths[-1]
        return os.path.join(path, *paths)

    def realpath(self, path):
        """Return ``path`` canonicalized for the local filesystem.

        keep: paths and ``$(task.tmpdir)`` / ``$(task.outdir)`` placeholders
        are returned unchanged.
        """
        if path.startswith("$(task.tmpdir)") or path.startswith("$(task.outdir)"):
            return path
        collection, rest = self.get_collection(path)
        if collection is not None:
            return path
        else:
            return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
    """schema-salad fetcher that also understands keep: and arvwf: URLs.

    ``keep:`` URLs are read through ``fs_access``.  ``arvwf:<uuid>`` URLs
    are loaded from the Arvados workflow record ``<uuid>``.  Everything else
    is handled by ``DefaultFetcher``.
    """

    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
        super(CollectionFetcher, self).__init__(cache, session)
        self.api_client = api_client
        self.fsaccess = fs_access
        self.num_retries = num_retries

    def fetch_text(self, url):
        """Return the text of the document at ``url``.

        For arvwf: URLs, returns the stored workflow definition with a
        ``label:`` line appended that carries the record's name.
        """
        if url.startswith("keep:"):
            with self.fsaccess.open(url, "r", encoding="utf-8") as f:
                return f.read()
        if url.startswith("arvwf:"):
            record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
            # The name is spliced into YAML as a double-quoted scalar.
            # Escaping only '"' breaks on backslashes (e.g. 'C:\path') and
            # newlines.  A JSON string literal is valid YAML double-quoted
            # syntax and escapes all of these.  Plain names render exactly
            # as before.
            definition = record["definition"] + ('\nlabel: %s\n' % json.dumps(record["name"], ensure_ascii=False))
            return definition
        return super(CollectionFetcher, self).fetch_text(url)

    def check_exists(self, url):
        """Return True if ``url`` can be loaded.

        Errors are deliberately treated as "does not exist": unexpected ones
        are logged and then mapped to False.
        """
        try:
            if url.startswith("http://arvados.org/cwl"):
                # Arvados CWL extension namespace; always considered present.
                return True
            if url.startswith("keep:"):
                return self.fsaccess.exists(url)
            if url.startswith("arvwf:"):
                if self.fetch_text(url):
                    return True
        except arvados.errors.NotFoundError:
            return False
        except Exception:
            logger.exception("Got unexpected exception checking if file exists")
            return False
        return super(CollectionFetcher, self).check_exists(url)

    def urljoin(self, base_url, url):
        """Resolve ``url`` relative to ``base_url``.

        Relative references against keep: and arvwf: bases are resolved
        here: the first path segment of the base (the locator or UUID) is
        kept, and the rest is joined like a directory path.  Other bases go
        to ``DefaultFetcher``.

        :raises IOError: (EINVAL) if a keep: base has no valid locator.
        """
        if not url:
            return base_url

        urlsp = urllib.parse.urlsplit(url)
        if urlsp.scheme or not base_url:
            # Already absolute, or nothing to resolve against.
            return url

        basesp = urllib.parse.urlsplit(base_url)
        if basesp.scheme in ("keep", "arvwf"):
            if not basesp.path:
                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)

            baseparts = basesp.path.split("/")
            urlparts = urlsp.path.split("/") if urlsp.path else []

            locator = baseparts.pop(0)

            if (basesp.scheme == "keep" and
                (not arvados.util.keep_locator_pattern.match(locator)) and
                (not arvados.util.collection_uuid_pattern.match(locator))):
                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)

            if urlsp.path.startswith("/"):
                # Rooted at the collection: discard the base's path and the
                # empty segment produced by the leading "/".
                baseparts = []
                urlparts.pop(0)

            if baseparts and urlsp.path:
                # Drop the base's last segment (the document itself).
                baseparts.pop()

            path = "/".join([locator] + baseparts + urlparts)
            return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))

        return super(CollectionFetcher, self).urljoin(base_url, url)

    schemes = [u"file", u"http", u"https", u"mailto", u"keep", u"arvwf"]

    def supported_schemes(self):  # type: () -> List[Text]
        """Return the URL schemes this fetcher accepts."""
        return self.schemes
# Arvados UUIDs for workflow (7fd4e) and pipeline template (p5p6p) records.
# Used with .match(), so they are anchored at the start of the string only.
workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')

def collectionResolver(api_client, document_loader, uri, num_retries=4):
    """Resolve a CWL tool reference to an Arvados URI where possible.

    - ``keep:``/``arvwf:`` URIs are returned as-is.
    - A workflow UUID becomes ``arvwf:<uuid>#main``.
    - A pipeline template UUID becomes the ``keep:`` URI of the ``cwl:tool``
      parameter of its first component.
    - ``<pdh>[/path]`` becomes ``keep:<pdh>[/path]``.
    - ``<collection uuid>[/path]`` becomes ``keep:<current pdh>[/path]``.
    - Anything else is handed to cwltool's default tool resolver.
    """
    if uri.startswith(("keep:", "arvwf:")):
        return str(uri)

    if workflow_uuid_pattern.match(uri):
        return u"arvwf:%s#main" % (uri)

    if pipeline_template_uuid_pattern.match(uri):
        pt = api_client.pipeline_templates().get(uuid=uri).execute(num_retries=num_retries)
        # Dict views are not indexable (viewvalues(d)[0] raises TypeError),
        # so materialize the values before taking the first component.
        first_component = list(pt["components"].values())[0]
        return u"keep:" + first_component["script_parameters"]["cwl:tool"]

    p = uri.split("/")
    if arvados.util.keep_locator_pattern.match(p[0]):
        return u"keep:%s" % (uri)

    if arvados.util.collection_uuid_pattern.match(p[0]):
        # Retry like the pipeline-template lookup above does.
        record = api_client.collections().get(uuid=p[0]).execute(num_retries=num_retries)
        return u"keep:%s%s" % (record["portable_data_hash"], uri[len(p[0]):])

    return cwltool.resolver.tool_resolver(document_loader, uri)