X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/92002b9af320e3229f59ca0778d1906e663f3066..f2388f1bdad27efd2816533aa7da80735ed5ec3f:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index f4580f346b..0fcdc1e633 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -88,9 +88,6 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) - self._binary = 'b' in mode - if sys.version_info >= (3, 0) and not self._binary: - raise NotImplementedError("text mode {!r} is not implemented".format(mode)) self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) @@ -484,7 +481,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None, put_threads=None): + def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() @@ -494,15 +491,14 @@ class _BlockManager(object): self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - if put_threads: - self.num_put_threads = put_threads - else: - self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS + self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS self.copies = copies + self.storage_classes = storage_classes_func or (lambda: []) self._pending_write_size = 0 self.threads_lock = threading.Lock() self.padding_block = None + self.num_retries = num_retries @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -557,9 +553,9 @@ class _BlockManager(object): return if self.copies is None: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) else: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: bufferblock.set_state(_BufferBlock.ERROR, e) @@ -574,7 +570,7 @@ class _BlockManager(object): # If we don't limit the Queue size, the upload queue can quickly # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep + # generating data more quickly than it can be sent to the Keep # servers. # # With two upload threads and a queue size of 2, this means up to 4 @@ -728,9 +724,9 @@ class _BlockManager(object): if sync: try: if self.copies is None: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) else: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -764,9 +760,10 @@ class _BlockManager(object): self._delete_bufferblock(locator) def _delete_bufferblock(self, locator): - bb = self._bufferblocks[locator] - bb.clear() - del self._bufferblocks[locator] + if locator in self._bufferblocks: + bb = self._bufferblocks[locator] + bb.clear() + del self._bufferblocks[locator] def get_block_contents(self, locator, num_retries, cache_only=False): """Fetch a block. @@ -900,6 +897,38 @@ class ArvadosFile(object): return True return False + @synchronized + def has_remote_blocks(self): + """Returns True if any of the segment's locators has a +R signature""" + + for s in self._segments: + if '+R' in s.locator: + return True + return False + + @synchronized + def _copy_remote_blocks(self, remote_blocks={}): + """Ask Keep to copy remote blocks and point to their local copies. + + This is called from the parent Collection. + + :remote_blocks: + Shared cache of remote to local block mappings. This is used to avoid + doing extra work when blocks are shared by more than one file in + different subdirectories. + """ + + for s in self._segments: + if '+R' in s.locator: + try: + loc = remote_blocks[s.locator] + except KeyError: + loc = self.parent._my_keep().refresh_signature(s.locator) + remote_blocks[s.locator] = loc + s.locator = loc + self.parent.set_committed(False) + return remote_blocks + @synchronized def segments(self): return copy.copy(self._segments) @@ -1246,6 +1275,11 @@ class ArvadosFileReader(ArvadosFileReaderBase): def stream_name(self): return self.arvadosfile.parent.stream_name() + def readinto(self, b): + data = self.read(len(b)) + b[:len(data)] = data + return len(data) + @_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): @@ -1324,3 +1358,33 @@ class ArvadosFileWriter(ArvadosFileReader): if not self.closed: self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() + + +class WrappableFile(object): + """An interface to an Arvados file that's compatible with io wrappers. + + """ + def __init__(self, f): + self.f = f + self.closed = False + def close(self): + self.closed = True + return self.f.close() + def flush(self): + return self.f.flush() + def read(self, *args, **kwargs): + return self.f.read(*args, **kwargs) + def readable(self): + return self.f.readable() + def readinto(self, *args, **kwargs): + return self.f.readinto(*args, **kwargs) + def seek(self, *args, **kwargs): + return self.f.seek(*args, **kwargs) + def seekable(self): + return self.f.seekable() + def tell(self): + return self.f.tell() + def writable(self): + return self.f.writable() + def write(self, *args, **kwargs): + return self.f.write(*args, **kwargs)