X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc946c07eade09974423955f8d4b080941f53c7b..22286e8b81fa7644500e197b95e6d6417ed25f7e:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index ad52195f48..c46019a0d4 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -4,6 +4,11 @@ import zlib import bz2 from .ranges import * from arvados.retry import retry_method +import config +import hashlib +import hashlib +import threading +import Queue def split(path): """split(path) -> streamname, filename @@ -46,7 +51,7 @@ class ArvadosFileBase(object): self.closed = True -class StreamFileReader(ArvadosFileBase): +class ArvadosFileReaderBase(ArvadosFileBase): class _NameAttribute(str): # The Python file API provides a plain .name attribute. # Older SDK provided a name() method. @@ -54,13 +59,10 @@ class StreamFileReader(ArvadosFileBase): def __call__(self): return self - - def __init__(self, stream, segments, name): - super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb') - self._stream = stream - self.segments = segments + def __init__(self, name, mode, num_retries=None): + super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode) self._filepos = 0L - self.num_retries = stream.num_retries + self.num_retries = num_retries self._readline_cache = (None, None) def __iter__(self): @@ -73,57 +75,17 @@ class StreamFileReader(ArvadosFileBase): def decompressed_name(self): return re.sub('\.(bz2|gz)$', '', self.name) - def stream_name(self): - return self._stream.name() - @ArvadosFileBase._before_close def seek(self, pos, whence=os.SEEK_CUR): if whence == os.SEEK_CUR: pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self._size()) + self._filepos = min(max(pos, 0L), self.size()) def tell(self): return self._filepos - def _size(self): - n = self.segments[-1] - return n[OFFSET] + n[BLOCKSIZE] - - def size(self): - return self._size() - - @ArvadosFileBase._before_close - @retry_method - def read(self, size, num_retries=None): - """Read up to 'size' bytes from the stream, starting at the current file position""" - if size == 0: - return '' - - data = '' - available_chunks = locators_and_ranges(self.segments, self._filepos, size) - if available_chunks: - locator, blocksize, segmentoffset, segmentsize = available_chunks[0] - data = self._stream._readfrom(locator+segmentoffset, segmentsize, - num_retries=num_retries) - - self._filepos += len(data) - return data - - @ArvadosFileBase._before_close - @retry_method - def readfrom(self, start, size, num_retries=None): - """Read up to 'size' bytes from the stream, starting at 'start'""" - if size == 0: - return '' - - data = [] - for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size): - data.append(self._stream._readfrom(locator+segmentoffset, segmentsize, - num_retries=num_retries)) - return ''.join(data) - @ArvadosFileBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): @@ -192,75 +154,368 @@ class StreamFileReader(ArvadosFileBase): break return ''.join(data).splitlines(True) + +class StreamFileReader(ArvadosFileReaderBase): + def __init__(self, stream, segments, name): + super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries) + self._stream = stream + self.segments = segments + + def stream_name(self): + return self._stream.name() + + def size(self): + n = self.segments[-1] + return n.range_start + n.range_size + + @ArvadosFileBase._before_close + @retry_method + def read(self, size, num_retries=None): + """Read up to 'size' bytes from the stream, starting at the current file position""" + if size == 0: + return '' + + data = '' + available_chunks = locators_and_ranges(self.segments, self._filepos, size) + if available_chunks: + lr = available_chunks[0] + data = self._stream._readfrom(lr.locator+lr.segment_offset, + lr.segment_size, + num_retries=num_retries) + + self._filepos += len(data) + return data + + @ArvadosFileBase._before_close + @retry_method + def readfrom(self, start, size, num_retries=None): + """Read up to 'size' bytes from the stream, starting at 'start'""" + if size == 0: + return '' + + data = [] + for lr in locators_and_ranges(self.segments, start, size): + data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size, + num_retries=num_retries)) + return ''.join(data) + def as_manifest(self): - manifest_text = ['.'] - manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators]) - manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments]) - return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True) + from stream import normalize_stream + segs = [] + for r in self.segments: + segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) + return " ".join(normalize_stream(".", {self.name: segs})) + "\n" + + +class BufferBlock(object): + WRITABLE = 0 + PENDING = 1 + COMMITTED = 2 + + def __init__(self, blockid, starting_size): + self.blockid = blockid + self.buffer_block = bytearray(starting_size) + self.buffer_view = memoryview(self.buffer_block) + self.write_pointer = 0 + self.state = BufferBlock.WRITABLE + self._locator = None + + def append(self, data): + if self.state == BufferBlock.WRITABLE: + while (self.write_pointer+len(data)) > len(self.buffer_block): + new_buffer_block = bytearray(len(self.buffer_block) * 2) + new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] + self.buffer_block = new_buffer_block + self.buffer_view = memoryview(self.buffer_block) + self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data + self.write_pointer += len(data) + self._locator = None + else: + raise AssertionError("Buffer block is not writable") + def size(self): + return self.write_pointer + + def locator(self): + if self._locator is None: + self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) + return self._locator + +class AsyncKeepWriteErrors(Exception): + def __init__(self, errors): + self.errors = errors + + def __repr__(self): + return "\n".join(self.errors) + +class BlockManager(object): + def __init__(self, keep): + self._keep = keep + self._bufferblocks = {} + self._put_queue = None + self._put_errors = None + self._put_threads = None + self._prefetch_queue = None + self._prefetch_threads = None + + def alloc_bufferblock(self, blockid=None, starting_size=2**14): + if blockid is None: + blockid = "bufferblock%i" % len(self._bufferblocks) + bb = BufferBlock(blockid, starting_size=starting_size) + self._bufferblocks[bb.blockid] = bb + return bb + + def stop_threads(self): + if self._put_threads is not None: + for t in self._put_threads: + self._put_queue.put(None) + for t in self._put_threads: + t.join() + self._put_threads = None + self._put_queue = None + self._put_errors = None + + if self._prefetch_threads is not None: + for t in self._prefetch_threads: + self._prefetch_queue.put(None) + for t in self._prefetch_threads: + t.join() + self._prefetch_threads = None + self._prefetch_queue = None + + def commit_bufferblock(self, block): + def worker(self): + while True: + try: + b = self._put_queue.get() + if b is None: + return + b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes()) + b.state = BufferBlock.COMMITTED + b.buffer_view = None + b.buffer_block = None + except Exception as e: + print e + self._put_errors.put(e) + finally: + if self._put_queue is not None: + self._put_queue.task_done() + + if self._put_threads is None: + self._put_queue = Queue.Queue(maxsize=2) + self._put_errors = Queue.Queue() + self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))] + for t in self._put_threads: + t.start() + + block.state = BufferBlock.PENDING + self._put_queue.put(block) + + def get_block(self, locator, num_retries): + if locator in self._bufferblocks: + bb = self._bufferblocks[locator] + if bb.state != BufferBlock.COMMITTED: + return bb.buffer_view[0:bb.write_pointer].tobytes() + else: + locator = bb._locator + return self._keep.get(locator, num_retries=num_retries) + + def commit_all(self): + for k,v in self._bufferblocks.items(): + if v.state == BufferBlock.WRITABLE: + self.commit_bufferblock(v) + if self._put_queue is not None: + self._put_queue.join() + if not self._put_errors.empty(): + e = [] + try: + while True: + e.append(self._put_errors.get(False)) + except Queue.Empty: + pass + raise AsyncKeepWriteErrors(e) + + def block_prefetch(self, locator): + def worker(self): + while True: + try: + b = self._prefetch_queue.get() + if b is None: + return + self._keep.get(b) + except: + pass + + if locator in self._bufferblocks: + return + if self._prefetch_threads is None: + self._prefetch_queue = Queue.Queue() + self._prefetch_threads = [threading.Thread(target=worker, args=(self,)), + threading.Thread(target=worker, args=(self,))] + self._prefetch_threads[0].start() + self._prefetch_threads[1].start() + self._prefetch_queue.put(locator) + +class ArvadosFile(object): + def __init__(self, parent, stream=[], segments=[]): + ''' + stream: a list of Range objects representing a block stream + segments: a list of Range objects representing segments + ''' + self.parent = parent + self._modified = True + self.segments = [] + for s in segments: + self.add_segment(stream, s.range_start, s.range_size) + self._current_bblock = None + + def set_unmodified(self): + self._modified = False + + def modified(self): + return self._modified + + def truncate(self, size): + new_segs = [] + for r in self.segments: + range_end = r.range_start+r.range_size + if r.range_start >= size: + # segment is past the trucate size, all done + break + elif size < range_end: + nr = Range(r.locator, r.range_start, size - r.range_start) + nr.segment_offset = r.segment_offset + new_segs.append(nr) + break + else: + new_segs.append(r) -class StreamFileWriter(StreamFileReader): - def __init__(self, stream, segments, name): - super(StreamFileWriter, self).__init__(stream, segments, name) - self.mode = 'wb' + self.segments = new_segs + self._modified = True - # wrap superclass methods in mutex - def _proxy_method(name): - method = getattr(StreamFileReader, name) - @functools.wraps(method, ('__name__', '__doc__')) - def wrapper(self, *args, **kwargs): - with self._stream.mutex: - return method(self, *args, **kwargs) - return wrapper + def readfrom(self, offset, size, num_retries): + if size == 0 or offset >= self.size(): + return '' + data = [] - for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']: - locals()[_method_name] = _proxy_method(_method_name) + for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE): + self.parent._my_block_manager().block_prefetch(lr.locator) - def truncate(self, size=None): - with self._stream.mutex: - if size is None: - size = self._filepos - - segs = locators_and_ranges(self.segments, 0, size) - - newstream = [] - self.segments = [] - streamoffset = 0L - fileoffset = 0L - - for seg in segs: - for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]): - newstream.append([locator, blocksize, streamoffset]) - self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset]) - streamoffset += blocksize - fileoffset += segmentsize - if len(newstream) == 0: - newstream.append(config.EMPTY_BLOCK_LOCATOR) - self.segments.append([0, 0, 0]) - self._stream._data_locators = newstream - if self._filepos > fileoffset: - self._filepos = fileoffset - - def _writeto(self, offset, data): - if offset > self._size(): + for lr in locators_and_ranges(self.segments, offset, size): + # TODO: if data is empty, wait on block get, otherwise only + # get more data if the block is already in the cache. + data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size]) + return ''.join(data) + + def _repack_writes(self): + '''Test if the buffer block has more data than is referenced by actual segments + (this happens when a buffered write over-writes a file range written in + a previous buffered write). Re-pack the buffer block for efficiency + and to avoid leaking information. + ''' + segs = self.segments + + # Sum up the segments to get the total bytes of the file referencing + # into the buffer block. + bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] + write_total = sum([s.range_size for s in bufferblock_segs]) + + if write_total < self._current_bblock.size(): + # There is more data in the buffer block than is actually accounted for by segments, so + # re-pack into a new buffer by copying over to a new buffer block. + new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total) + for t in bufferblock_segs: + new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes()) + t.segment_offset = new_bb.size() - t.range_size + + self._current_bblock = new_bb + + def writeto(self, offset, data, num_retries): + if len(data) == 0: + return + + if offset > self.size(): raise ArgumentError("Offset is past the end of the file") - self._stream._append(data) - replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data)) - def writeto(self, offset, data): - with self._stream.mutex: - self._writeto(offset, data) + if len(data) > config.KEEP_BLOCK_SIZE: + raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE)) - def write(self, data): - with self._stream.mutex: - self._writeto(self._filepos, data) - self._filepos += len(data) + self._modified = True + + if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE: + self._current_bblock = self.parent._my_block_manager().alloc_bufferblock() + + if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: + self._repack_writes() + if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: + self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + self._current_bblock = self.parent._my_block_manager().alloc_bufferblock() + + self._current_bblock.append(data) + replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) - def writelines(self, seq): - with self._stream.mutex: - for s in seq: - self._writeto(self._filepos, s) - self._filepos += len(s) + def add_segment(self, blocks, pos, size): + self._modified = True + for lr in locators_and_ranges(blocks, pos, size): + last = self.segments[-1] if self.segments else Range(0, 0, 0) + r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) + self.segments.append(r) + + def size(self): + if self.segments: + n = self.segments[-1] + return n.range_start + n.range_size + else: + return 0 + + +class ArvadosFileReader(ArvadosFileReaderBase): + def __init__(self, arvadosfile, name, mode="r", num_retries=None): + super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries) + self.arvadosfile = arvadosfile + + def size(self): + return self.arvadosfile.size() + + @ArvadosFileBase._before_close + @retry_method + def read(self, size, num_retries=None): + """Read up to 'size' bytes from the stream, starting at the current file position""" + data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries) + self._filepos += len(data) + return data + + @ArvadosFileBase._before_close + @retry_method + def readfrom(self, offset, size, num_retries=None): + """Read up to 'size' bytes from the stream, starting at the current file position""" + return self.arvadosfile.readfrom(offset, size, num_retries) def flush(self): pass + +class ArvadosFileWriter(ArvadosFileReader): + def __init__(self, arvadosfile, name, mode, num_retries=None): + super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries) + + @ArvadosFileBase._before_close + @retry_method + def write(self, data, num_retries=None): + if self.mode[0] == "a": + self.arvadosfile.writeto(self.size(), data) + else: + self.arvadosfile.writeto(self._filepos, data, num_retries) + self._filepos += len(data) + + @ArvadosFileBase._before_close + @retry_method + def writelines(self, seq, num_retries=None): + for s in seq: + self.write(s) + + def truncate(self, size=None): + if size is None: + size = self._filepos + self.arvadosfile.truncate(size) + if self._filepos > self.size(): + self._filepos = self.size()