- blockid = "bufferblock%i" % len(self._bufferblocks)
- bb = BufferBlock(blockid, starting_size=starting_size)
- self._bufferblocks[bb.blockid] = bb
- return bb
-
    def commit_bufferblock(self, block):
        """Queue a bufferblock for asynchronous upload to Keep (legacy version,
        removed by this patch).

        NOTE(review): this implementation is visibly broken, which is
        presumably why the patch replaces it:
        """
        def worker(self):
            while self._continue_worker:
                try:
                    b = self._put_queue.get()
                    # NOTE(review): `item` is never defined in this scope --
                    # this raises NameError on every iteration; it should be
                    # the block's buffered bytes.
                    b._locator = self._keep.put(item)
                    b.state = BufferBlock.COMMITTED
                    b.buffer_view = None
                    b.buffer_block = None
                except Exception as e:
                    # NOTE(review): the error queue is created below as
                    # `self._put_errors`, but written here as `self._error` --
                    # attribute name mismatch.
                    self._error.put(e)
                finally:
                    # NOTE(review): the queue is created below as
                    # `self._put_queue`, but referenced here as `self._queue`.
                    self._queue.task_done()

        if self._threads is None:
            self._put_queue = Queue.Queue()
            self._put_errors = Queue.Queue()
            # NOTE(review): the threads are constructed but never start()ed,
            # so nothing ever drains the queue.
            self._threads = [threading.Thread(target=worker, args=(self,)),
                             threading.Thread(target=worker, args=(self,))]

        block.state = BufferBlock.PENDING
        self._put_queue.put(block)
-
- def get_block(self, locator, num_retries):
- if locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- if bb.state != BufferBlock.COMMITTED:
- return bb.buffer_view[0:bb.write_pointer].tobytes()
+ blockid = str(uuid.uuid4())
+ bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
+
+ @synchronized
+ def dup_block(self, block, owner):
+ """Create a new bufferblock initialized with the content of an existing bufferblock.
+
+ :block:
+ the buffer block to copy.
+
+ :owner:
+ ArvadosFile that owns the new block
+
+ """
+ new_blockid = str(uuid.uuid4())
+ bufferblock = block.clone(new_blockid, owner)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
+
    @synchronized
    def is_bufferblock(self, locator):
        """Return True if `locator` identifies a bufferblock managed by this object."""
        return locator in self._bufferblocks
+
+ def _commit_bufferblock_worker(self):
+ """Background uploader thread."""
+
+ while True:
+ try:
+ bufferblock = self._put_queue.get()
+ if bufferblock is None:
+ return
+
+ if self.copies is None:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+ bufferblock.set_state(_BufferBlock.COMMITTED, loc)
+ except Exception as e:
+ bufferblock.set_state(_BufferBlock.ERROR, e)
+ finally:
+ if self._put_queue is not None:
+ self._put_queue.task_done()
+
+ def start_put_threads(self):
+ with self.threads_lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in range(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ def _block_prefetch_worker(self):
+ """The background downloader thread."""
+ while True:
+ try:
+ b = self._prefetch_queue.get()
+ if b is None:
+ return
+ self._keep.get(b)
+ except Exception:
+ _logger.exception("Exception doing block prefetch")
+
+ @synchronized
+ def start_get_threads(self):
+ if self._prefetch_threads is None:
+ self._prefetch_queue = queue.Queue()
+ self._prefetch_threads = []
+ for i in range(0, self.num_get_threads):
+ thread = threading.Thread(target=self._block_prefetch_worker)
+ self._prefetch_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+
+ @synchronized
+ def stop_threads(self):
+ """Shut down and wait for background upload and download threads to finish."""
+
+ if self._put_threads is not None:
+ for t in self._put_threads:
+ self._put_queue.put(None)
+ for t in self._put_threads:
+ t.join()
+ self._put_threads = None
+ self._put_queue = None
+
+ if self._prefetch_threads is not None:
+ for t in self._prefetch_threads:
+ self._prefetch_queue.put(None)
+ for t in self._prefetch_threads:
+ t.join()
+ self._prefetch_threads = None
+ self._prefetch_queue = None
+
    def __enter__(self):
        """Support use as a context manager; returns self."""
        return self
+
    def __exit__(self, exc_type, exc_value, traceback):
        """On context exit, stop and join all background threads."""
        self.stop_threads()
+
    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Pack small blocks together before uploading.

        :force:
          if True, pack and commit even when less than a full block of
          small-file data is pending.

        :sync:
          passed through to commit_bufferblock(); if True the combined
          block is uploaded synchronously.

        :closed_file_size:
          size of a file that was just closed, added to the running count
          of pending small-file data.
        """

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        # Compact each block's buffer before measuring sizes below.
        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        # Concatenate as many small blocks as fit into one new block.
        # new_bb.owner becomes a *list* of owners (one per absorbed block),
        # unlike ordinary bufferblocks which have a single owner.
        new_bb = self._alloc_bufferblock()
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            # Remember where this block's data starts inside new_bb.
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Rewrite each absorbed file's segments to point into the combined
        # block, then drop the now-redundant original bufferblock.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)
+
+ def commit_bufferblock(self, block, sync):
+ """Initiate a background upload of a bufferblock.
+
+ :block:
+ The block object to upload
+
+ :sync:
+ If `sync` is True, upload the block synchronously.
+ If `sync` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
+
+ """
+ try:
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ except StateChangeError as e:
+ if e.state == _BufferBlock.PENDING:
+ if sync:
+ block.wait_for_commit.wait()
+ else:
+ return
+ if block.state() == _BufferBlock.COMMITTED:
+ return
+ elif block.state() == _BufferBlock.ERROR:
+ raise block.error