import Queue
import copy
import errno
+from .errors import KeepWriteError
def split(path):
- """split(path) -> streamname, filename
+ """Separate the stream name and file name in a /-separated stream path and
+ return a tuple (stream_name, file_name).
- Separate the stream name and file name in a /-separated stream path.
If no stream name is available, assume '.'.
+
"""
try:
stream_name, file_name = path.rsplit('/', 1)
@staticmethod
def _before_close(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def before_close_wrapper(self, *args, **kwargs):
if self.closed:
raise ValueError("I/O operation on closed stream file")
return orig_func(self, *args, **kwargs)
- return wrapper
+ return before_close_wrapper
    def __enter__(self):
        # Context-manager entry: the file object itself is the managed resource.
        return self
class BufferBlock(object):
- """
- A BufferBlock is a stand-in for a Keep block that is in the process of being
- written. Writers can append to it, get the size, and compute the Keep locator.
+ """A BufferBlock is a stand-in for a Keep block that is in the process of being
+ written.
+ Writers can append to it, get the size, and compute the Keep locator.
There are three valid states:
WRITABLE
released, fetching the block will fetch it via keep client (since we
discarded the internal copy), and identifiers referring to the BufferBlock
can be replaced with the block locator.
+
"""
+
WRITABLE = 0
PENDING = 1
COMMITTED = 2
:owner:
ArvadosFile that owns this block
+
"""
self.blockid = blockid
self.buffer_block = bytearray(starting_capacity)
self.owner = owner
def append(self, data):
- """
- Append some data to the buffer. Only valid if the block is in WRITABLE
- state. Implements an expanding buffer, doubling capacity as needed to
- accomdate all the data.
+ """Append some data to the buffer.
+
+ Only valid if the block is in WRITABLE state. Implements an expanding
+    buffer, doubling capacity as needed to accommodate all the data.
+
"""
if self.state == BufferBlock.WRITABLE:
while (self.write_pointer+len(data)) > len(self.buffer_block):
raise AssertionError("Buffer block is not writable")
def size(self):
- """Amount of data written to the buffer"""
+ """The amount of data written to the buffer."""
return self.write_pointer
    def locator(self):
        """Return the block's locator (set when the block is committed to Keep;
        presumably unset before that -- confirm with BufferBlock.__init__)."""
        return self._locator
-class AsyncKeepWriteErrors(Exception):
- """
- Roll up one or more Keep write exceptions (generated by background
- threads) into a single one.
- """
- def __init__(self, errors):
- self.errors = errors
-
- def __repr__(self):
- return "\n".join(self.errors)
-
def synchronized(orig_func):
    """Decorator: serialize calls to `orig_func` by holding `self.lock`.

    The wrapped object must expose a context-manager `lock` attribute.
    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
class NoopLock(object):
def __enter__(self):
def must_be_writable(orig_func):
    """Decorator: refuse to run `orig_func` on a read-only collection.

    Raises IOError(EROFS) when `self.sync_mode()` reports SYNC_READONLY;
    otherwise delegates to `orig_func` unchanged.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.sync_mode() == SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
class BlockManager(object):
- """
- BlockManager handles buffer blocks, background block uploads, and
- background block prefetch for a Collection of ArvadosFiles.
+ """BlockManager handles buffer blocks, background block uploads, and background
+ block prefetch for a Collection of ArvadosFiles.
+
"""
def __init__(self, keep):
"""keep: KeepClient object to use"""
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
- """
- Allocate a new, empty bufferblock in WRITABLE state and return it.
+ """Allocate a new, empty bufferblock in WRITABLE state and return it.
:blockid:
optional block identifier, otherwise one will be automatically assigned
:owner:
ArvadosFile that owns this block
+
"""
if blockid is None:
blockid = "bufferblock%i" % len(self._bufferblocks)
- bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
- self._bufferblocks[bb.blockid] = bb
- return bb
+ bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
@synchronized
def dup_block(self, blockid, owner):
- """
- Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
+ """Create a new bufferblock in WRITABLE state, initialized with the content of
+ an existing bufferblock.
:blockid:
the block to copy. May be an existing buffer block id.
:owner:
ArvadosFile that owns the new block
+
"""
new_blockid = "bufferblock%i" % len(self._bufferblocks)
block = self._bufferblocks[blockid]
- bb = BufferBlock(new_blockid, len(block), owner)
- bb.append(block)
- self._bufferblocks[bb.blockid] = bb
- return bb
+ bufferblock = BufferBlock(new_blockid, len(block), owner)
+ bufferblock.append(block)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
@synchronized
- def is_bufferblock(self, id):
- return id in self._bufferblocks
+ def is_bufferblock(self, locator):
+ return locator in self._bufferblocks
@synchronized
def stop_threads(self):
- """
- Shut down and wait for background upload and download threads to finish.
- """
+ """Shut down and wait for background upload and download threads to finish."""
+
if self._put_threads is not None:
for t in self._put_threads:
self._put_queue.put(None)
self._prefetch_queue = None
def commit_bufferblock(self, block):
+ """Initiate a background upload of a bufferblock.
+
+ This will block if the upload queue is at capacity, otherwise it will
+ return immediately.
+
"""
- Initiate a background upload of a bufferblock. This will block if the
- upload queue is at capacity, otherwise it will return immediately.
- """
- def worker(self):
- """
- Background uploader thread.
- """
+ def commit_bufferblock_worker(self):
+ """Background uploader thread."""
+
while True:
try:
- b = self._put_queue.get()
- if b is None:
+ bufferblock = self._put_queue.get()
+ if bufferblock is None:
return
- b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
- b.state = BufferBlock.COMMITTED
- b.buffer_view = None
- b.buffer_block = None
+ bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ bufferblock.state = BufferBlock.COMMITTED
+ bufferblock.buffer_view = None
+ bufferblock.buffer_block = None
except Exception as e:
print e
- self._put_errors.put(e)
+ self._put_errors.put((bufferblock.locator(), e))
finally:
if self._put_queue is not None:
self._put_queue.task_done()
self._put_threads = []
for i in xrange(0, self.num_put_threads):
- t = threading.Thread(target=worker, args=(self,))
- self._put_threads.append(t)
- t.daemon = True
- t.start()
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
# Mark the block as PENDING so to disallow any more appends.
block.state = BufferBlock.PENDING
self._put_queue.put(block)
def get_block(self, locator, num_retries, cache_only=False):
- """
- Fetch a block. First checks to see if the locator is a BufferBlock and
- return that, if not, passes the request through to KeepClient.get().
+ """Fetch a block.
+
+ First checks to see if the locator is a BufferBlock and return that, if
+ not, passes the request through to KeepClient.get().
+
"""
with self.lock:
if locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- if bb.state != BufferBlock.COMMITTED:
- return bb.buffer_view[0:bb.write_pointer].tobytes()
+ bufferblock = self._bufferblocks[locator]
+ if bufferblock.state != BufferBlock.COMMITTED:
+ return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
else:
- locator = bb._locator
- return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
+ locator = bufferblock._locator
+ if cache_only:
+ return self._keep.get_from_cache(locator)
+ else:
+ return self._keep.get(locator, num_retries=num_retries)
def commit_all(self):
- """
- Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this
- is a synchronous call, and will not return until all buffer blocks are
- uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to
- upload.
+ """Commit all outstanding buffer blocks.
+
+ Unlike commit_bufferblock(), this is a synchronous call, and will not
+ return until all buffer blocks are uploaded. Raises
+ KeepWriteError() if any blocks failed to upload.
+
"""
with self.lock:
items = self._bufferblocks.items()
if self._put_queue is not None:
self._put_queue.join()
if not self._put_errors.empty():
- e = []
+ err = []
try:
while True:
- e.append(self._put_errors.get(False))
+ err.append(self._put_errors.get(False))
except Queue.Empty:
pass
- raise AsyncKeepWriteErrors(e)
+ raise KeepWriteError("Error writing some blocks", err)
def block_prefetch(self, locator):
- """
- Initiate a background download of a block. This assumes that the
- underlying KeepClient implements a block cache, so repeated requests
- for the same block will not result in repeated downloads (unless the
- block is evicted from the cache.) This method does not block.
+ """Initiate a background download of a block.
+
+ This assumes that the underlying KeepClient implements a block cache,
+ so repeated requests for the same block will not result in repeated
+ downloads (unless the block is evicted from the cache.) This method
+ does not block.
+
"""
if not self.prefetch_enabled:
return
- def worker(self):
- """Background downloader thread."""
+ def block_prefetch_worker(self):
+ """The background downloader thread."""
while True:
try:
b = self._prefetch_queue.get()
self._prefetch_queue = Queue.Queue()
self._prefetch_threads = []
for i in xrange(0, self.num_get_threads):
- t = threading.Thread(target=worker, args=(self,))
- self._prefetch_threads.append(t)
- t.daemon = True
- t.start()
+ thread = threading.Thread(target=block_prefetch_worker, args=(self,))
+ self._prefetch_threads.append(thread)
+ thread.daemon = True
+ thread.start()
self._prefetch_queue.put(locator)
class ArvadosFile(object):
"""ArvadosFile manages the underlying representation of a file in Keep as a
sequence of segments spanning a set of blocks, and implements random
- read/write access. This object may be accessed from multiple threads.
+ read/write access.
+
+ This object may be accessed from multiple threads.
"""
def __init__(self, parent, stream=[], segments=[]):
"""
+ ArvadosFile constructor.
+
:stream:
a list of Range objects representing a block stream
@must_be_writable
@synchronized
def truncate(self, size):
- """
- Adjust the size of the file. If `size` is less than the size of the file,
- the file contents after `size` will be discarded. If `size` is greater
- than the current size of the file, an IOError will be raised.
+ """Shrink the size of the file.
+
+ If `size` is less than the size of the file, the file contents after
+ `size` will be discarded. If `size` is greater than the current size
+ of the file, an IOError will be raised.
+
"""
if size < self.size():
new_segs = []
raise IOError("truncate() does not support extending the file size")
def readfrom(self, offset, size, num_retries):
- """
- read upto `size` bytes from the file starting at `offset`.
- """
+        """Read up to `size` bytes from the file starting at `offset`."""
+
with self.lock:
if size == 0 or offset >= self.size():
return ''
data = []
for lr in readsegs:
- d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
- if d:
- data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ if block:
+ data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
break
return ''.join(data)
def _repack_writes(self):
- """
- Test if the buffer block has more data than is referenced by actual segments
- (this happens when a buffered write over-writes a file range written in
- a previous buffered write). Re-pack the buffer block for efficiency
+ """Test if the buffer block has more data than is referenced by actual
+ segments.
+
+ This happens when a buffered write over-writes a file range written in
+ a previous buffered write. Re-pack the buffer block for efficiency
and to avoid leaking information.
+
"""
segs = self._segments
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
- """
- Write `data` to the file starting at `offset`. This will update
- existing bytes and/or extend the size of the file as necessary.
+ """Write `data` to the file starting at `offset`.
+
+ This will update existing bytes and/or extend the size of the file as
+ necessary.
+
"""
if len(data) == 0:
return
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
- """
- Add a segment to the end of the file, with `pos` and `offset` referencing a
+ """Add a segment to the end of the file, with `pos` and `offset` referencing a
section of the stream described by `blocks` (a list of Range objects)
+
"""
self._add_segment(blocks, pos, size)
def _add_segment(self, blocks, pos, size):
- """
- (Internal version.)
- """
+ """Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0)
@synchronized
def size(self):
- """Get the file size"""
+ """Get the file size."""
if self._segments:
n = self._segments[-1]
return n.range_start + n.range_size
return 0
class ArvadosFileReader(ArvadosFileReaderBase):
- """Wraps ArvadosFile in a file-like object supporting reading only. Be aware
- that this class is NOT thread safe as there is no locking around updating file
- pointer.
+ """Wraps ArvadosFile in a file-like object supporting reading only.
+
+ Be aware that this class is NOT thread safe as there is no locking around
+ updating file pointer.
+
"""
def __init__(self, arvadosfile, name, mode="r", num_retries=None):
@ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position"""
+ """Read up to `size` bytes from the stream, starting at the current file position."""
data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
self._filepos += len(data)
return data
@ArvadosFileBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position"""
+ """Read up to `size` bytes from the stream, starting at the current file position."""
return self.arvadosfile.readfrom(offset, size, num_retries)
def flush(self):
class ArvadosFileWriter(ArvadosFileReader):
"""Wraps ArvadosFile in a file-like object supporting both reading and writing.
+
Be aware that this class is NOT thread safe as there is no locking around
updating file pointer.
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
-from .safeapi import SafeApi
+from .safeapi import ThreadSafeApiCache
import config
import errors
import util
def _populate_first(orig_func):
# Decorator for methods that read actual Collection data.
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def populate_first_wrapper(self, *args, **kwargs):
if self._streams is None:
self._populate()
return orig_func(self, *args, **kwargs)
- return wrapper
+ return populate_first_wrapper
@_populate_first
def api_response(self):
# Change types reported by collection notify()/apply().
ADD = "add"
DEL = "del"
MOD = "mod"

# Values for find_or_create()'s create_type argument.
FILE = "file"
COLLECTION = "collection"
class SynchronizedCollectionBase(CollectionBase):
- """Base class for Collections and Subcollections. Implements the majority of
- functionality relating to accessing items in the Collection."""
+ """Base class for Collections and Subcollections.
+
+ Implements the majority of functionality relating to accessing items in the
+ Collection.
+
+ """
def __init__(self, parent=None):
self.parent = parent
raise NotImplementedError()
@synchronized
- def find(self, path, create=False, create_collection=False):
- """Recursively search the specified file path. May return either a Collection
- or ArvadosFile.
+ def _find(self, path, create, create_collection):
+ """Internal method. Don't use this. Use `find()` or `find_or_create()`
+ instead.
+
+ Recursively search the specified file path. May return either a
+ Collection or ArvadosFile. Return None if not found and create=False.
+ Will create a new item if create=True.
:create:
If true, create path components (i.e. Collections) that are
component.
"""
+
if create and self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
self.notify(ADD, self, p[0], item)
del p[0]
if isinstance(item, SynchronizedCollectionBase):
- return item.find("/".join(p), create=create)
+ return item._find("/".join(p), create, create_collection)
else:
raise errors.ArgumentError("Interior path components must be subcollection")
else:
return self
+ def find(self, path):
+ """Recursively search the specified file path.
+
+ May return either a Collection or ArvadosFile. Return None if not
+ found.
+
+ """
+ return self._find(path, False, False)
+
+ def find_or_create(self, path, create_type):
+ """Recursively search the specified file path.
+
+ May return either a Collection or ArvadosFile. Will create a new item
+ at the specified path if none exists.
+
+ :create_type:
+ One of `arvado.collection.FILE` or
+ `arvado.collection.COLLECTION`. If the path is not found, and value
+ of create_type is FILE then create and return a new ArvadosFile for
+ the last path component. If COLLECTION, then create and return a new
+ Collection for the last path component.
+
+ """
+ return self._find(path, True, (create_type == COLLECTION))
+
def open(self, path, mode):
"""Open a file-like object for access.
if create and self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
- f = self.find(path, create=create)
+ f = self._find(path, create, False)
if f is None:
raise IOError((errno.ENOENT, "File not found"))
@synchronized
def set_unmodified(self):
- """Recursively clear modified flag"""
+ """Recursively clear modified flag."""
self._modified = False
for k,v in self._items.items():
v.set_unmodified()
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return self._items.keys().__iter__()
+ return iter(self._items.keys())
@synchronized
def iterkeys(self):
@synchronized
def __contains__(self, k):
"""If there is a file or collection a directly contained by this collection
- with name "k"."""
+ with name `k`."""
return k in self._items
@synchronized
def __len__(self):
- """Get the number of items directly contained in this collection"""
+ """Get the number of items directly contained in this collection."""
return len(self._items)
@must_be_writable
return self._items.items()
def exists(self, path):
- """Test if there is a file or collection at "path" """
+ """Test if there is a file or collection at `path`."""
return self.find(path) != None
@must_be_writable
@synchronized
- def remove(self, path, rm_r=False):
+ def remove(self, path, recursive=False):
"""Remove the file or subcollection (directory) at `path`.
- :rm_r:
+
+ :recursive:
Specify whether to remove non-empty subcollections (True), or raise an error (False).
"""
p = path.split("/")
if item is None:
raise IOError((errno.ENOENT, "File not found"))
if len(p) == 1:
- if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
+ if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive:
raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
d = self._items[p[0]]
del self._items[p[0]]
if not target_name:
raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
- target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
+ target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION)
with target_dir.lock:
if target_name in target_dir:
@must_be_writable
@synchronized
def apply(self, changes):
- """
- Apply changes from `diff`. If a change conflicts with a local change, it
- will be saved to an alternate path indicating the conflict.
+ """Apply changes from `diff`.
+
+ If a change conflicts with a local change, it will be saved to an
+ alternate path indicating the conflict.
+
"""
for c in changes:
path = c[1]
elif c[0] == DEL:
if local == initial:
# Local item matches "initial" value, so it is safe to remove.
- self.remove(path, rm_r=True)
+ self.remove(path, recursive=True)
# else, the file is modified or already removed, in either
# case we don't want to try to remove it.
num_retries=None,
block_manager=None,
sync=None):
- """:manifest_locator_or_text:
+ """CollectionRoot constructor.
+
+ :manifest_locator_or_text:
One of Arvados collection UUID, block locator of
a manifest, raw manifest text, or None (to create an empty collection).
:parent:
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
+ """Fetch the latest collection record on the API server and merge it with the
+ current collection contents.
+
+ """
if other is None:
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
@synchronized
def _my_api(self):
if self._api_client is None:
- self._api_client = arvados.SafeApi(self._config)
+ self._api_client = ThreadSafeApiCache(self._config)
self._keep_client = self._api_client.keep
return self._api_client
return self
def __exit__(self, exc_type, exc_value, traceback):
- """Support scoped auto-commit in a with: block"""
+ """Support scoped auto-commit in a with: block."""
if self._sync != SYNC_READONLY and self._has_collection_uuid():
self.save()
if self._block_manager is not None:
@synchronized
def api_response(self):
- """
- api_response() -> dict or None
+ """Returns information about this Collection fetched from the API server.
- Returns information about this Collection fetched from the API server.
If the Collection exists in Keep but not the API server, currently
returns None. Future versions may provide a synthetic response.
+
"""
return self._api_response
def save(self, merge=True, num_retries=None):
"""Commit pending buffer blocks to Keep, merge with remote record (if
update=True), write the manifest to Keep, and update the collection
- record. Will raise AssertionError if not associated with a collection
- record on the API server. If you want to save a manifest to Keep only,
- see `save_new()`.
+ record.
+
+ Will raise AssertionError if not associated with a collection record on
+ the API server. If you want to save a manifest to Keep only, see
+ `save_new()`.
:update:
Update and merge remote changes before saving. Otherwise, any
@retry_method
def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
"""Commit pending buffer blocks to Keep, write the manifest to Keep, and create
- a new collection record (if create_collection_record True). After
- creating a new collection record, this Collection object will be
+ a new collection record (if create_collection_record True).
+
+ After creating a new collection record, this Collection object will be
associated with the new record for `save()` and SYNC_LIVE updates.
:name:
class Subcollection(SynchronizedCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
- server record. It falls under the umbrella of the root collection."""
+ server record.
+
+ It falls under the umbrella of the root collection.
+
+ """
    def __init__(self, parent):
        # A subcollection only needs a reference to its enclosing collection;
        # all shared state lives in SynchronizedCollectionBase.
        super(Subcollection, self).__init__(parent)
keep=None,
num_retries=None,
sync=SYNC_READONLY):
- """Import a manifest into a `Collection`.
+ """Import a manifest into a `CollectionRoot`.
:manifest_text:
The manifest text to import from.
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- f = c.find("%s/%s" % (stream_name, name), create=True)
+ f = c.find_or_create("%s/%s" % (stream_name, name), FILE)
f.add_segment(blocks, pos, size)
else:
# error!
return c
def export_manifest(item, stream_name=".", portable_locators=False):
- """
+ """Export a manifest from the contents of a SynchronizedCollectionBase.
+
:item:
- Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
+ Create a manifest for `item` (must be a `SynchronizedCollectionBase` or `ArvadosFile`). If
      `item` is a `Collection`, this will also export subcollections.
:stream_name:
:portable_locators:
If True, strip any permission hints on block locators.
If False, use block locators as-is.
+
"""
buf = ""
if isinstance(item, SynchronizedCollectionBase):