# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import fnmatch
import os
import errno
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4, overrides=None):
    """Initialise the fetcher and remember its collaborators.

    cache, session -- forwarded unchanged to the parent fetcher's
        constructor.
    api_client -- stored on the instance as ``api_client``.
    fs_access -- stored as ``fsaccess``; ``fetch_text`` calls its
        ``open(url, "r")`` to read ``keep:`` URLs.
    num_retries -- stored on the instance as ``num_retries``.
    overrides -- optional mapping of URL -> document text.  URLs present
        in it are answered directly by ``fetch_text`` (and reported as
        existing by ``check_exists``) without touching any backend.
    """
    super(CollectionFetcher, self).__init__(cache, session)
    self.api_client = api_client
    self.fsaccess = fs_access
    self.num_retries = num_retries
    # Compare against None rather than relying on truthiness: a caller
    # that passes an (initially) empty mapping and fills it in later must
    # keep sharing that same object with the fetcher.
    self.overrides = {} if overrides is None else overrides
def fetch_text(self, url):
    """Return the text content found at ``url``.

    Resolution order:
      1. If ``url`` is a key of ``self.overrides``, the stored value is
         returned as-is and nothing is fetched.
      2. If ``url`` starts with ``keep:``, it is opened for reading through
         ``self.fsaccess`` and the whole content is returned.
      3. Anything else is delegated to the parent fetcher's ``fetch_text``.
    """
    # In-memory overrides take precedence over every real backend.
    if url in self.overrides:
        return self.overrides[url]

    if url.startswith("keep:"):
        # The context manager guarantees the handle is closed after the read.
        with self.fsaccess.open(url, "r") as f:
            return f.read()

    return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
+ if url in self.overrides:
+ return True
try:
if url.startswith("http://arvados.org/cwl"):
return True