Merge branch 'master' into 9687-container-request-display
[arvados.git] / sdk / python / arvados / arvfile.py
index b78c63e301b81d5ddb2644983e2e83017e98bbdf..f2f7df2dce2121b0c0c0e4b562f7c7963f1fc0ab 100644 (file)
@@ -402,7 +402,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep):
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = {}
@@ -414,6 +414,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -464,7 +465,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -577,7 +581,10 @@ class _BlockManager(object):
 
         if sync:
             try:
-                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)