- Replace a range with a new block.
- data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
- range_start: start of range
- range_size: size of range
- new_locator: locator for new block to be inserted
- !!! data_locators will be updated in place !!!
- '''
- if range_size == 0:
- return
-
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
-
- last = data_locators[-1]
- if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
- # append new block
- data_locators.append([new_locator, range_size, range_start])
- return
-
- i = first_block(data_locators, range_start, range_size, debug)
- if i is None:
- return
-
- while i < len(data_locators):
- locator, block_size, block_start = data_locators[i]
- block_end = block_start + block_size
- if debug:
- print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
- if range_end <= block_start:
- # range ends before this block starts, so don't look at any more locators
- break
-
- #if range_start >= block_end:
- # range starts after this block ends, so go to next block
- # we should always start at the first block due to the binary above, so this test is redundant
- #next
-
- if range_start >= block_start and range_end <= block_end:
- # range starts and ends in this block
- # split block into 3 pieces
- #resp.append([locator, block_size, range_start - block_start, range_size])
- pass
- elif range_start >= block_start and range_end > block_end:
- # range starts in this block
- # split block into 2 pieces
- #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
- pass
- elif range_start < block_start and range_end > block_end:
- # range starts in a previous block and extends to further blocks
- # zero out this block
- #resp.append([locator, block_size, 0L, block_size])
- pass
- elif range_start < block_start and range_end <= block_end:
- # range starts in a previous block and ends in this block
- # split into 2 pieces
- #resp.append([locator, block_size, 0L, range_end - block_start])
- pass
- block_start = block_end
- i += 1
-
-
def split(path):
    """split(path) -> streamname, filename

    Break a /-separated stream path at its last slash into the stream
    name and the file name.  A path containing no slash is treated as
    a file in the '.' stream.
    """
    stream_name, slash, file_name = path.rpartition('/')
    if not slash:
        # No / in string: the whole path is the file name.
        return '.', path
    return stream_name, file_name
-
-class StreamFileReader(ArvadosFileBase):
class _NameAttribute(str):
    """A string that returns itself when called.

    The Python file API provides a plain .name attribute, while the
    older SDK provided a name() method.  An instance of this class
    supports both spellings, for maximum compatibility.
    """

    def __call__(self):
        # Calling the name yields the name itself, so obj.name() == obj.name.
        return self
-
-
def __init__(self, stream, segments, name):
    """Set up a reader for one file stored inside a stream.

    stream: object the file data is read from; its num_retries value is
            copied onto this reader
    segments: list of segments that make up the file, stored as given
    name: file name; wrapped in _NameAttribute and passed to the base
          class initializer together with 'rb'
    """
    super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
    self._stream = stream
    self.segments = segments
    # Plain 0 instead of the Python-2-only literal 0L: the value is the
    # same (Python 2 promotes int to long on demand) and the statement
    # also parses on Python 3.
    self._filepos = 0
    self.num_retries = stream.num_retries
    # (file position, leftover text) kept by readline(); (None, None)
    # never matches a real position, i.e. the cache starts out empty.
    self._readline_cache = (None, None)
-
def __iter__(self):
    """Yield the results of successive readline() calls until one is empty."""
    line = self.readline()
    while line:
        yield line
        line = self.readline()
-
def decompressed_name(self):
    """Return self.name with a trailing '.bz2' or '.gz' suffix removed.

    Names without such a suffix are returned unchanged.
    """
    # Raw string: the pattern is unchanged, but '\.' no longer relies on
    # Python passing an unrecognised escape sequence through verbatim.
    return re.sub(r'\.(bz2|gz)$', '', self.name)
-
def stream_name(self):
    """Return the name reported by the stream this file was created with."""
    owner = self._stream
    return owner.name()
-
@ArvadosFileBase._before_close
def seek(self, pos, whence=os.SEEK_CUR):
    """Move the file position.

    pos: offset, interpreted according to whence
    whence: os.SEEK_CUR makes pos relative to the current position,
            os.SEEK_END makes it relative to the file size; with any
            other value pos is taken as an absolute offset

    The resulting position is clamped to the range [0, file size].

    NOTE: whence defaults to os.SEEK_CUR here, whereas the built-in
    file API defaults to os.SEEK_SET.  The default is kept as is
    because existing callers may rely on it.
    """
    if whence == os.SEEK_CUR:
        pos += self._filepos
    elif whence == os.SEEK_END:
        pos += self.size()
    # Plain 0 instead of the Python-2-only literal 0L: same value, and
    # the statement also parses on Python 3.
    self._filepos = min(max(pos, 0), self._size())
-
def tell(self):
    """Return the current file position."""
    position = self._filepos
    return position
-
def _size(self):
    """Return where the last segment ends: its OFFSET plus its BLOCKSIZE."""
    last_segment = self.segments[-1]
    return last_segment[OFFSET] + last_segment[BLOCKSIZE]
-
def size(self):
    """Return the file size as computed by _size()."""
    total = self._size()
    return total
-
@ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
    """Read up to 'size' bytes from the stream, starting at the current file position.

    Only the first range returned by locators_and_ranges() is fetched,
    so the result may be shorter than 'size'.  The file position is
    advanced by the length of the data returned.
    """
    if size == 0:
        return ''

    chunks = locators_and_ranges(self.segments, self._filepos, size)
    if not chunks:
        # Nothing available at this position.
        data = ''
    else:
        base, _blocksize, skip, length = chunks[0]
        data = self._stream._readfrom(base + skip, length,
                                      num_retries=num_retries)

    self._filepos += len(data)
    return data
-
@ArvadosFileBase._before_close
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Read up to 'size' bytes from the stream, starting at 'start'

    Every range returned by locators_and_ranges() is fetched in order
    and the pieces are concatenated.  The file position is not changed.
    """
    if size == 0:
        return ''

    pieces = [
        self._stream._readfrom(base + skip, length, num_retries=num_retries)
        for base, _blocksize, skip, length
        in locators_and_ranges(self.segments, start, size)
    ]
    return ''.join(pieces)
-
@ArvadosFileBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
    """Yield the results of successive read(size) calls until one returns ''."""
    chunk = self.read(size, num_retries=num_retries)
    while chunk != '':
        yield chunk
        chunk = self.read(size, num_retries=num_retries)
-
@ArvadosFileBase._before_close
@retry_method
def readline(self, size=float('inf'), num_retries=None):
    """Return the next line, including its trailing newline if present.

    Data is pulled with read() in 2**20-sized requests until a newline
    shows up, at least 'size' characters are buffered, or read()
    returns nothing.  At most 'size' characters are returned.  Whatever
    was read beyond the returned text is stored in the readline cache
    together with the file position, and is reused by the next call
    only if the position has not changed in the meantime.
    """
    cached_pos, cached_tail = self._readline_cache
    # Resume from the leftover text only when nothing moved the file
    # position since it was stored.
    pieces = [cached_tail] if self.tell() == cached_pos else ['']
    buffered = len(pieces[-1])
    while (buffered < size) and ('\n' not in pieces[-1]):
        chunk = self.read(2 ** 20, num_retries=num_retries)
        if not chunk:
            break
        pieces.append(chunk)
        buffered += len(chunk)

    text = ''.join(pieces)
    newline_at = text.find('\n')
    # Cut just after the first newline, or at the end when there is none.
    cut = len(text) if newline_at < 0 else newline_at + 1
    cut = min(cut, size)
    self._readline_cache = (self.tell(), text[cut:])
    return text[:cut]
-
@ArvadosFileBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
    """Yield decompressed data for the file, piece by piece.

    decompress: callable applied to each piece produced by readall()
    size: passed through to readall()

    Pieces for which the callable returns an empty/false result are
    skipped.
    """
    for raw in self.readall(size, num_retries):
        output = decompress(raw)
        if not output:
            continue
        yield output