Merge branch '16169-cwl-hints' refs #16169
[arvados.git] / sdk / python / arvados / arvfile.py
index 37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b..6893b94bf78b7b16a1da1a802fdabe419563eb0d 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -500,6 +500,7 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self.num_retries = num_retries
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -554,9 +555,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -725,9 +726,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)