import copy
import errno
import re
+import logging
-from .errors import KeepWriteError, AssertionError
+from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method
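+
+# Event types passed to Collection.notify() callbacks (see writeto() and
+# flush() below).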
+MOD = "mod"
+WRITE = "write"
+
+_logger = logging.getLogger('arvados.arvfile')
+
def split(path):
"""split(path) -> streamname, filename
cache_pos, cache_data = self._readline_cache
if self.tell() == cache_pos:
data = [cache_data]
+ self._filepos += len(cache_data)
else:
data = ['']
data_size = len(data[-1])
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
+ self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
return data[:nextline_index]
@_FileLikeObjectBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
- for segment in self.readall(size, num_retries):
+ for segment in self.readall(size, num_retries=num_retries):
data = decompress(segment)
if data:
yield data
return orig_func(self, *args, **kwargs)
return synchronized_wrapper
+
+class StateChangeError(Exception):
+ def __init__(self, message, state, nextstate):
+ super(StateChangeError, self).__init__(message)
+ self.state = state
+ self.nextstate = nextstate
+
class _BufferBlock(object):
"""A stand-in for a Keep block that is in the process of being written.
WRITABLE = 0
PENDING = 1
COMMITTED = 2
+ ERROR = 3
def __init__(self, blockid, starting_capacity, owner):
"""
self._locator = None
self.owner = owner
self.lock = threading.Lock()
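+        # wait_for_commit is set when the block reaches COMMITTED or ERROR;
+        # error records the exception from a failed upload (see set_state()).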
+ self.wait_for_commit = threading.Event()
+ self.error = None
@synchronized
def append(self, data):
else:
raise AssertionError("Buffer block is not writable")
+ STATE_TRANSITIONS = frozenset([
+ (WRITABLE, PENDING),
+ (PENDING, COMMITTED),
+ (PENDING, ERROR),
+ (ERROR, PENDING)])
+
@synchronized
- def set_state(self, nextstate, loc=None):
- if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
- (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
- self._state = nextstate
- if self._state == _BufferBlock.COMMITTED:
- self._locator = loc
- self.buffer_view = None
- self.buffer_block = None
- else:
- raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
+ def set_state(self, nextstate, val=None):
+ if (self._state, nextstate) not in self.STATE_TRANSITIONS:
+ raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
+ self._state = nextstate
+
+ if self._state == _BufferBlock.PENDING:
+ self.wait_for_commit.clear()
+
+ if self._state == _BufferBlock.COMMITTED:
+ self._locator = val
+ self.buffer_view = None
+ self.buffer_block = None
+ self.wait_for_commit.set()
+
+ if self._state == _BufferBlock.ERROR:
+ self.error = val
+ self.wait_for_commit.set()
@synchronized
def state(self):
@synchronized
def clone(self, new_blockid, owner):
if self._state == _BufferBlock.COMMITTED:
- raise AssertionError("Can only duplicate a writable or pending buffer block")
+            raise AssertionError("Cannot duplicate a committed buffer block")
bufferblock = _BufferBlock(new_blockid, self.size(), owner)
bufferblock.append(self.buffer_view[0:self.size()])
return bufferblock
+ @synchronized
+ def clear(self):
+ self.owner = None
+ self.buffer_block = None
+ self.buffer_view = None
+
class NoopLock(object):
def __enter__(self):
@functools.wraps(orig_func)
def must_be_writable_wrapper(self, *args, **kwargs):
if not self.writable():
- raise IOError((errno.EROFS, "Collection must be writable."))
+ raise IOError(errno.EROFS, "Collection is read-only.")
return orig_func(self, *args, **kwargs)
return must_be_writable_wrapper
Collection of ArvadosFiles.
"""
- def __init__(self, keep):
+
+ DEFAULT_PUT_THREADS = 2
+ DEFAULT_GET_THREADS = 2
+
+ def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = {}
self._put_queue = None
- self._put_errors = None
self._put_threads = None
self._prefetch_queue = None
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- self.num_put_threads = 2
- self.num_get_threads = 2
+ self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
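+        # copies is the desired replication level passed through to
+        # KeepClient.put(); None uses the KeepClient default.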
+ self.copies = copies
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
def is_bufferblock(self, locator):
return locator in self._bufferblocks
+ def _commit_bufferblock_worker(self):
+ """Background uploader thread."""
+
+ while True:
+ try:
+ bufferblock = self._put_queue.get()
+ if bufferblock is None:
+ return
+
+ if self.copies is None:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+ bufferblock.set_state(_BufferBlock.COMMITTED, loc)
+
+ except Exception as e:
+ bufferblock.set_state(_BufferBlock.ERROR, e)
+ finally:
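+                # task_done() balances the get() above so that commit_all()
+                # can use Queue.join() to wait for the queue to drain.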
+ if self._put_queue is not None:
+ self._put_queue.task_done()
+
+ @synchronized
+ def start_put_threads(self):
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+            # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ def _block_prefetch_worker(self):
+ """The background downloader thread."""
+ while True:
+ try:
+ b = self._prefetch_queue.get()
+ if b is None:
+ return
+ self._keep.get(b)
+ except Exception:
+                # Prefetch failures are non-fatal; a subsequent read of the
+                # same block will retry and surface any real error.
+                _logger.exception("Exception doing block prefetch")
+
+ @synchronized
+ def start_get_threads(self):
+ if self._prefetch_threads is None:
+ self._prefetch_queue = Queue.Queue()
+ self._prefetch_threads = []
+ for i in xrange(0, self.num_get_threads):
+ thread = threading.Thread(target=self._block_prefetch_worker)
+ self._prefetch_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
@synchronized
def stop_threads(self):
"""Shut down and wait for background upload and download threads to finish."""
t.join()
self._put_threads = None
self._put_queue = None
- self._put_errors = None
if self._prefetch_threads is not None:
for t in self._prefetch_threads:
self._prefetch_threads = None
self._prefetch_queue = None
- def commit_bufferblock(self, block):
- """Initiate a background upload of a bufferblock.
+ def __enter__(self):
+ return self
- This will block if the upload queue is at capacity, otherwise it will
- return immediately.
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.stop_threads()
- """
+ def commit_bufferblock(self, block, sync):
+        """Upload a bufferblock to Keep, synchronously or in the background.
- def commit_bufferblock_worker(self):
- """Background uploader thread."""
+ :block:
+ The block object to upload
- while True:
- try:
- bufferblock = self._put_queue.get()
- if bufferblock is None:
- return
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
- bufferblock.set_state(_BufferBlock.COMMITTED, loc)
+ :sync:
+ If `sync` is True, upload the block synchronously.
+ If `sync` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
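+
+        Illustrative usage (``bm`` is a _BlockManager and ``bb`` a writable
+        _BufferBlock; the names are only examples)::
+
+            bm.commit_bufferblock(bb, sync=True)   # wait; raises on failure
+            bm.commit_bufferblock(bb, sync=False)  # queue and return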
- except Exception as e:
- self._put_errors.put((bufferblock.locator(), e))
- finally:
- if self._put_queue is not None:
- self._put_queue.task_done()
+ """
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
-
- if block.state() == _BufferBlock.WRITABLE:
+ try:
            # Mark the block as PENDING so as to disallow any more appends.
block.set_state(_BufferBlock.PENDING)
+ except StateChangeError as e:
+ if e.state == _BufferBlock.PENDING:
+ if sync:
+ block.wait_for_commit.wait()
+ else:
+ return
+ if block.state() == _BufferBlock.COMMITTED:
+ return
+ elif block.state() == _BufferBlock.ERROR:
+ raise block.error
+ else:
+ raise
+
+ if sync:
+ try:
+ if self.copies is None:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ except Exception as e:
+ block.set_state(_BufferBlock.ERROR, e)
+ raise
+ else:
+ self.start_put_threads()
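+            # Queue.put() blocks while the bounded queue is full, applying
+            # backpressure to fast writers (see start_put_threads()).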
self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
return self._bufferblocks.get(locator)
+ @synchronized
+ def delete_bufferblock(self, locator):
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
+
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
def commit_all(self):
"""Commit all outstanding buffer blocks.
- Unlike commit_bufferblock(), this is a synchronous call, and will not
- return until all buffer blocks are uploaded. Raises
- KeepWriteError() if any blocks failed to upload.
+ This is a synchronous call, and will not return until all buffer blocks
+ are uploaded. Raises KeepWriteError() if any blocks failed to upload.
"""
with self.lock:
items = self._bufferblocks.items()
for k,v in items:
- v.owner.flush()
+ if v.state() != _BufferBlock.COMMITTED:
+ v.owner.flush(sync=False)
with self.lock:
if self._put_queue is not None:
self._put_queue.join()
- if not self._put_errors.empty():
- err = []
- try:
- while True:
- err.append(self._put_errors.get(False))
- except Queue.Empty:
- pass
+ err = []
+ for k,v in items:
+ if v.state() == _BufferBlock.ERROR:
+ err.append((v.locator(), v.error))
+ if err:
raise KeepWriteError("Error writing some blocks", err, label="block")
+ for k,v in items:
+            # Flush again with sync=True to replace committed bufferblocks
+            # in the files' segment lists with their permanent Keep locators.
+ if v.owner:
+ v.owner.flush(sync=True)
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
if not self.prefetch_enabled:
return
- def block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b)
- except Exception:
- pass
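+        # Skip the queue entirely if the block is already in the local cache.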
+ if self._keep.get_from_cache(locator) is not None:
+ return
with self.lock:
if locator in self._bufferblocks:
return
- if self._prefetch_threads is None:
- self._prefetch_queue = Queue.Queue()
- self._prefetch_threads = []
- for i in xrange(0, self.num_get_threads):
- thread = threading.Thread(target=block_prefetch_worker, args=(self,))
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
+
+ self.start_get_threads()
self._prefetch_queue.put(locator)
"""
- def __init__(self, parent, stream=[], segments=[]):
+ def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
a list of Range objects representing segments
"""
self.parent = parent
- self._modified = True
+ self.name = name
+ self._committed = False
self._segments = []
self.lock = parent.root_collection().lock
for s in segments:
return copy.copy(self._segments)
@synchronized
- def clone(self, new_parent):
+ def clone(self, new_parent, new_name):
"""Make a copy of this file."""
- cp = ArvadosFile(new_parent)
+ cp = ArvadosFile(new_parent, new_name)
cp.replace_contents(self)
return cp
self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
- self._modified = True
+ self._committed = False
def __eq__(self, other):
if other is self:
return not self.__eq__(other)
@synchronized
- def set_unmodified(self):
- """Clear the modified flag"""
- self._modified = False
+ def set_committed(self):
+        """Set committed flag to True"""
+ self._committed = True
@synchronized
- def modified(self):
- """Test the modified flag"""
- return self._modified
+ def committed(self):
+        """Return whether this file has been committed."""
+ return self._committed
@must_be_writable
@synchronized
                # segment is past the truncate size, all done
break
elif size < range_end:
- nr = Range(r.locator, r.range_start, size - r.range_start)
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
nr.segment_offset = r.segment_offset
new_segs.append(nr)
break
new_segs.append(r)
self._segments = new_segs
- self._modified = True
+ self._committed = False
elif size > self.size():
- raise IOError("truncate() does not support extending the file size")
-
+ raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
def readfrom(self, offset, size, num_retries, exact=False):
"""Read up to `size` bytes from the file starting at `offset`.
with self.lock:
if size == 0 or offset >= self.size():
return ''
- prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
readsegs = locators_and_ranges(self._segments, offset, size)
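+            # Prefetch the blocks immediately following the read window so
+            # sequential readers stay ahead of the network; limit bounds the
+            # prefetch queue.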
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
- for lr in prefetch:
- self.parent._my_block_manager().block_prefetch(lr.locator)
-
+ locs = set()
data = []
for lr in readsegs:
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
- data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ blockview = memoryview(block)
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ locs.add(lr.locator)
else:
break
+
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
+
return ''.join(data)
def _repack_writes(self, num_retries):
raise ArgumentError("Offset is past the end of the file")
if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+ # Chunk it up into smaller writes
+ n = 0
+ dataview = memoryview(data)
+ while n < len(data):
+ self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+ n += config.KEEP_BLOCK_SIZE
+ return
- self._modified = True
+ self._committed = False
if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ self.parent.notify(WRITE, self.parent, self.name, (self, self))
+
+ return len(data)
+
@synchronized
- def flush(self, num_retries=0):
- if self._current_bblock:
- self._repack_writes(num_retries)
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ def flush(self, sync=True, num_retries=0):
+ """Flush the current bufferblock to Keep.
+
+ :sync:
+          If True, commit the block synchronously: wait until the buffer
+          block has been written to Keep.
+          If False, commit the block asynchronously: return immediately
+          after placing the block on the Keep put queue.
+ """
+ if self.committed():
+ return
+
+ if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
+ if self._current_bblock.state() == _BufferBlock.WRITABLE:
+ self._repack_writes(num_retries)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+
+ if sync:
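+            # Commit any remaining bufferblocks referenced by this file's
+            # segments, then swap their temporary ids for the permanent Keep
+            # locators returned by the uploads.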
+ to_delete = set()
+ for s in self._segments:
+ bb = self.parent._my_block_manager().get_bufferblock(s.locator)
+ if bb:
+ if bb.state() != _BufferBlock.COMMITTED:
+ self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
+ to_delete.add(s.locator)
+ s.locator = bb.locator()
+ for s in to_delete:
+ self.parent._my_block_manager().delete_bufferblock(s)
+
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@synchronized
def _add_segment(self, blocks, pos, size):
"""Internal implementation of add_segment."""
- self._modified = True
+ self._committed = False
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
buf += "\n"
return buf
+ @must_be_writable
+ @synchronized
+ def _reparent(self, newparent, newname):
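+        # Clear _committed so flush() won't short-circuit; the file's data
+        # must be safely in Keep before it moves to a new parent.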
+ self._committed = False
+ self.flush(sync=True)
+ self.parent.remove(self.name)
+ self.parent = newparent
+ self.name = newname
+ self.lock = self.parent.root_collection().lock
+
class ArvadosFileReader(ArvadosFileReaderBase):
"""Wraps ArvadosFile in a file-like object supporting reading only.
"""
- def __init__(self, arvadosfile, name, mode="r", num_retries=None):
- super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
@_FileLikeObjectBase._before_close
@retry_method
- def read(self, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
- self._filepos += len(data)
- return data
+ def read(self, size=None, num_retries=None):
+ """Read up to `size` bytes from the file and return the result.
+
+ Starts at the current file position. If `size` is None, read the
+ entire remainder of the file.
+ """
+ if size is None:
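+            # Read the remainder of the file in KEEP_BLOCK_SIZE chunks until
+            # readfrom() returns an empty string (EOF).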
+ data = []
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ while rd:
+ data.append(rd)
+ self._filepos += len(rd)
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ return ''.join(data)
+ else:
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+ self._filepos += len(data)
+ return data
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
+ """Read up to `size` bytes from the stream, starting at the specified file offset.
+
+ This method does not change the file position.
+ """
return self.arvadosfile.readfrom(offset, size, num_retries)
def flush(self):
"""
- def __init__(self, arvadosfile, name, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, mode, num_retries=None):
+ super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
+ self.mode = mode
@_FileLikeObjectBase._before_close
@retry_method
else:
self.arvadosfile.writeto(self._filepos, data, num_retries)
self._filepos += len(data)
+ return len(data)
@_FileLikeObjectBase._before_close
@retry_method
def writelines(self, seq, num_retries=None):
for s in seq:
- self.write(s, num_retries)
+ self.write(s, num_retries=num_retries)
@_FileLikeObjectBase._before_close
def truncate(self, size=None):