+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
import functools
import os
import zlib
import bz2
-import config
+from . import config
import hashlib
import threading
-import Queue
+import queue
import copy
import errno
import re
stream_name, file_name = '.', path
return stream_name, file_name
+
+class UnownedBlockError(Exception):
+    """Raised when there's a writable block without an owner on the BlockManager.
+
+    A WRITABLE buffer block is expected to always have an owner;
+    this exception signals that the invariant was violated.
+    """
+    pass
+
+
class _FileLikeObjectBase(object):
def __init__(self, name, mode):
self.name = name
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._filepos = 0L
+ self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self.size())
+ self._filepos = min(max(pos, 0), self.size())
    def tell(self):
        """Return the current read position in the file, in bytes."""
        return self._filepos
if available_chunks:
lr = available_chunks[0]
data = self._stream.readfrom(lr.locator+lr.segment_offset,
- lr.segment_size,
- num_retries=num_retries)
+ lr.segment_size,
+ num_retries=num_retries)
self._filepos += len(data)
return data
# blocks pending. If they are full 64 MiB blocks, that means up to
# 256 MiB of internal buffering, which is the same size as the
# default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
+ self._put_queue = queue.Queue(maxsize=2)
self._put_threads = []
- for i in xrange(0, self.num_put_threads):
+ for i in range(0, self.num_put_threads):
thread = threading.Thread(target=self._commit_bufferblock_worker)
self._put_threads.append(thread)
thread.daemon = True
@synchronized
def start_get_threads(self):
if self._prefetch_threads is None:
- self._prefetch_queue = Queue.Queue()
+ self._prefetch_queue = queue.Queue()
self._prefetch_threads = []
- for i in xrange(0, self.num_get_threads):
+ for i in range(0, self.num_get_threads):
thread = threading.Thread(target=self._block_prefetch_worker)
self._prefetch_threads.append(thread)
thread.daemon = True
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ try:
+ small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
if len(small_blocks) <= 1:
# Not enough small blocks for repacking
self.repack_small_blocks(force=True, sync=True)
with self.lock:
- items = self._bufferblocks.items()
+ items = list(self._bufferblocks.items())
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
- self._committed = False
+ self.set_committed(False)
def __eq__(self, other):
if other is self:
with self.lock:
if len(self._segments) != len(othersegs):
return False
- for i in xrange(0, len(othersegs)):
+ for i in range(0, len(othersegs)):
seg1 = self._segments[i]
seg2 = othersegs[i]
loc1 = seg1.locator
self._segments = segs
@synchronized
- def set_committed(self):
- """Set committed flag to True"""
- self._committed = True
+    def set_committed(self, value=True):
+        """Set committed flag.
+
+        If value is True, set committed to be True.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        # No-op when already in the requested state; this also terminates
+        # the upward recursion below.
+        if value == self._committed:
+            return
+        self._committed = value
+        # Marking this object uncommitted (dirty) recursively marks every
+        # ancestor uncommitted as well; committing does not propagate.
+        if self._committed is False and self.parent is not None:
+            self.parent.set_committed(False)
@synchronized
def committed(self):
"""
self._writers.remove(writer)
- if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
# File writer closed, not small enough for repacking
self.flush()
elif self.closed():
new_segs.append(r)
self._segments = new_segs
- self._committed = False
+ self.set_committed(False)
elif size > self.size():
raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
n += config.KEEP_BLOCK_SIZE
return
- self._committed = False
+ self.set_committed(False)
if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
def _add_segment(self, blocks, pos, size):
"""Internal implementation of add_segment."""
- self._committed = False
+ self.set_committed(False)
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush(sync=True)
self.parent.remove(self.name)
self.parent = newparent