X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/89796f01a6ea3cb553a61be6ce92883a1decf003..276ca380e9ee106b3880708d0ac48e2ab3cca335:/sdk/python/arvados/arvfile.py?ds=sidebyside diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 1c21d832c0..2ec1079af1 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -208,19 +208,42 @@ class StreamFileReader(ArvadosFileReaderBase): class BufferBlock(object): +''' +A BufferBlock is a stand-in for a Keep block that is in the process of being +written. Writers can append to it, get the size, and compute the Keep locator. + +There are three valid states: + +WRITABLE - can append + +PENDING - is in the process of being uploaded to Keep, append is an error + +COMMITTED - the block has been written to Keep, its internal buffer has been +released, and the BufferBlock should be discarded in favor of fetching the +block through normal Keep means. +''' WRITABLE = 0 PENDING = 1 COMMITTED = 2 - def __init__(self, blockid, starting_size): + def __init__(self, blockid, starting_capacity): + ''' + blockid: the identifier for this block + starting_capacity: the initial buffer capacity + ''' self.blockid = blockid - self.buffer_block = bytearray(starting_size) + self.buffer_block = bytearray(starting_capacity) self.buffer_view = memoryview(self.buffer_block) self.write_pointer = 0 self.state = BufferBlock.WRITABLE self._locator = None def append(self, data): + ''' + Append some data to the buffer. Only valid if the block is in WRITABLE + state. Implements an expanding buffer, doubling capacity as needed to + accommodate all the data. 
+ ''' if self.state == BufferBlock.WRITABLE: while (self.write_pointer+len(data)) > len(self.buffer_block): new_buffer_block = bytearray(len(self.buffer_block) * 2) @@ -234,14 +257,20 @@ class BufferBlock(object): raise AssertionError("Buffer block is not writable") def size(self): + '''Amount of data written to the buffer''' return self.write_pointer def locator(self): + '''The Keep locator for this buffer's contents.''' if self._locator is None: self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) return self._locator class AsyncKeepWriteErrors(Exception): + ''' + Roll up one or more Keep write exceptions (generated by background + threads) into a single one. + ''' def __init__(self, errors): self.errors = errors @@ -249,7 +278,12 @@ class AsyncKeepWriteErrors(Exception): return "\n".join(self.errors) class BlockManager(object): + ''' + BlockManager handles buffer blocks, background block uploads, and + background block prefetch for a Collection of ArvadosFiles. + ''' def __init__(self, keep): + '''keep: KeepClient object to use''' self._keep = keep self._bufferblocks = {} self._put_queue = None @@ -258,14 +292,22 @@ class BlockManager(object): self._prefetch_queue = None self._prefetch_threads = None - def alloc_bufferblock(self, blockid=None, starting_size=2**14): + def alloc_bufferblock(self, blockid=None, starting_capacity=2**14): + ''' + Allocate a new, empty bufferblock in WRITABLE state and return it. + blockid: optional block identifier, otherwise one will be automatically assigned + starting_capacity: optional capacity, otherwise will use default capacity + ''' if blockid is None: blockid = "bufferblock%i" % len(self._bufferblocks) - bb = BufferBlock(blockid, starting_size=starting_size) + bb = BufferBlock(blockid, starting_capacity=starting_capacity) self._bufferblocks[bb.blockid] = bb return bb def stop_threads(self): + ''' + Shut down and wait for background upload and download threads to finish. 
+ ''' if self._put_threads is not None: for t in self._put_threads: self._put_queue.put(None) @@ -284,7 +326,15 @@ class BlockManager(object): self._prefetch_queue = None def commit_bufferblock(self, block): + ''' + Initiate a background upload of a bufferblock. This will block if the + upload queue is at capacity, otherwise it will return immediately. + ''' + def worker(self): + ''' + Background uploader thread. + ''' while True: try: b = self._put_queue.get() @@ -302,6 +352,17 @@ class BlockManager(object): self._put_queue.task_done() if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be sent to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. self._put_queue = Queue.Queue(maxsize=2) self._put_errors = Queue.Queue() self._put_threads = [threading.Thread(target=worker, args=(self,)), @@ -310,10 +371,15 @@ class BlockManager(object): t.daemon = True t.start() + # Mark the block as PENDING to disallow any more appends. block.state = BufferBlock.PENDING self._put_queue.put(block) def get_block(self, locator, num_retries, cache_only=False): + ''' + Fetch a block. First checks to see if the locator is a BufferBlock and + returns that; if not, passes the request through to KeepClient.get(). + ''' if locator in self._bufferblocks: bb = self._bufferblocks[locator] if bb.state != BufferBlock.COMMITTED: @@ -323,6 +389,12 @@ class BlockManager(object): return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only) def commit_all(self): + ''' + Commit all outstanding buffer blocks. 
Unlike commit_bufferblock(), this + is a synchronous call, and will not return until all buffer blocks are + uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to + upload. + ''' for k,v in self._bufferblocks.items(): if v.state == BufferBlock.WRITABLE: self.commit_bufferblock(v) @@ -338,7 +410,14 @@ class BlockManager(object): raise AsyncKeepWriteErrors(e) def block_prefetch(self, locator): + ''' + Initiate a background download of a block. This assumes that the + underlying KeepClient implements a block cache, so repeated requests + for the same block will not result in repeated downloads (unless the + block is evicted from the cache). This method does not block. + ''' def worker(self): + '''Background downloader thread.''' while True: try: b = self._prefetch_queue.get() @@ -360,6 +439,11 @@ class BlockManager(object): self._prefetch_queue.put(locator) class ArvadosFile(object): + ''' + Manages the underlying representation of a file in Keep as a sequence of + segments over a set of blocks, supporting random read/write access. + ''' + def __init__(self, parent, stream=[], segments=[]): ''' stream: a list of Range objects representing a block stream @@ -371,6 +455,19 @@ class ArvadosFile(object): for s in segments: self.add_segment(stream, s.locator, s.range_size) self._current_bblock = None + self.lock = threading.Lock() + + def clone(self): + ''' + Make a copy of this file. + ''' + # TODO: copy bufferblocks. 
+ with self.lock: + cp = ArvadosFile() + cp.parent = self.parent + cp._modified = False + cp.segments = [Range(r.locator, r.range_start, r.range_size, r.segment_offset) for r in self.segments] + return cp def set_unmodified(self): self._modified = False @@ -477,7 +574,7 @@ class ArvadosFile(object): class ArvadosFileReader(ArvadosFileReaderBase): def __init__(self, arvadosfile, name, mode="r", num_retries=None): super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries) - self.arvadosfile = arvadosfile + self.arvadosfile = arvadosfile.clone() def size(self): return self.arvadosfile.size() @@ -499,9 +596,23 @@ class ArvadosFileReader(ArvadosFileReaderBase): def flush(self): pass + +class SynchronizedArvadosFile(object): + def __init__(self, arvadosfile): + self.arvadosfile = arvadosfile + + def clone(self): + return self + + def __getattr__(self, name): + with self.arvadosfile.lock: + return getattr(self.arvadosfile, name) + + class ArvadosFileWriter(ArvadosFileReader): def __init__(self, arvadosfile, name, mode, num_retries=None): - super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries) + self.arvadosfile = SynchronizedArvadosFile(arvadosfile) + super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries) @ArvadosFileBase._before_close @retry_method