@synchronized
def locator(self):
    """Return the Keep locator for this buffer's contents.

    The locator has the form "<md5 hexdigest>+<size>".  It is built from
    the first `write_pointer` bytes of `buffer_view` the first time it is
    requested and stored in `self._locator`; later calls return the
    stored value without recomputing it.

    """
    if self._locator is not None:
        return self._locator
    digest = hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest()
    self._locator = "%s+%i" % (digest, self.size())
    return self._locator
+
@synchronized
def clone(self, new_blockid, owner):
    """Create a new _BufferBlock initialized with this block's contents.

    :new_blockid:
      block identifier to give the new block

    :owner:
      owner to pass to the new block

    Raises AssertionError if this block is already COMMITTED.

    """
    if self._state == _BufferBlock.COMMITTED:
        raise AssertionError("Can only duplicate a writable or pending buffer block")
    length = self.size()
    # The new block is created with a starting capacity equal to the amount
    # of data being copied into it.
    duplicate = _BufferBlock(new_blockid, length, owner)
    duplicate.append(self.buffer_view[0:length])
    return duplicate
+
+
class NoopLock(object):
    """A stand-in for a lock object that never blocks and holds no state.

    It offers the same surface as a threading lock (context manager,
    acquire() and release()) so it can be dropped in where a lock is
    expected but no mutual exclusion is wanted.

    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None (false), so an exception raised inside the "with"
        # body keeps propagating.
        pass

    def acquire(self, blocking=False):
        """Pretend to take the lock.

        Always reports success by returning True, matching what
        threading.Lock.acquire() returns when the lock was obtained, so
        callers that test the result do not see the lock as contended.

        """
        return True

    def release(self):
        pass
+
+
def must_be_writable(orig_func):
    """Decorator that refuses to run a method on a non-writable object.

    The wrapper calls self.writable() first.  If that returns a true value
    the call is forwarded to `orig_func` with the same arguments and its
    result is returned; otherwise IOError with errno.EROFS is raised and
    `orig_func` is never called.

    """
    @functools.wraps(orig_func)
    def guarded(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection must be writable.")
    return guarded
+
+
+class _BlockManager(object):
+ """BlockManager handles buffer blocks.
+
+ Also handles background block uploads, and background block prefetch for a
+ Collection of ArvadosFiles.
+
+ """
def __init__(self, keep):
    """Set up a block manager with no bufferblocks and no worker threads.

    :keep:
      KeepClient object to use

    """
    self._keep = keep
    self.lock = threading.Lock()

    # Maps block id -> bufferblock.
    self._bufferblocks = {}

    # Upload and prefetch queues/threads stay None until
    # commit_bufferblock() / block_prefetch() first need them.
    self._put_queue = self._put_errors = self._put_threads = None
    self._prefetch_queue = self._prefetch_threads = None

    # Tunables read when the worker threads are started.
    self.prefetch_enabled = True
    self.num_put_threads = 2
    self.num_get_threads = 2
+
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
    """Allocate a new, empty bufferblock in WRITABLE state and return it.

    :blockid:
      optional block identifier, otherwise one will be automatically assigned

    :starting_capacity:
      optional capacity, otherwise will use default capacity

    :owner:
      ArvadosFile that owns this block

    The new block is registered in this manager under its block id.

    """
    if blockid is None:
        # Start from the count-based name and step past any name that is
        # already registered, so an automatically assigned id can never
        # replace an existing bufferblock in the table.
        serial = len(self._bufferblocks)
        blockid = "bufferblock%i" % serial
        while blockid in self._bufferblocks:
            serial += 1
            blockid = "bufferblock%i" % serial
    bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
    self._bufferblocks[bufferblock.blockid] = bufferblock
    return bufferblock
+
@synchronized
def dup_block(self, block, owner):
    """Create a new bufferblock initialized with the content of an existing bufferblock.

    :block:
      the buffer block to copy.

    :owner:
      ArvadosFile that owns the new block

    The copy is made with block.clone(), registered in this manager under
    a newly assigned block id, and returned.

    """
    # Start from the count-based name and step past any name that is
    # already registered, so the copy can never replace an existing
    # bufferblock in the table.
    serial = len(self._bufferblocks)
    new_blockid = "bufferblock%i" % serial
    while new_blockid in self._bufferblocks:
        serial += 1
        new_blockid = "bufferblock%i" % serial
    bufferblock = block.clone(new_blockid, owner)
    self._bufferblocks[bufferblock.blockid] = bufferblock
    return bufferblock
+
@synchronized
def is_bufferblock(self, locator):
    """Return True if `locator` is the id of a bufferblock registered here."""
    registered = locator in self._bufferblocks
    return registered
+
@synchronized
def stop_threads(self):
    """Shut down and wait for background upload and download threads to finish.

    Each worker exits when it reads None from its queue, so one None is
    queued per thread before joining them.  Afterwards the thread lists
    and queues (including the upload error queue) are reset to None.

    """
    uploaders = self._put_threads
    if uploaders is not None:
        for _ in uploaders:
            self._put_queue.put(None)
        for worker in uploaders:
            worker.join()
    self._put_threads = None
    self._put_queue = None
    self._put_errors = None

    downloaders = self._prefetch_threads
    if downloaders is not None:
        for _ in downloaders:
            self._prefetch_queue.put(None)
        for worker in downloaders:
            worker.join()
    self._prefetch_threads = None
    self._prefetch_queue = None
+
def commit_bufferblock(self, block, wait):
    """Upload a bufferblock to Keep, either now or in the background.

    :block:
      The block object to upload

    :wait:
      If `wait` is True, upload the block synchronously.
      If `wait` is False, upload the block asynchronously.  This will
      return immediately unless the upload queue is at capacity, in
      which case it will wait on an upload queue slot.

    Blocks that are not in the WRITABLE state are ignored.

    """

    def commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    # Shutdown sentinel; the finally clause below still
                    # marks it as done.
                    return
                payload = bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                loc = self._keep.put(payload)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                # Record the failure so commit_all() can report it.
                self._put_errors.put((bufferblock.locator(), e))
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    if block.state() != _BufferBlock.WRITABLE:
        return

    if wait:
        block.set_state(_BufferBlock.PENDING)
        loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
        block.set_state(_BufferBlock.COMMITTED, loc)
        return

    with self.lock:
        if self._put_threads is None:
            # First asynchronous commit: start the uploader threads.
            #
            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)
            self._put_errors = Queue.Queue()

            self._put_threads = []
            for _ in xrange(0, self.num_put_threads):
                uploader = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                self._put_threads.append(uploader)
                uploader.daemon = True
                uploader.start()

    # Mark the block as PENDING so as to disallow any more appends.
    block.set_state(_BufferBlock.PENDING)
    self._put_queue.put(block)
+
@synchronized
def get_bufferblock(self, locator):
    """Return the bufferblock registered under `locator`, or None if there is none."""
    found = self._bufferblocks.get(locator)
    return found
+
def get_block_contents(self, locator, num_retries, cache_only=False):
    """Fetch the contents of a block.

    If `locator` names a bufferblock that has not been committed yet, the
    bytes currently held in its buffer are returned directly.  If it names
    a committed bufferblock, the locator stored on that block is used for
    the lookup instead.  Everything else is passed through to the
    KeepClient: get_from_cache() when `cache_only` is true, otherwise
    get() with `num_retries`.

    """
    with self.lock:
        if locator in self._bufferblocks:
            bufferblock = self._bufferblocks[locator]
            if bufferblock.state() == _BufferBlock.COMMITTED:
                locator = bufferblock._locator
            else:
                return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()

    # The KeepClient is consulted after the manager lock is released.
    if cache_only:
        return self._keep.get_from_cache(locator)
    return self._keep.get(locator, num_retries=num_retries)
+
def commit_all(self):
    """Commit all outstanding buffer blocks.

    Unlike commit_bufferblock(), this is a synchronous call, and will not
    return until all buffer blocks are uploaded.  Raises
    KeepWriteError() if any blocks failed to upload.

    """
    with self.lock:
        # Snapshot the blocks so they can be flushed without holding the
        # manager lock.
        blocks = list(self._bufferblocks.values())

    for bufferblock in blocks:
        if bufferblock.state() != _BufferBlock.WRITABLE:
            continue
        if bufferblock.owner is not None:
            bufferblock.owner.flush(False)
        else:
            # Blocks may be allocated without an owner (owner defaults to
            # None in alloc_bufferblock); there is nothing to flush
            # through, so queue the block for upload directly.
            self.commit_bufferblock(bufferblock, False)

    with self.lock:
        if self._put_queue is not None:
            # Wait for every queued upload to be processed.
            self._put_queue.join()

            if not self._put_errors.empty():
                err = []
                try:
                    # Drain all recorded (locator, exception) pairs.
                    while True:
                        err.append(self._put_errors.get(False))
                except Queue.Empty:
                    pass
                raise KeepWriteError("Error writing some blocks", err, label="block")
+
def block_prefetch(self, locator):
    """Ask for a block to be downloaded in the background.

    This assumes that the underlying KeepClient implements a block cache,
    so repeated requests for the same block will not result in repeated
    downloads (unless the block is evicted from the cache.)  This method
    does not block.

    Nothing is done when prefetch is disabled or when `locator` names a
    bufferblock held by this manager.

    """
    if not self.prefetch_enabled:
        return

    def block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                wanted = self._prefetch_queue.get()
                if wanted is None:
                    # Shutdown sentinel.
                    return
                self._keep.get(wanted)
            except Exception:
                # Prefetch is best-effort: failures are ignored here.
                pass

    with self.lock:
        if locator in self._bufferblocks:
            return
        if self._prefetch_threads is None:
            # First prefetch request: start the downloader threads.
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for _ in xrange(0, self.num_get_threads):
                downloader = threading.Thread(target=block_prefetch_worker, args=(self,))
                self._prefetch_threads.append(downloader)
                downloader.daemon = True
                downloader.start()
    self._prefetch_queue.put(locator)