from arvados.retry import retry_method
import config
import hashlib
-import hashlib
import threading
import Queue
import copy
from _normalize_stream import normalize_stream
def split(path):
- """Separate the stream name and file name in a /-separated stream path and
- return a tuple (stream_name, file_name).
+ """split(path) -> streamname, filename
- If no stream name is available, assume '.'.
+ Separate the stream name and file name in a /-separated stream path and
+ return a tuple (stream_name, file_name). If no stream name is available,
+ assume '.'.
"""
try:
class ArvadosFileReaderBase(_FileLikeObjectBase):
- class _NameAttribute(str):
- # The Python file API provides a plain .name attribute.
- # Older SDK provided a name() method.
- # This class provides both, for maximum compatibility.
- def __call__(self):
- return self
-
def __init__(self, name, mode, num_retries=None):
- super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
+ super(ArvadosFileReaderBase, self).__init__(name, mode)
self._filepos = 0L
self.num_retries = num_retries
self._readline_cache = (None, None)
class StreamFileReader(ArvadosFileReaderBase):
+ class _NameAttribute(str):
+ # The Python file API provides a plain .name attribute.
+ # Older SDK provided a name() method.
+ # This class provides both, for maximum compatibility.
+ def __call__(self):
+ return self
+
def __init__(self, stream, segments, name):
- super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
+ super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
self._stream = stream
self.segments = segments
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
lr = available_chunks[0]
- data = self._stream._readfrom(lr.locator+lr.segment_offset,
+ data = self._stream.readfrom(lr.locator+lr.segment_offset,
lr.segment_size,
num_retries=num_retries)
data = []
for lr in locators_and_ranges(self.segments, start, size):
- data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
+ data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
num_retries=num_retries))
return ''.join(data)
return synchronized_wrapper
class _BufferBlock(object):
- """A BufferBlock is a stand-in for a Keep block that is in the process of being
- written.
+ """A stand-in for a Keep block that is in the process of being written.
Writers can append to it, get the size, and compute the Keep locator.
There are three valid states:
self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
return self._locator
+ @synchronized
+ def clone(self, new_blockid, owner):
+ if self._state == _BufferBlock.COMMITTED:
+ raise AssertionError("Can only duplicate a writable or pending buffer block")
+ bufferblock = _BufferBlock(new_blockid, self.size(), owner)
+ bufferblock.append(self.buffer_view[0:self.size()])
+ return bufferblock
+
class NoopLock(object):
def __enter__(self):
def release(self):
pass
-SYNC_READONLY = 1
-SYNC_EXPLICIT = 2
-SYNC_LIVE = 3
-
def must_be_writable(orig_func):
    """Decorator for methods that mutate a collection.

    The wrapped method runs only when `self.writable()` is true;
    otherwise an IOError carrying errno.EROFS is raised.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError((errno.EROFS, "Collection must be writable."))
    return must_be_writable_wrapper
class _BlockManager(object):
- """BlockManager handles buffer blocks, background block uploads, and background
- block prefetch for a Collection of ArvadosFiles.
+ """BlockManager handles buffer blocks.
+
+ Also handles background block uploads, and background block prefetch for a
+ Collection of ArvadosFiles.
"""
def __init__(self, keep):
@synchronized
def dup_block(self, block, owner):
- """Create a new bufferblock in WRITABLE state, initialized with the content of
- an existing bufferblock.
+ """Create a new bufferblock initialized with the content of an existing bufferblock.
:block:
the buffer block to copy.
"""
new_blockid = "bufferblock%i" % len(self._bufferblocks)
- with block.lock:
- if block._state == _BufferBlock.COMMITTED:
- raise AssertionError("Can only duplicate a writable or pending buffer block")
-
- bufferblock = _BufferBlock(new_blockid, block.size(), owner)
- bufferblock.append(block.buffer_view[0:block.size()])
+ bufferblock = block.clone(new_blockid, owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
- print e
self._put_errors.put((bufferblock.locator(), e))
finally:
if self._put_queue is not None:
err.append(self._put_errors.get(False))
except Queue.Empty:
pass
- raise KeepWriteError("Error writing some blocks", err)
+ raise KeepWriteError("Error writing some blocks", err, label="block")
def block_prefetch(self, locator):
"""Initiate a background download of a block.
if b is None:
return
self._keep.get(b)
- except:
+ except Exception:
pass
with self.lock:
class ArvadosFile(object):
- """ArvadosFile manages the underlying representation of a file in Keep as a
+ """Represent a file in a Collection.
+
+ ArvadosFile manages the underlying representation of a file in Keep as a
sequence of segments spanning a set of blocks, and implements random
read/write access.
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- def sync_mode(self):
- return self.parent.sync_mode()
+ def writable(self):
+ return self.parent.writable()
@synchronized
def segments(self):
return ''.join(data)
def _repack_writes(self):
- """Test if the buffer block has more data than is referenced by actual
- segments.
+ """Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
a previous buffered write. Re-pack the buffer block for efficiency
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ @synchronized
+ def flush(self):
+ if self._current_bblock:
+ self._repack_writes()
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+
+
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
- """Add a segment to the end of the file, with `pos` and `offset` referencing a
- section of the stream described by `blocks` (a list of Range objects)
+ """Add a segment to the end of the file.
+
+ `pos` and `offset` reference a section of the stream described by
+ `blocks` (a list of Range objects)
"""
self._add_segment(blocks, pos, size)
loc = KeepLocator(loc).stripped()
filestream.append(LocatorAndRange(loc, locator_block_size(loc),
segment.segment_offset, segment.range_size))
- stream[stream_name] = filestream
- buf += ' '.join(normalize_stream(stream_name, stream))
+ buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
buf += "\n"
return buf
for s in seq:
self.write(s, num_retries)
+ @_FileLikeObjectBase._before_close
def truncate(self, size=None):
if size is None:
size = self._filepos
if self._filepos > self.size():
self._filepos = self.size()
+ @_FileLikeObjectBase._before_close
+ def flush(self):
+ self.arvadosfile.flush()
+
+ @_FileLikeObjectBase._before_close
def close(self):
- if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
- self.arvadosfile.parent.root_collection().save()
+ self.flush()
+ super(ArvadosFileWriter, self).close()