Merge branch 'master' into 9687-container-request-display
[arvados.git] / sdk / python / arvados / arvfile.py
index ce0e5e3564559c707825d9acad5717a3a5e42be0..f2f7df2dce2121b0c0c0e4b562f7c7963f1fc0ab 100644 (file)
@@ -108,6 +108,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         cache_pos, cache_data = self._readline_cache
         if self.tell() == cache_pos:
             data = [cache_data]
+            self._filepos += len(cache_data)
         else:
             data = ['']
         data_size = len(data[-1])
@@ -123,6 +124,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         except ValueError:
             nextline_index = len(data)
         nextline_index = min(nextline_index, size)
+        self._filepos -= len(data) - nextline_index
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
@@ -320,7 +322,7 @@ class _BufferBlock(object):
     @synchronized
     def set_state(self, nextstate, val=None):
         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
-            raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
         self._state = nextstate
 
         if self._state == _BufferBlock.PENDING:
@@ -400,7 +402,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep):
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = {}
@@ -412,6 +414,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -462,7 +465,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -491,7 +497,7 @@ class _BlockManager(object):
             for i in xrange(0, self.num_put_threads):
                 thread = threading.Thread(target=self._commit_bufferblock_worker)
                 self._put_threads.append(thread)
-                thread.daemon = False
+                thread.daemon = True
                 thread.start()
 
     def _block_prefetch_worker(self):
@@ -543,9 +549,6 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def __del__(self):
-        self.stop_threads()
-
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
@@ -564,15 +567,24 @@ class _BlockManager(object):
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
         except StateChangeError as e:
-            if e.state == _BufferBlock.PENDING and sync:
-                block.wait_for_commit.wait()
-                if block.state() == _BufferBlock.ERROR:
-                    raise block.error
-            return
+            if e.state == _BufferBlock.PENDING:
+                if sync:
+                    block.wait_for_commit.wait()
+                else:
+                    return
+            if block.state() == _BufferBlock.COMMITTED:
+                return
+            elif block.state() == _BufferBlock.ERROR:
+                raise block.error
+            else:
+                raise
 
         if sync:
             try:
-                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -929,7 +941,7 @@ class ArvadosFile(object):
                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                 if bb:
                     if bb.state() != _BufferBlock.COMMITTED:
-                        self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=True)
+                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
             for s in to_delete: