X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a1fc5b9e889f8359a32470c3a7d91190d0894899..fd0074f2200bc41bc63be770fffbe2446fb0cc03:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index c4bbf64424..3129bdf2ff 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -2,17 +2,19 @@ import functools import os import zlib import bz2 -from ._ranges import locators_and_ranges, replace_range, Range -from arvados.retry import retry_method import config import hashlib import threading import Queue import copy import errno +import re + from .errors import KeepWriteError, AssertionError from .keep import KeepLocator -from _normalize_stream import normalize_stream +from ._normalize_stream import normalize_stream +from ._ranges import locators_and_ranges, replace_range, Range +from .retry import retry_method def split(path): """split(path) -> streamname, filename @@ -75,7 +77,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): return re.sub('\.(bz2|gz)$', '', self.name) @_FileLikeObjectBase._before_close - def seek(self, pos, whence=os.SEEK_CUR): + def seek(self, pos, whence=os.SEEK_SET): if whence == os.SEEK_CUR: pos += self._filepos elif whence == os.SEEK_END: @@ -342,6 +344,7 @@ class NoopLock(object): def release(self): pass + def must_be_writable(orig_func): @functools.wraps(orig_func) def must_be_writable_wrapper(self, *args, **kwargs): @@ -534,7 +537,7 @@ class _BlockManager(object): err.append(self._put_errors.get(False)) except Queue.Empty: pass - raise KeepWriteError("Error writing some blocks", err) + raise KeepWriteError("Error writing some blocks", err, label="block") def block_prefetch(self, locator): """Initiate a background download of a block. @@ -791,6 +794,12 @@ class ArvadosFile(object): replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) + @synchronized + def flush(self): + if self._current_bblock: + self._repack_writes() + self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + @must_be_writable @synchronized def add_segment(self, blocks, pos, size): @@ -819,16 +828,14 @@ class ArvadosFile(object): else: return 0 - @synchronized def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): buf = "" - item = self filestream = [] - for segment in item.segments: + for segment in self.segments: loc = segment.locator if loc.startswith("bufferblock"): - loc = item._bufferblocks[loc].calculate_locator() + loc = self._bufferblocks[loc].calculate_locator() if portable_locators: loc = KeepLocator(loc).stripped() filestream.append(LocatorAndRange(loc, locator_block_size(loc), @@ -900,9 +907,19 @@ class ArvadosFileWriter(ArvadosFileReader): for s in seq: self.write(s, num_retries) + @_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos self.arvadosfile.truncate(size) if self._filepos > self.size(): self._filepos = self.size() + + @_FileLikeObjectBase._before_close + def flush(self): + self.arvadosfile.flush() + + def close(self): + if not self.closed: + self.flush() + super(ArvadosFileWriter, self).close()