From 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 16 Feb 2015 16:49:18 -0500 Subject: [PATCH] 4823: Add new tests for BufferBlock, BlockManager, saving, updating. Stylistic changes on variable naming. Remove splitting code from splitfastq. --- crunch_scripts/split-fastq.py | 72 +----- sdk/python/arvados/arvfile.py | 51 ++-- sdk/python/arvados/collection.py | 372 ++++++++++++++++----------- sdk/python/arvados/ranges.py | 71 +++-- sdk/python/arvados/stream.py | 2 +- sdk/python/tests/test_arvfile.py | 102 +++++++- sdk/python/tests/test_collections.py | 65 ++++- services/fuse/bin/arv-mount | 4 +- 8 files changed, 460 insertions(+), 279 deletions(-) diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py index 17aabf2930..1c7a36871d 100755 --- a/crunch_scripts/split-fastq.py +++ b/crunch_scripts/split-fastq.py @@ -16,8 +16,6 @@ inp = arvados.CollectionReader(arvados.getjobparam('reads')) manifest_list = [] -chunking = False #arvados.getjobparam('chunking') - def nextline(reader, start): n = -1 while True: @@ -31,63 +29,6 @@ def nextline(reader, start): start += 128 return n -# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data -# be decompressed ahead of time, such as using decompress-all.py. Generates a -# new manifest, but doesn't actually move any data around. Handles paired -# reads by ensuring that each chunk of a pair gets the same number of records. -# -# This works, but in practice is so slow that potential gains in alignment -# performance are lost in the prep time, which is why it is currently disabled. -# -# A better algorithm would seek to a file position a bit less than the desired -# chunk size and then scan ahead for the next record, making sure that record -# was matched by the read pair. 
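The removed comment above describes the faster strategy only in prose. As a rough illustration of that seek-and-scan idea, a helper along these lines could locate the next record boundary using only the `readfrom()` API the readers in this script already provide (`find_record_start` is a hypothetical name for this sketch, not part of this patch, and a real version would also verify the mate record in paired-end input):

    # Hypothetical sketch of the seek-and-scan chunking described above:
    # jump near the desired chunk boundary, then scan forward to the next
    # fastq record header instead of reading every record.  Note "@" can
    # legally begin a quality line, so production code must validate the
    # candidate record (and its mate) before splitting here.
    def find_record_start(reader, approx_pos, scan_limit=1024*1024):
        chunk = reader.readfrom(approx_pos, scan_limit, num_retries=10)
        offset = 0
        while True:
            nl = chunk.find("\n", offset)
            if nl == -1 or nl + 1 >= len(chunk):
                return -1
            if chunk[nl + 1] == "@":
                # Return the absolute offset of the candidate header line.
                return approx_pos + nl + 1
            offset = nl + 1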
-def splitfastq(p): - for i in xrange(0, len(p)): - p[i]["start"] = 0 - p[i]["end"] = 0 - - count = 0 - recordsize = [0, 0] - - global piece - finish = False - while not finish: - for i in xrange(0, len(p)): - recordsize[i] = 0 - - # read next 4 lines - for i in xrange(0, len(p)): - for ln in xrange(0, 4): - r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i]) - if r == -1: - finish = True - break - recordsize[i] += (r+1) - - splitnow = finish - for i in xrange(0, len(p)): - if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024): - splitnow = True - - if splitnow: - for i in xrange(0, len(p)): - global manifest_list - print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"]) - manifest = [] - manifest.extend(["./_" + str(piece)]) - manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators]) - manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])]) - manifest_list.append(manifest) - p[i]["start"] = p[i]["end"] - piece += 1 - else: - for i in xrange(0, len(p)): - p[i]["end"] += recordsize[i] - count += 1 - if count % 10000 == 0: - print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"]) - prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$') # Look for fastq files @@ -115,14 +56,11 @@ for s in inp.all_streams(): p[0]["reader"] = s.files()[name_pieces.group(0)] if p is not None: - if chunking: - splitfastq(p) - else: - for i in xrange(0, len(p)): - m = p[i]["reader"].as_manifest().split() - m[0] = "./_" + str(piece) - manifest_list.append(m) - piece += 1 + for i in xrange(0, len(p)): + m = p[i]["reader"].as_manifest().split() + m[0] = "./_" + str(piece) + manifest_list.append(m) + piece += 1 manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n" diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 456602fb90..0bc70a7c6e 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -11,7 +11,7 @@ import threading import Queue import copy import errno -from .errors import KeepWriteError +from .errors import KeepWriteError, AssertionError def split(path): """Separate the stream name and file name in a /-separated stream path and @@ -26,7 +26,7 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name -class ArvadosFileBase(object): +class FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name self.mode = mode @@ -55,7 +55,7 @@ class ArvadosFileBase(object): self.closed = True -class ArvadosFileReaderBase(ArvadosFileBase): +class ArvadosFileReaderBase(FileLikeObjectBase): class _NameAttribute(str): # The Python file API provides a plain .name attribute. # Older SDK provided a name() method. 
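The hunks that follow are mechanical renames of the `_before_close` guard inherited from the renamed base class. For readers skimming the diff, here is a self-contained sketch of how that guard pattern works (illustrative only; the real `FileLikeObjectBase` in arvfile.py carries additional state and hooks):

    import functools

    class FileLikeObjectBase(object):
        def __init__(self, name, mode):
            self.name = name
            self.mode = mode
            self.closed = False

        @staticmethod
        def _before_close(orig_func):
            # Refuse to run a wrapped method once the file is closed,
            # mirroring the ValueError raised by built-in file objects.
            @functools.wraps(orig_func)
            def before_close_wrapper(self, *args, **kwargs):
                if self.closed:
                    raise ValueError("I/O operation on closed stream file")
                return orig_func(self, *args, **kwargs)
            return before_close_wrapper

        def close(self):
            self.closed = True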
@@ -79,7 +79,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
     def decompressed_name(self):
         return re.sub('\.(bz2|gz)$', '', self.name)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_CUR):
         if whence == os.SEEK_CUR:
             pos += self._filepos
@@ -90,7 +90,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
     def tell(self):
         return self._filepos

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
         while True:
@@ -99,7 +99,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
                 break
             yield data

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readline(self, size=float('inf'), num_retries=None):
         cache_pos, cache_data = self._readline_cache
@@ -123,7 +123,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
         for segment in self.readall(size, num_retries):
@@ -131,7 +131,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
             if data:
                 yield data

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
@@ -146,7 +146,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
         else:
             return self.readall(size, num_retries=num_retries)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readlines(self, sizehint=float('inf'), num_retries=None):
         data = []
@@ -181,7 +181,7 @@ class StreamFileReader(ArvadosFileReaderBase):
         n = self.segments[-1]
         return n.range_start + n.range_size

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -199,7 +199,7 @@ class StreamFileReader(ArvadosFileReaderBase):
         self._filepos += len(data)
         return data

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -381,8 +381,11 @@ class BlockManager(object):
         """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        block = self._bufferblocks[blockid]
-        bufferblock = BufferBlock(new_blockid, len(block), owner)
-        bufferblock.append(block)
+        if block.state == BufferBlock.COMMITTED:
+            raise AssertionError("Cannot duplicate a committed buffer block")
+
+        bufferblock = BufferBlock(new_blockid, block.size(), owner)
+        bufferblock.append(block.buffer_view[0:block.size()])
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

@@ -465,7 +468,11 @@ class BlockManager(object):
             block.state = BufferBlock.PENDING
             self._put_queue.put(block)

-    def get_block(self, locator, num_retries, cache_only=False):
+    def get_bufferblock(self, locator):
+        with self.lock:
+            return self._bufferblocks.get(locator)
+
+    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
@@ -593,7 +600,11 @@ class ArvadosFile(object):
             new_loc = r.locator
             if self.parent._my_block_manager().is_bufferblock(r.locator):
                 if r.locator not in map_loc:
-                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
+                    bufferblock = self.parent._my_block_manager().get_bufferblock(r.locator)
+                    if bufferblock.state == BufferBlock.COMMITTED:
+                        map_loc[r.locator] = bufferblock.locator()
+                    else:
+                        map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
                 new_loc = map_loc[r.locator]

             cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))

@@ -674,7 +685,7 @@ class ArvadosFile(object):
        data = []

        for lr in readsegs:
-            block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
@@ -781,7 +792,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     def size(self):
         return self.arvadosfile.size()

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position."""
@@ -789,7 +800,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
         self._filepos += len(data)
         return data

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position."""
@@ -810,7 +821,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, name, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def write(self, data, num_retries=None):
         if self.mode[0] == "a":
@@ -819,7 +830,7 @@ class ArvadosFileWriter(ArvadosFileReader):
             self.arvadosfile.writeto(self.arvadosfile.size(), data, num_retries)
         else:
             self.arvadosfile.writeto(self._filepos, data, num_retries)
         self._filepos += len(data)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def writelines(self, seq, num_retries=None):
         for s in seq:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a64dd34f97..33af0c2284 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -8,7 +8,7 @@ import time
 from collections import deque
 from stat import *

-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -244,7 +244,7 @@ class CollectionReader(CollectionBase):
         return self._manifest_text


-class _WriterFile(ArvadosFileBase):
+class _WriterFile(FileLikeObjectBase):
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
@@ -253,16 +253,16 @@ class CollectionReader(CollectionBase):
         super(_WriterFile, self).close()
         self.dest.finish_current_file()

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def write(self, data):
         self.dest.write(data)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def writelines(self, seq):
         for data in seq:
             self.write(data)

-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def flush(self):
         self.dest.flush_data()
@@ -696,65 +696,62 @@ class SynchronizedCollectionBase(CollectionBase):
     def notify(self, event, collection, name, item):
         raise NotImplementedError()

+    @must_be_writable
     @synchronized
-    def _find(self, path, create, create_collection):
-        """Internal method.  Don't use this.  Use `find()` or `find_or_create()`
-        instead.
-
-        Recursively search the specified file path.  May return either a
-        Collection or ArvadosFile.  Return None if not found and create=False.
-        Will create a new item if create=True.
-
-        :create:
-          If true, create path components (i.e. Collections) that are
-          missing.  If "create" is False, return None if a path component is
-          not found.
-
-        :create_collection:
-          If the path is not found, "create" is True, and
-          "create_collection" is False, then create and return a new
-          ArvadosFile for the last path component.  If "create_collection" is
-          True, then create and return a new Collection for the last path
-          component.
+    def find_or_create(self, path, create_type):
+        """Recursively search the specified file path.
+
+        May return either a `Collection` or `ArvadosFile`.  If not found, will
+        create a new item at the specified path based on `create_type`.  Will
+        create intermediate subcollections needed to contain the final item in
+        the path.
+
+        :create_type:
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  If the path is not found and the
+          value of create_type is FILE, create and return a new ArvadosFile
+          for the last path component.  If COLLECTION, create and return a
+          new Collection for the last path component.

        """
-        if create and self.sync_mode() == SYNC_READONLY:
+        if self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))

-        p = path.split("/")
-        if p[0] == '.':
-            del p[0]
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
+            del pathcomponents[0]

-        if p and p[0]:
-            item = self._items.get(p[0])
-            if len(p) == 1:
+        if pathcomponents and pathcomponents[0]:
+            item = self._items.get(pathcomponents[0])
+            if len(pathcomponents) == 1:
                 # item must be a file
-                if item is None and create:
+                if item is None:
                     # create new file
-                    if create_collection:
+                    if create_type == COLLECTION:
                         item = Subcollection(self)
                     else:
                         item = ArvadosFile(self)
-                    self._items[p[0]] = item
+                    self._items[pathcomponents[0]] = item
                     self._modified = True
-                    self.notify(ADD, self, p[0], item)
+                    self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
-                if item is None and create:
+                if item is None:
                     # create new collection
                     item = Subcollection(self)
-                    self._items[p[0]] = item
+                    self._items[pathcomponents[0]] = item
                     self._modified = True
-                    self.notify(ADD, self, p[0], item)
-                del p[0]
+                    self.notify(ADD, self, pathcomponents[0], item)
+                del pathcomponents[0]
                 if isinstance(item, SynchronizedCollectionBase):
-                    return item._find("/".join(p), create, create_collection)
+                    return item.find_or_create("/".join(pathcomponents), create_type)
                 else:
-                    raise errors.ArgumentError("Interior path components must be subcollection")
+                    raise errors.ArgumentError("Interior path components must be subcollections")
         else:
             return self

+    @synchronized
     def find(self, path):
         """Recursively search the specified file path.

         May return either a `Collection` or `ArvadosFile`.  Return None if not
         found.

         """
-        return self._find(path, False, False)
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
+            del pathcomponents[0]

-    def find_or_create(self, path, create_type):
-        """Recursively search the specified file path.
+        if pathcomponents and pathcomponents[0]:
+            item = self._items.get(pathcomponents[0])
+            if len(pathcomponents) == 1:
+                # item must be a file
+                return item
+            else:
+                del pathcomponents[0]
+                if isinstance(item, SynchronizedCollectionBase):
+                    return item.find("/".join(pathcomponents))
+                else:
+                    raise errors.ArgumentError("Interior path components must be subcollections")
+        else:
+            return self

-        May return either a Collection or ArvadosFile.  Will create a new item
-        at the specified path if none exists.
+    def mkdirs(self, path):
+        """Recursively create subcollections.

-        :create_type:
-          One of `arvado.collection.FILE` or
-          `arvado.collection.COLLECTION`.  If the path is not found, and value
-          of create_type is FILE then create and return a new ArvadosFile for
-          the last path component.  If COLLECTION, then create and return a new
-          Collection for the last path component.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.

         """
-        return self._find(path, True, (create_type == COLLECTION))
+        return self.find_or_create(path, COLLECTION)

     def open(self, path, mode):
         """Open a file-like object for access.
@@ -806,20 +812,23 @@ class SynchronizedCollectionBase(CollectionBase):
         if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))

-        f = self._find(path, create, False)
+        if create:
+            arvfile = self.find_or_create(path, FILE)
+        else:
+            arvfile = self.find(path)

-        if f is None:
+        if arvfile is None:
             raise IOError((errno.ENOENT, "File not found"))
-        if not isinstance(f, ArvadosFile):
+        if not isinstance(arvfile, ArvadosFile):
             raise IOError((errno.EISDIR, "Path must refer to a file."))

         if mode[0] == "w":
-            f.truncate(0)
+            arvfile.truncate(0)

         if mode == "r":
-            return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)

     @synchronized
     def modified(self):
@@ -902,25 +911,25 @@ class SynchronizedCollectionBase(CollectionBase):
           :recursive:
             Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
-        p = path.split("/")
-        if p[0] == '.':
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
            # Remove '.'
from the front of the path - del p[0] + del pathcomponents[0] - if len(p) > 0: - item = self._items.get(p[0]) + if len(pathcomponents) > 0: + item = self._items.get(pathcomponents[0]) if item is None: raise IOError((errno.ENOENT, "File not found")) - if len(p) == 1: - if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive: + if len(pathcomponents) == 1: + if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: raise IOError((errno.ENOTEMPTY, "Subcollection not empty")) - d = self._items[p[0]] - del self._items[p[0]] + deleteditem = self._items[pathcomponents[0]] + del self._items[pathcomponents[0]] self._modified = True - self.notify(DEL, self, p[0], d) + self.notify(DEL, self, pathcomponents[0], deleteditem) else: - del p[0] - item.remove("/".join(p)) + del pathcomponents[0] + item.remove("/".join(pathcomponents)) else: raise IOError((errno.ENOENT, "File not found")) @@ -959,41 +968,41 @@ class SynchronizedCollectionBase(CollectionBase): source_obj = source_collection.find(source) if source_obj is None: raise IOError((errno.ENOENT, "File not found")) - sp = source.split("/") + sourcecomponents = source.split("/") else: source_obj = source - sp = None + sourcecomponents = None # Find parent collection the target path - tp = target_path.split("/") + targetcomponents = target_path.split("/") # Determine the name to use. - target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None) + target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None) if not target_name: raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.") - target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION) + target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) with target_dir.lock: if target_name in target_dir: - if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp: + if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents: target_dir = target_dir[target_name] - target_name = sp[-1] + target_name = sourcecomponents[-1] elif not overwrite: raise IOError((errno.EEXIST, "File already exists")) - mod = None + modified_from = None if target_name in target_dir: - mod = target_dir[target_name] + modified_from = target_dir[target_name] # Actually make the copy. dup = source_obj.clone(target_dir) target_dir._items[target_name] = dup target_dir._modified = True - if mod: - self.notify(MOD, target_dir, target_name, (mod, dup)) + if modified_from: + self.notify(MOD, target_dir, target_name, (modified_from, dup)) else: self.notify(ADD, target_dir, target_name, dup) @@ -1028,7 +1037,7 @@ class SynchronizedCollectionBase(CollectionBase): """ changes = [] if holding_collection is None: - holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY) + holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY) for k in self: if k not in end_collection: changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection))) @@ -1051,13 +1060,14 @@ class SynchronizedCollectionBase(CollectionBase): alternate path indicating the conflict. 
""" - for c in changes: - path = c[1] - initial = c[2] + for change in changes: + event_type = change[0] + path = change[1] + initial = change[2] local = self.find(path) conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime())) - if c[0] == ADD: + if event_type == ADD: if local is None: # No local file at path, safe to copy over new file self.copy(initial, path) @@ -1065,22 +1075,23 @@ class SynchronizedCollectionBase(CollectionBase): # There is already local file and it is different: # save change to conflict file. self.copy(initial, conflictpath) - elif c[0] == MOD: + elif event_type == MOD: + final = change[3] if local == initial: # Local matches the "initial" item so it has not # changed locally and is safe to update. - if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile): + if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): # Replace contents of local file with new contents - local.replace_contents(c[3]) + local.replace_contents(final) else: # Overwrite path with new item; this can happen if # path was a file and is now a collection or vice versa - self.copy(c[3], path, overwrite=True) + self.copy(final, path, overwrite=True) else: # Local is missing (presumably deleted) or local doesn't # match the "start" value, so save change to conflict file - self.copy(c[3], conflictpath) - elif c[0] == DEL: + self.copy(final, conflictpath) + elif event_type == DEL: if local == initial: # Local item matches "initial" value, so it is safe to remove. self.remove(path, recursive=True) @@ -1110,7 +1121,7 @@ class SynchronizedCollectionBase(CollectionBase): def __ne__(self, other): return not self.__eq__(other) -class CollectionRoot(SynchronizedCollectionBase): +class Collection(SynchronizedCollectionBase): """Represents the root of an Arvados Collection, which may be associated with an API server Collection record. @@ -1154,7 +1165,7 @@ class CollectionRoot(SynchronizedCollectionBase): num_retries=None, block_manager=None, sync=None): - """CollectionRoot constructor. + """Collection constructor. :manifest_locator_or_text: One of Arvados collection UUID, block locator of @@ -1185,7 +1196,7 @@ class CollectionRoot(SynchronizedCollectionBase): background websocket events, on block write, or on file close. 
""" - super(CollectionRoot, self).__init__(parent) + super(Collection, self).__init__(parent) self._api_client = api_client self._keep_client = keep_client self._block_manager = block_manager @@ -1195,7 +1206,7 @@ class CollectionRoot(SynchronizedCollectionBase): else: self._config = config.settings() - self.num_retries = num_retries + self.num_retries = num_retries if num_retries is not None else 2 self._manifest_locator = None self._manifest_text = None self._api_response = None @@ -1220,13 +1231,7 @@ class CollectionRoot(SynchronizedCollectionBase): "Argument to CollectionReader must be a manifest or a collection UUID") self._populate() - - if self._sync == SYNC_LIVE: - if not self._has_collection_uuid(): - raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid") - self.events = events.subscribe(arvados.api(apiconfig=self._config), - [["object_uuid", "=", self._manifest_locator]], - self.on_message) + self._subscribe_events() def root_collection(self): @@ -1235,19 +1240,18 @@ class CollectionRoot(SynchronizedCollectionBase): def sync_mode(self): return self._sync + def _subscribe_events(self): + if self._sync == SYNC_LIVE and self.events is None: + if not self._has_collection_uuid(): + raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid") + self.events = events.subscribe(arvados.api(apiconfig=self._config), + [["object_uuid", "=", self._manifest_locator]], + self.on_message) + def on_message(self, event): if event.get("object_uuid") == self._manifest_locator: self.update() - @staticmethod - def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None): - """Create a new empty Collection with associated collection record.""" - c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig) - c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True) - if sync == SYNC_LIVE: - c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message) - return c - @synchronized @retry_method def update(self, other=None, num_retries=None): @@ -1258,9 +1262,9 @@ class CollectionRoot(SynchronizedCollectionBase): if other is None: if self._manifest_locator is None: raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") - n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) - other = import_collection(n["manifest_text"]) - baseline = import_collection(self._manifest_text) + response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) + other = import_manifest(response["manifest_text"]) + baseline = import_manifest(self._manifest_text) self.apply(other.diff(baseline)) @synchronized @@ -1368,12 +1372,12 @@ class CollectionRoot(SynchronizedCollectionBase): def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None): if new_config is None: new_config = self._config - c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync) + newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=new_sync) if new_sync == SYNC_READONLY: - c.lock = NoopLock() - c._items = {} - self._cloneinto(c) - return c + newcollection.lock = NoopLock() + newcollection._items = {} + self._cloneinto(newcollection) + return newcollection @synchronized def api_response(self): @@ -1410,13 +1414,13 @@ class CollectionRoot(SynchronizedCollectionBase): self.update() self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries) - mt = 
self.manifest_text(strip=False) + text = self.manifest_text(strip=False) self._api_response = self._my_api().collections().update( uuid=self._manifest_locator, - body={'manifest_text': mt} + body={'manifest_text': text} ).execute( num_retries=num_retries) - self._manifest_text = mt + self._manifest_text = text self.set_unmodified() @must_be_writable @@ -1447,13 +1451,13 @@ class CollectionRoot(SynchronizedCollectionBase): """ self._my_block_manager().commit_all() self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries) - mt = self.manifest_text(strip=False) + text = self.manifest_text(strip=False) if create_collection_record: if name is None: name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime())) - body = {"manifest_text": mt, + body = {"manifest_text": text, "name": name} if owner_uuid: body["owner_uuid"] = owner_uuid @@ -1468,7 +1472,7 @@ class CollectionRoot(SynchronizedCollectionBase): if self.events: self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]]) - self._manifest_text = mt + self._manifest_text = text self.set_unmodified() @synchronized @@ -1485,17 +1489,75 @@ class CollectionRoot(SynchronizedCollectionBase): c(event, collection, name, item) def ReadOnlyCollection(*args, **kwargs): + """Create a read-only collection object from an api collection record locator, + a portable data hash of a manifest, or raw manifest text. + + See `Collection` constructor for detailed options. + + """ kwargs["sync"] = SYNC_READONLY - return CollectionRoot(*args, **kwargs) + return Collection(*args, **kwargs) def WritableCollection(*args, **kwargs): + """Create a writable collection object from an api collection record locator, + a portable data hash of a manifest, or raw manifest text. + + See `Collection` constructor for detailed options. + + """ + kwargs["sync"] = SYNC_EXPLICIT - return CollectionRoot(*args, **kwargs) + return Collection(*args, **kwargs) def LiveCollection(*args, **kwargs): + """Create a writable, live updating collection object representing an existing + collection record on the API server. + + See `Collection` constructor for detailed options. + + """ kwargs["sync"] = SYNC_LIVE - return CollectionRoot(*args, **kwargs) + return Collection(*args, **kwargs) + +def createWritableCollection(name, owner_uuid=None, apiconfig=None): + """Create an empty, writable collection object and create an associated api + collection record. + + :name: + The collection name + + :owner_uuid: + The parent project. + + :apiconfig: + Optional alternate api configuration to use (to specify alternate API + host or token than the default.) + """ + newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig) + newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True) + return newcollection + +def createLiveCollection(name, owner_uuid=None, apiconfig=None): + """Create an empty, writable, live updating Collection object and create an + associated collection record on the API server. + + :name: + The collection name + + :owner_uuid: + The parent project. + + :apiconfig: + Optional alternate api configuration to use (to specify alternate API + host or token than the default.) 
+ + """ + newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig) + newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True) + newcollection._sync = SYNC_LIVE + newcollection._subscribe_events() + return newcollection class Subcollection(SynchronizedCollectionBase): """This is a subdirectory within a collection that doesn't have its own API @@ -1542,7 +1604,7 @@ def import_manifest(manifest_text, keep=None, num_retries=None, sync=SYNC_READONLY): - """Import a manifest into a `CollectionRoot`. + """Import a manifest into a `Collection`. :manifest_text: The manifest text to import from. @@ -1566,12 +1628,11 @@ def import_manifest(manifest_text, if into_collection is not None: if len(into_collection) > 0: raise ArgumentError("Can only import manifest into an empty collection") - c = into_collection else: - c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync) + into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync) - save_sync = c.sync_mode() - c._sync = None + save_sync = into_collection.sync_mode() + into_collection._sync = None STREAM_NAME = 0 BLOCKS = 1 @@ -1608,7 +1669,7 @@ def import_manifest(manifest_text, pos = long(s.group(1)) size = long(s.group(2)) name = s.group(3).replace('\\040', ' ') - f = c.find_or_create("%s/%s" % (stream_name, name), FILE) + f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE) f.add_segment(blocks, pos, size) else: # error! @@ -1618,9 +1679,9 @@ def import_manifest(manifest_text, stream_name = None state = STREAM_NAME - c.set_unmodified() - c._sync = save_sync - return c + into_collection.set_unmodified() + into_collection._sync = save_sync + return into_collection def export_manifest(item, stream_name=".", portable_locators=False): """Export a manifest from the contents of a SynchronizedCollectionBase. 
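Before the next hunk, a quick orientation: `import_manifest` and `export_manifest` are intended to round-trip, and for a manifest that is already normalized the round trip returns the input unchanged (multi-stream input is merged, as `test_import_export_manifest` below shows). A minimal sketch, using the same example block as the tests in this patch:

    import arvados

    # md5("0123456789") plus the block length, the standard test locator.
    manifest = ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n"
    collection = arvados.import_manifest(manifest)
    assert arvados.export_manifest(collection) == manifest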
@@ -1641,34 +1702,36 @@ def export_manifest(item, stream_name=".", portable_locators=False):
     if isinstance(item, SynchronizedCollectionBase):
         stream = {}
         sorted_keys = sorted(item.keys())
-        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
-            v = item[k]
-            st = []
-            for s in v.segments():
-                loc = s.locator
+        for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+            # Build a manifest stream entry for each file
+            arvfile = item[filename]
+            filestream = []
+            for segment in arvfile.segments():
+                loc = segment.locator
                 if loc.startswith("bufferblock"):
-                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+                    loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
                 if portable_locators:
                     loc = KeepLocator(loc).stripped()
-                st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                     s.segment_offset, s.range_size))
-            stream[k] = st
+                filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+                                     segment.segment_offset, segment.range_size))
+            stream[filename] = filestream
         if stream:
             buf += ' '.join(normalize_stream(stream_name, stream))
             buf += "\n"
-        for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
+        for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
+            buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
-        st = []
-        for s in item.segments:
-            loc = s.locator
+        stream = {}
+        filestream = []
+        for segment in item.segments():
+            loc = segment.locator
             if loc.startswith("bufferblock"):
                 loc = item._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                 s.segment_offset, s.range_size))
-        stream[stream_name] = st
+            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+                                 segment.segment_offset, segment.range_size))
+        stream[stream_name] = filestream
         buf += ' '.join(normalize_stream(stream_name, stream))
         buf += "\n"
     return buf
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index 12941a1fc1..2a08b3bc1f 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -1,3 +1,7 @@
+import logging
+
+_logger = logging.getLogger('arvados.ranges')
+
 class Range(object):
     def __init__(self, locator, range_start, range_size, segment_offset=0):
         self.locator = locator
@@ -14,7 +18,7 @@ class Range(object):
                 self.range_size == other.range_size and
                 self.segment_offset == other.segment_offset)

-def first_block(data_locators, range_start, range_size, debug=False):
+def first_block(data_locators, range_start, range_size):
     block_start = 0L

     # range_start/block_start is the inclusive lower bound
@@ -26,7 +30,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
     block_size = data_locators[i].range_size
     block_start = data_locators[i].range_start
     block_end = block_start + block_size
-    if debug: print '---'

     # perform a binary search for the first block
     # assumes that all of the blocks are contigious, so range_start is guaranteed
@@ -40,7 +43,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
         else:
             hi = i
         i = int((hi + lo) / 2)
-        if debug: print lo, i, hi
         block_size = data_locators[i].range_size
         block_start = data_locators[i].range_start
         block_end = block_start + block_size
@@ -63,13 +65,19 @@ class LocatorAndRange(object):
     def __repr__(self):
         return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)

-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
-    '''
-    Get blocks that are covered by the range
-    data_locators: list of Range objects, assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    returns list of LocatorAndRange objects
+def locators_and_ranges(data_locators, range_start, range_size):
+    '''Get blocks that are covered by the range and return a list of
+    LocatorAndRange objects.
+
+    :data_locators:
+      list of Range objects, assumes that blocks are in order and contiguous
+
+    :range_start:
+      start of range
+
+    :range_size:
+      size of range
+
     '''
     if range_size == 0:
         return []
@@ -78,7 +86,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
     range_size = long(range_size)
     range_end = range_start + range_size

-    i = first_block(data_locators, range_start, range_size, debug)
+    i = first_block(data_locators, range_start, range_size)
     if i is None:
         return []

@@ -87,15 +95,17 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         block_start = dl.range_start
         block_size = dl.range_size
         block_end = block_start + block_size
-        if debug:
-            print dl.locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        _logger.debug("%s range_start %s block_start %s range_end %s block_end %s",
+                      dl.locator, range_start, block_start, range_end, block_end)
         if range_end <= block_start:
             # range ends before this block starts, so don't look at any more locators
             break

         #if range_start >= block_end:
-            # range starts after this block ends, so go to next block
-            # we should always start at the first block due to the binary above, so this test is redundant
+            # Range starts after this block ends, so go to next block.
+            # We should always start at the first block due to the binary
+            # search above, so this test is unnecessary but useful to help
+            # document the algorithm.
             #next

         if range_start >= block_start and range_end <= block_end:
@@ -114,23 +123,28 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp

-def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset, debug=False):
+def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset):
     '''
     Replace a file segment range with a new segment.

-    data_locators: list of Range objects, assumes that segments are in order and contigous
+    NOTE::
+    data_locators will be updated in place

-    new_range_start: start of range to replace in data_locators
+    :data_locators:
+      list of Range objects, assumes that segments are in order and contiguous

-    new_range_size: size of range to replace in data_locators
+    :new_range_start:
+      start of range to replace in data_locators

-    new_locator: locator for new segment to be inserted
+    :new_range_size:
+      size of range to replace in data_locators

-    new_segment_offset: segment offset within the locator
+    :new_locator:
+      locator for new segment to be inserted

-    debug: print debugging details.
+    :new_segment_offset:
+      segment offset within the locator

-    !!! data_locators will be updated in place !!!
     '''
     if new_range_size == 0:
         return
@@ -152,7 +166,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
         return

-    i = first_block(data_locators, new_range_start, new_range_size, debug)
+    i = first_block(data_locators, new_range_start, new_range_size)
     if i is None:
         return

@@ -160,15 +174,17 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         dl = data_locators[i]
         old_segment_start = dl.range_start
         old_segment_end = old_segment_start + dl.range_size
-        if debug:
-            print locator, "range_start", new_range_start, "segment_start", old_segment_start, "range_end", new_range_end, "segment_end", old_segment_end
+        _logger.debug("%s range_start %s segment_start %s range_end %s segment_end %s",
+                      dl.locator, new_range_start, old_segment_start, new_range_end, old_segment_end)
         if new_range_end <= old_segment_start:
             # range ends before this segment starts, so don't look at any more locators
             break

         #if range_start >= old_segment_end:
-            # range starts after this segment ends, so go to next segment
-            # we should always start at the first segment due to the binary above, so this test is redundant
+            # Range starts after this segment ends, so go to next segment.
+            # We should always start at the first segment due to the binary
+            # search above, so this test is unnecessary but useful to help
+            # document the algorithm.
             #next

         if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 75b23cc026..9cfceb8f49 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -7,7 +7,7 @@ import functools
 import copy

 from .ranges import *
-from .arvfile import ArvadosFileBase, StreamFileReader
+from .arvfile import StreamFileReader
 from arvados.retry import retry_method
 from keep import *
 import config
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 52de0a3e60..29775a64bc 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -75,7 +75,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
         self.assertEqual(False, c.modified())

-    def test_append(self):
+    def test_append0(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
                                                  "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0"})
        with arvados.WritableCollection('.
781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', api_client=api, keep_client=keep) as c: writer = c.open("count.txt", "r+") + self.assertEqual(writer.size(), 10) + writer.seek(5, os.SEEK_SET) self.assertEqual("56789", writer.read(8)) + writer.seek(10, os.SEEK_SET) writer.write("foo") self.assertEqual(writer.size(), 13) + writer.seek(5, os.SEEK_SET) self.assertEqual("56789foo", writer.read(8)) self.assertEqual(None, c._manifest_locator) self.assertEqual(True, c.modified()) self.assertEqual(None, keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3")) + c.save_new("test_append") self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator) self.assertEqual(False, c.modified()) self.assertEqual("foo", keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3")) + + def test_append1(self): + keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) + c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep) + writer = c.open("count.txt", "a+") + self.assertEqual(writer.read(20), "0123456789") + writer.seek(0, os.SEEK_SET) + + writer.write("hello") + self.assertEqual(writer.read(20), "0123456789hello") + writer.seek(0, os.SEEK_SET) + + writer.write("world") + self.assertEqual(writer.read(20), "0123456789helloworld") + + self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", export_manifest(c)) + def test_write0(self): keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) with arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', @@ -328,7 +350,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): def block_prefetch(self, loc): pass - def get_block(self, loc, num_retries=0, cache_only=False): + def get_block_contents(self, loc, num_retries=0, cache_only=False): if self.nocache and cache_only: return None return self.blocks[loc] @@ -432,3 +454,79 @@ class ArvadosFileReadAllDecompressedTestCase(ArvadosFileReadTestCase): class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase): def read_for_test(self, reader, byte_count, **kwargs): return ''.join(reader.readlines(**kwargs)) + +class BlockManagerTest(unittest.TestCase): + def test_bufferblock_append(self): + keep = ArvadosFileWriterTestCase.MockKeep({}) + blockmanager = arvados.arvfile.BlockManager(keep) + bufferblock = blockmanager.alloc_bufferblock() + bufferblock.append("foo") + + self.assertEqual(bufferblock.size(), 3) + self.assertEqual(bufferblock.buffer_view[0:3], "foo") + self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3") + + bufferblock.append("bar") + + self.assertEqual(bufferblock.size(), 6) + self.assertEqual(bufferblock.buffer_view[0:6], "foobar") + self.assertEqual(bufferblock.locator(), "3858f62230ac3c915f300c664312c63f+6") + + bufferblock.state = arvados.arvfile.BufferBlock.PENDING + with self.assertRaises(arvados.errors.AssertionError): + bufferblock.append("bar") + + def test_bufferblock_dup(self): + keep = ArvadosFileWriterTestCase.MockKeep({}) + blockmanager = arvados.arvfile.BlockManager(keep) + bufferblock = blockmanager.alloc_bufferblock() + bufferblock.append("foo") + + self.assertEqual(bufferblock.size(), 3) + self.assertEqual(bufferblock.buffer_view[0:3], "foo") + self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3") + bufferblock.state = arvados.arvfile.BufferBlock.PENDING + + bufferblock2 = blockmanager.dup_block(bufferblock.blockid, None) + 
self.assertNotEqual(bufferblock.blockid, bufferblock2.blockid) + + bufferblock2.append("bar") + + self.assertEqual(bufferblock2.size(), 6) + self.assertEqual(bufferblock2.buffer_view[0:6], "foobar") + self.assertEqual(bufferblock2.locator(), "3858f62230ac3c915f300c664312c63f+6") + + self.assertEqual(bufferblock.size(), 3) + self.assertEqual(bufferblock.buffer_view[0:3], "foo") + self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3") + + def test_bufferblock_get(self): + keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) + blockmanager = arvados.arvfile.BlockManager(keep) + bufferblock = blockmanager.alloc_bufferblock() + bufferblock.append("foo") + + self.assertEqual(blockmanager.get_block_contents("781e5e245d69b566979b86e28d23f2c7+10", 1), "0123456789") + self.assertEqual(blockmanager.get_block_contents(bufferblock.blockid, 1), "foo") + + def test_bufferblock_commit(self): + mockkeep = mock.MagicMock() + blockmanager = arvados.arvfile.BlockManager(mockkeep) + bufferblock = blockmanager.alloc_bufferblock() + bufferblock.append("foo") + blockmanager.commit_all() + self.assertTrue(mockkeep.put.called) + self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.COMMITTED) + self.assertIsNone(bufferblock.buffer_view) + + + def test_bufferblock_commit_with_error(self): + mockkeep = mock.MagicMock() + mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail") + blockmanager = arvados.arvfile.BlockManager(mockkeep) + bufferblock = blockmanager.alloc_bufferblock() + bufferblock.append("foo") + with self.assertRaises(arvados.errors.KeepWriteError) as err: + blockmanager.commit_all() + self.assertEquals(str(err.exception), "Error writing some blocks: acbd18db4cc2f85cedef654fccc4a4d8+3 raised KeepWriteError (fail)") + self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.PENDING) diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index fbec8b280e..5e50357af0 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -816,12 +816,12 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin): class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): - def test_import_manifest(self): + def test_import_export_manifest(self): m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt . 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt """ - self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.ReadOnlyCollection(m1))) + self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.import_manifest(m1))) def test_init_manifest(self): m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt @@ -1023,10 +1023,67 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): c1.manifest_text())) def test_notify1(self): - c1 = arvados.WritableCollection(sync=SYNC_EXPLICIT) + c1 = arvados.WritableCollection() events = [] c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item))) - c1.find("") + f = c1.open("foo.txt", "w") + self.assertEqual(events[0], (arvados.collection.ADD, c1, "foo.txt", f.arvadosfile)) + + def test_open_w(self): + c1 = arvados.WritableCollection('. 
781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt')
+        self.assertEqual(c1["count.txt"].size(), 10)
+        c1.open("count.txt", "w").close()
+        self.assertEqual(c1["count.txt"].size(), 0)
+
+
+class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+
+    def test_create_and_save(self):
+        c = arvados.collection.createWritableCollection("hello world")
+        self.assertEqual(c.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+        self.assertEqual(c.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0")
+
+        with c.open("count.txt", "w") as f:
+            f.write("0123456789")
+
+        self.assertEqual(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+        c.save()
+
+        c2 = arvados.api().collections().get(uuid=c._manifest_locator).execute()
+        self.assertTrue(re.match(r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count.txt$",
+                                 c2["manifest_text"]))
+
+    def test_create_and_update(self):
+        c1 = arvados.collection.createWritableCollection("hello world")
+        self.assertEqual(c1.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+        self.assertEqual(c1.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0")
+
+        with c1.open("count.txt", "w") as f:
+            f.write("0123456789")
+
+        self.assertEqual(c1.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+        c1.save()
+
+        c2 = arvados.collection.WritableCollection(c1._manifest_locator)
+        with c2.open("count.txt", "w") as f:
+            f.write("abcdefg")
+
+        c2.save()
+
+        self.assertNotEqual(c1.portable_data_hash(), c2.portable_data_hash())
+
+        c1.update()
+
+        self.assertEqual(c1.portable_data_hash(), c2.portable_data_hash())
+

 if __name__ == '__main__':
     unittest.main()
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 1e5e9f0c9d..ca65133ce5 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -11,7 +11,7 @@ import time
 import arvados.commands._util as arv_cmd

 from arvados_fuse import *
-from arvados.api import SafeApi
+from arvados.safeapi import ThreadSafeApiCache

 logger = logging.getLogger('arvados.arv-mount')

@@ -82,7 +82,7 @@ with "--".
 try:
     # Create the request handler
     operations = Operations(os.getuid(), os.getgid(), args.encoding)
-    api = SafeApi(arvados.config)
+    api = ThreadSafeApiCache(arvados.config)
     usr = api.users().current().execute(num_retries=args.retries)
     now = time.time()
-- 
2.30.2
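A condensed sketch of the create/save/update flow exercised by `CollectionCreateUpdateTest` above (hypothetical session; assumes a reachable API server and valid credentials):

    import arvados.collection

    c1 = arvados.collection.createWritableCollection("merge demo")
    with c1.open("greeting.txt", "w") as f:
        f.write("hello")
    c1.save()

    # A second handle updates the same record behind c1's back...
    c2 = arvados.collection.WritableCollection(c1._manifest_locator)
    with c2.open("greeting.txt", "w") as f:
        f.write("goodbye")
    c2.save()

    # ...so update() re-fetches the record and merges the remote changes.
    # Had c1 also changed greeting.txt locally, apply() would preserve the
    # local copy under a "greeting.txt~conflict-<timestamp>~" name instead.
    c1.update()
    assert c1.portable_data_hash() == c2.portable_data_hash()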