import os
import zlib
import bz2
-from ._ranges import locators_and_ranges, replace_range, Range
-from arvados.retry import retry_method
import config
import hashlib
import threading
import Queue
import copy
import errno
+import re
+
from .errors import KeepWriteError, AssertionError
from .keep import KeepLocator
-from _normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream
+from ._ranges import locators_and_ranges, replace_range, Range
+from .retry import retry_method
def split(path):
"""split(path) -> streamname, filename
return re.sub('\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
- def seek(self, pos, whence=os.SEEK_CUR):
+ def seek(self, pos, whence=os.SEEK_SET):
if whence == os.SEEK_CUR:
pos += self._filepos
elif whence == os.SEEK_END:
    def release(self):
+        # Intentionally a no-op: there is nothing to release here, but the
+        # method keeps this object usable where a lock-style release() is
+        # expected -- TODO(review): confirm against callers.
        pass
+
def must_be_writable(orig_func):
@functools.wraps(orig_func)
def must_be_writable_wrapper(self, *args, **kwargs):
err.append(self._put_errors.get(False))
except Queue.Empty:
pass
- raise KeepWriteError("Error writing some blocks", err)
+ raise KeepWriteError("Error writing some blocks", err, label="block")
def block_prefetch(self, locator):
"""Initiate a background download of a block.
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+    @synchronized
+    def flush(self):
+        """Repack pending partial writes and commit the in-progress buffer block.
+
+        No-op when no buffer block is currently open.  The actual commit is
+        delegated to the parent's block manager.
+        """
+        if self._current_bblock:
+            self._repack_writes()
+            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
else:
return 0
-
@synchronized
def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
buf = ""
- item = self
filestream = []
- for segment in item.segments:
+ for segment in self.segments:
loc = segment.locator
if loc.startswith("bufferblock"):
- loc = item._bufferblocks[loc].calculate_locator()
+ loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
filestream.append(LocatorAndRange(loc, locator_block_size(loc),
for s in seq:
self.write(s, num_retries)
+    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
+        # size defaults to the current file position.  After truncating the
+        # underlying ArvadosFile, clamp our position so it never points past
+        # the (possibly smaller) new end of file.
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
+
+    @_FileLikeObjectBase._before_close
+    def flush(self):
+        """Flush buffered data by delegating to the underlying ArvadosFile."""
+        self.arvadosfile.flush()
+
+    def close(self):
+        """Flush any pending writes (if still open), then close the writer."""
+        if not self.closed:
+            self.flush()
+        super(ArvadosFileWriter, self).close()