5627: Python file-like objects use SEET_SET as the default whence.
[arvados.git] / sdk / python / arvados / arvfile.py
index c4bbf644245d97df61d914c154480f446cfe6a3f..3129bdf2ff0c890337e4594d9fcf9605745d4b04 100644 (file)
@@ -2,17 +2,19 @@ import functools
 import os
 import zlib
 import bz2
-from ._ranges import locators_and_ranges, replace_range, Range
-from arvados.retry import retry_method
 import config
 import hashlib
 import threading
 import Queue
 import copy
 import errno
+import re
+
 from .errors import KeepWriteError, AssertionError
 from .keep import KeepLocator
-from _normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream
+from ._ranges import locators_and_ranges, replace_range, Range
+from .retry import retry_method
 
 def split(path):
     """split(path) -> streamname, filename
@@ -75,7 +77,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         return re.sub('\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
-    def seek(self, pos, whence=os.SEEK_CUR):
+    def seek(self, pos, whence=os.SEEK_SET):
         if whence == os.SEEK_CUR:
             pos += self._filepos
         elif whence == os.SEEK_END:
@@ -342,6 +344,7 @@ class NoopLock(object):
     def release(self):
         pass
 
+
 def must_be_writable(orig_func):
     @functools.wraps(orig_func)
     def must_be_writable_wrapper(self, *args, **kwargs):
@@ -534,7 +537,7 @@ class _BlockManager(object):
                             err.append(self._put_errors.get(False))
                     except Queue.Empty:
                         pass
-                    raise KeepWriteError("Error writing some blocks", err)
+                    raise KeepWriteError("Error writing some blocks", err, label="block")
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -791,6 +794,12 @@ class ArvadosFile(object):
 
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
+    @synchronized
+    def flush(self):
+        if self._current_bblock:
+            self._repack_writes()
+            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+
     @must_be_writable
     @synchronized
     def add_segment(self, blocks, pos, size):
@@ -819,16 +828,14 @@ class ArvadosFile(object):
         else:
             return 0
 
-
     @synchronized
     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
         buf = ""
-        item = self
         filestream = []
-        for segment in item.segments:
+        for segment in self.segments:
             loc = segment.locator
             if loc.startswith("bufferblock"):
-                loc = item._bufferblocks[loc].calculate_locator()
+                loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
@@ -900,9 +907,19 @@ class ArvadosFileWriter(ArvadosFileReader):
         for s in seq:
             self.write(s, num_retries)
 
+    @_FileLikeObjectBase._before_close
     def truncate(self, size=None):
         if size is None:
             size = self._filepos
         self.arvadosfile.truncate(size)
         if self._filepos > self.size():
             self._filepos = self.size()
+
+    @_FileLikeObjectBase._before_close
+    def flush(self):
+        self.arvadosfile.flush()
+
+    def close(self):
+        if not self.closed:
+            self.flush()
+            super(ArvadosFileWriter, self).close()