- def _cache_is_valid(self, filepath):
- try:
- with open(filepath, 'r') as cache_file:
- manifest = json.load(cache_file)['manifest']
- kc = arvados.keep.KeepClient(api_client=api_client)
- # Check that the first block's token (oldest) is valid
- for line in manifest.split('\n'):
- match = arvados.util.signed_locator_pattern.search(line)
- if match is not None:
- loc = match.group(0)
- return kc.head(loc, num_retries=self.num_retries)
- # No signed locator found, all ok.
- return True
- except Exception as e:
- self.logger.info("Something wrong happened when checking cache file: {}".format(e))
- return False
-