projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '18027-unmount-fuse'
[arvados.git]
/
sdk
/
python
/
arvados
/
keep.py
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2f20132aecdf17ee9b8d6da2d043037508e989ce..9dfe0436dec9bdf22eb71ad9bfe2e8a201ee3ab6 100644
(file)
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -570,8 +570,8 @@ class KeepClient(object):
self.confirmed_storage_classes = {}
self.response = None
self.storage_classes_tracking = True
self.confirmed_storage_classes = {}
self.response = None
self.storage_classes_tracking = True
- self.queue_data_lock = threading.Lock()
- self.pending_tries = max(copies, len(classes))+1
+ self.queue_data_lock = threading.RLock()
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
@@ -585,6 +585,7 @@ class KeepClient(object):
self.confirmed_storage_classes[st_class] += st_copies
except KeyError:
self.confirmed_storage_classes[st_class] = st_copies
self.confirmed_storage_classes[st_class] += st_copies
except KeyError:
self.confirmed_storage_classes[st_class] = st_copies
+ self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
@@ -840,6 +841,8 @@ class KeepClient(object):
self.get_counter = Counter()
self.hits_counter = Counter()
self.misses_counter = Counter()
self.get_counter = Counter()
self.hits_counter = Counter()
self.misses_counter = Counter()
+ self._storage_classes_unsupported_warning = False
+ self._default_classes = []
if local_store:
self.local_store = local_store
if local_store:
self.local_store = local_store
@@ -880,6 +883,12 @@ class KeepClient(object):
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
+ try:
+ self._default_classes = [
+ k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+ except KeyError:
+ # We're talking to an old cluster
+ pass
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
@@ -1172,7 +1181,7 @@ class KeepClient(object):
"failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
"failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
- def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
+ def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
@@ -1193,6 +1202,8 @@ class KeepClient(object):
be written.
"""
be written.
"""
+ classes = classes or self._default_classes
+
if not isinstance(data, bytes):
data = data.encode()
if not isinstance(data, bytes):
data = data.encode()
@@ -1253,7 +1264,9 @@ class KeepClient(object):
# success is determined only by successful copies.
#
# Disable storage classes tracking from this point forward.
# success is determined only by successful copies.
#
# Disable storage classes tracking from this point forward.
- _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+ if not self._storage_classes_unsupported_warning:
+ self._storage_classes_unsupported_warning = True
+ _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
done_classes = None
loop.save_result(
(done_copies >= copies, writer_pool.total_task_nr))
done_classes = None
loop.save_result(
(done_copies >= copies, writer_pool.total_task_nr))