import os
import zlib
import bz2
-from ._ranges import locators_and_ranges, replace_range, Range
-from arvados.retry import retry_method
import config
import hashlib
import threading
import Queue
import copy
import errno
-from .errors import KeepWriteError, AssertionError
+import re
+
+from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
-from _normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream
+from ._ranges import locators_and_ranges, replace_range, Range
+from .retry import retry_method
def split(path):
"""split(path) -> streamname, filename
return re.sub(r'\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
- def seek(self, pos, whence=os.SEEK_CUR):
+ def seek(self, pos, whence=os.SEEK_SET):
if whence == os.SEEK_CUR:
pos += self._filepos
elif whence == os.SEEK_END:
self.buffer_view = None
self.buffer_block = None
else:
- raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
+ raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
@synchronized
def state(self):
def release(self):
pass
+
def must_be_writable(orig_func):
@functools.wraps(orig_func)
def must_be_writable_wrapper(self, *args, **kwargs):
thread.daemon = True
thread.start()
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
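+ # Only queue blocks that are still WRITABLE; a block already in
+ # PENDING has been queued once and must not be re-queued (assumed
+ # state flow: WRITABLE -> PENDING -> committed).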
+ if block.state() == _BufferBlock.WRITABLE:
+ # Mark the block as PENDING to disallow any further appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
items = self._bufferblocks.items()
for k, v in items:
- if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
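+ # Flush through the owning file rather than committing the raw
+ # block, so buffered writes get repacked first (owner is assumed
+ # to be the ArvadosFile that allocated the block).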
+ v.owner.flush()
with self.lock:
if self._put_queue is not None:
# segment is past the truncate size, all done
break
elif size < range_end:
- nr = Range(r.locator, r.range_start, size - r.range_start)
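+ # Range now takes segment_offset as a fourth argument; pass 0
+ # here and overwrite it via nr.segment_offset just below.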
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
nr.segment_offset = r.segment_offset
new_segs.append(nr)
break
elif size > self.size():
raise IOError("truncate() does not support extending the file size")
- def readfrom(self, offset, size, num_retries):
- """Read upto `size` bytes from the file starting at `offset`."""
+
+ def readfrom(self, offset, size, num_retries, exact=False):
+ """Read up to `size` bytes from the file starting at `offset`.
+
+ :exact:
+ If False (default), return less data than requested if the read
+ crosses a block boundary and the next block isn't cached. If True,
+ only return less data than requested when hitting EOF.
+ """
with self.lock:
if size == 0 or offset >= self.size():
data = []
for lr in readsegs:
- block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
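+ # With exact=True, cache_only stays False so uncached blocks are
+ # fetched; otherwise, once some data has been collected, only
+ # already-cached blocks are read and the loop breaks early.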
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
if write_total < self._current_bblock.size():
# There is more data in the buffer block than the segments actually
# reference, so re-pack by copying just the referenced ranges into a
# new buffer block.
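+ # Fetch the old bytes through the block manager instead of reading
+ # buffer_view directly (assumed rationale: this also works when the
+ # block is no longer WRITABLE and its buffer has been released).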
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
raise ArgumentError("Offset is past the end of the file")
if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+ # Chunk it up into smaller writes
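+ # (e.g., assuming the default 64 MiB KEEP_BLOCK_SIZE, a 150 MiB
+ # write becomes three recursive calls of 64, 64, and 22 MiB)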
+ n = 0
+ dataview = memoryview(data)
+ while n < len(data):
+ self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+ n += config.KEEP_BLOCK_SIZE
+ return
self._modified = True
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
@synchronized
- def flush(self):
+ def flush(self, num_retries=0):
if self._current_bblock:
- self._repack_writes()
+ self._repack_writes(num_retries)
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
-
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
"""Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
else:
return 0
-
@synchronized
def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
buf = ""
- item = self
filestream = []
- for segment in item.segments:
+ for segment in self.segments:
loc = segment.locator
if loc.startswith("bufferblock"):
- loc = item._bufferblocks[loc].calculate_locator()
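+ # "bufferblock" locators are local placeholders; hash the buffered
+ # data to produce a real locator for the manifest.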
+ loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
filestream.append(LocatorAndRange(loc, locator_block_size(loc),
@retry_method
def read(self, size, num_retries=None):
"""Read up to `size` bytes from the stream, starting at the current file position."""
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
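+ # exact=True gives normal file semantics: read() only returns fewer
+ # bytes than requested at end-of-file.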
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
self._filepos += len(data)
return data
def flush(self):
self.arvadosfile.flush()
- @_FileLikeObjectBase._before_close
def close(self):
- self.flush()
- super(ArvadosFileWriter, self).close()
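+ # Guard on self.closed so close() is idempotent; a second call
+ # (e.g. an explicit close followed by a context-manager exit) is a
+ # no-op.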
+ if not self.closed:
+ self.flush()
+ super(ArvadosFileWriter, self).close()