From 40241ad60283c3f34fb1157d063d23252a317e31 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 16 May 2024 11:14:44 -0400 Subject: [PATCH] 15397: Remove deprecated Python SDK facilities. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/python/arvados/__init__.py | 136 +----- sdk/python/arvados/api.py | 46 -- sdk/python/arvados/collection.py | 466 -------------------- sdk/python/arvados/crunch.py | 34 -- sdk/python/arvados/stream.py | 105 ----- sdk/python/arvados/util.py | 345 --------------- sdk/python/tests/arvados_testutil.py | 16 +- sdk/python/tests/manifest_examples.py | 6 +- sdk/python/tests/test_api.py | 16 - sdk/python/tests/test_arvfile.py | 34 +- sdk/python/tests/test_collections.py | 598 +++++--------------------- sdk/python/tests/test_crunch.py | 31 -- sdk/python/tests/test_sdk.py | 48 --- sdk/python/tests/test_stream.py | 79 +--- sdk/python/tests/test_util.py | 30 -- services/fuse/tests/test_mount.py | 88 ++-- 16 files changed, 181 insertions(+), 1897 deletions(-) delete mode 100644 sdk/python/arvados/crunch.py delete mode 100644 sdk/python/arvados/stream.py delete mode 100644 sdk/python/tests/test_crunch.py delete mode 100644 sdk/python/tests/test_sdk.py diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py index 83f658201c..8d9a5690b4 100644 --- a/sdk/python/arvados/__init__.py +++ b/sdk/python/arvados/__init__.py @@ -28,9 +28,8 @@ from collections import UserDict from . import api, errors, util from .api import api_from_config, http_cache -from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter +from .collection import CollectionReader from arvados.keep import * -from arvados.stream import * from .arvfile import StreamFileReader from .logging import log_format, log_date_format, log_handler from .retry import RetryLoop @@ -55,136 +54,3 @@ logger = stdliblog.getLogger('arvados') logger.addHandler(log_handler) logger.setLevel(stdliblog.DEBUG if config.get('ARVADOS_DEBUG') else stdliblog.WARNING) - -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def task_set_output(self, s, num_retries=5): - for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0): - try: - return api('v1').job_tasks().update( - uuid=self['uuid'], - body={ - 'output':s, - 'success':True, - 'progress':1.0 - }).execute() - except errors.ApiError as error: - if retry.check_http_response_success(error.resp.status) is None and tries_left > 0: - logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left)) - else: - raise - -_current_task = None -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def current_task(num_retries=5): - global _current_task - if _current_task: - return _current_task - - for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2): - try: - task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute() - task = UserDict(task) - task.set_output = types.MethodType(task_set_output, task) - task.tmpdir = os.environ['TASK_WORK'] - _current_task = task - return task - except errors.ApiError as error: - if retry.check_http_response_success(error.resp.status) is None and tries_left > 0: - logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left)) - else: - raise - -_current_job = None -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def current_job(num_retries=5): - global _current_job - if _current_job: - return _current_job - - 
for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2): - try: - job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute() - job = UserDict(job) - job.tmpdir = os.environ['JOB_WORK'] - _current_job = job - return job - except errors.ApiError as error: - if retry.check_http_response_success(error.resp.status) is None and tries_left > 0: - logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left)) - else: - raise - -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def getjobparam(*args): - return current_job()['script_parameters'].get(*args) - -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def get_job_param_mount(*args): - return os.path.join(os.environ['TASK_KEEPMOUNT'], current_job()['script_parameters'].get(*args)) - -@util._deprecated('3.0', 'arvados-cwl-runner or the containers API') -def get_task_param_mount(*args): - return os.path.join(os.environ['TASK_KEEPMOUNT'], current_task()['parameters'].get(*args)) - -class JobTask(object): - @util._deprecated('3.0', 'arvados-cwl-runner or the containers API') - def __init__(self, parameters=dict(), runtime_constraints=dict()): - print("init jobtask %s %s" % (parameters, runtime_constraints)) - -class job_setup(object): - @staticmethod - @util._deprecated('3.0', 'arvados-cwl-runner or the containers API') - def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None): - if if_sequence != current_task()['sequence']: - return - - if not api_client: - api_client = api('v1') - - job_input = current_job()['script_parameters']['input'] - cr = CollectionReader(job_input, api_client=api_client) - cr.normalize() - for s in cr.all_streams(): - for f in s.all_files(): - if input_as_path: - task_input = os.path.join(job_input, s.name(), f.name()) - else: - task_input = f.as_manifest() - new_task_attrs = { - 'job_uuid': current_job()['uuid'], - 'created_by_job_task_uuid': current_task()['uuid'], - 'sequence': if_sequence + 1, - 'parameters': { - 'input':task_input - } - } - api_client.job_tasks().create(body=new_task_attrs).execute() - if and_end_task: - api_client.job_tasks().update(uuid=current_task()['uuid'], - body={'success':True} - ).execute() - exit(0) - - @staticmethod - @util._deprecated('3.0', 'arvados-cwl-runner or the containers API') - def one_task_per_input_stream(if_sequence=0, and_end_task=True): - if if_sequence != current_task()['sequence']: - return - job_input = current_job()['script_parameters']['input'] - cr = CollectionReader(job_input) - for s in cr.all_streams(): - task_input = s.tokens() - new_task_attrs = { - 'job_uuid': current_job()['uuid'], - 'created_by_job_task_uuid': current_task()['uuid'], - 'sequence': if_sequence + 1, - 'parameters': { - 'input':task_input - } - } - api('v1').job_tasks().create(body=new_task_attrs).execute() - if and_end_task: - api('v1').job_tasks().update(uuid=current_task()['uuid'], - body={'success':True} - ).execute() - exit(0) diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py index 8a17e42fcb..0cfaf19238 100644 --- a/sdk/python/arvados/api.py +++ b/sdk/python/arvados/api.py @@ -496,49 +496,3 @@ def api_from_config( docstring for more information about their meaning. """ return api(**api_kwargs_from_config(version, apiconfig, **kwargs)) - -class OrderedJsonModel(apiclient.model.JsonModel): - """Model class for JSON that preserves the contents' order - - .. 
WARNING:: Deprecated - This model is redundant now that Python dictionaries preserve insertion - ordering. Code that passes this model to API constructors can remove it. - - In Python versions before 3.6, API clients that cared about preserving the - order of fields in API server responses could use this model to do so. - Typical usage looked like: - - from arvados.api import OrderedJsonModel - client = arvados.api('v1', ..., model=OrderedJsonModel()) - """ - @util._deprecated(preferred="the default model and rely on Python's built-in dictionary ordering") - def __init__(self, data_wrapper=False): - return super().__init__(data_wrapper) - - -RETRY_DELAY_INITIAL = 0 -""" -.. WARNING:: Deprecated - This constant was used by retry code in previous versions of the Arvados SDK. - Changing the value has no effect anymore. - Prefer passing `num_retries` to an API client constructor instead. - Refer to the constructor docstrings for details. -""" - -RETRY_DELAY_BACKOFF = 0 -""" -.. WARNING:: Deprecated - This constant was used by retry code in previous versions of the Arvados SDK. - Changing the value has no effect anymore. - Prefer passing `num_retries` to an API client constructor instead. - Refer to the constructor docstrings for details. -""" - -RETRY_COUNT = 0 -""" -.. WARNING:: Deprecated - This constant was used by retry code in previous versions of the Arvados SDK. - Changing the value has no effect anymore. - Prefer passing `num_retries` to an API client constructor instead. - Refer to the constructor docstrings for details. -""" diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 1050d4c093..bd620a4778 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -30,7 +30,6 @@ from stat import * from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock from .keep import KeepLocator, KeepClient -from .stream import StreamReader from ._normalize_stream import normalize_stream, escape from ._ranges import Range, LocatorAndRange from .safeapi import ThreadSafeApiCache @@ -1840,468 +1839,3 @@ class CollectionReader(Collection): def writable(self) -> bool: return self._in_init - - def _populate_streams(orig_func): - @functools.wraps(orig_func) - def populate_streams_wrapper(self, *args, **kwargs): - # Defer populating self._streams until needed since it creates a copy of the manifest. 
- if self._streams is None: - if self._manifest_text: - self._streams = [sline.split() - for sline in self._manifest_text.split("\n") - if sline] - else: - self._streams = [] - return orig_func(self, *args, **kwargs) - return populate_streams_wrapper - - @arvados.util._deprecated('3.0', 'Collection iteration') - @_populate_streams - def normalize(self): - """Normalize the streams returned by `all_streams`""" - streams = {} - for s in self.all_streams(): - for f in s.all_files(): - streamname, filename = split(s.name() + "/" + f.name()) - if streamname not in streams: - streams[streamname] = {} - if filename not in streams[streamname]: - streams[streamname][filename] = [] - for r in f.segments: - streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) - - self._streams = [normalize_stream(s, streams[s]) - for s in sorted(streams)] - - @arvados.util._deprecated('3.0', 'Collection iteration') - @_populate_streams - def all_streams(self): - return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) - for s in self._streams] - - @arvados.util._deprecated('3.0', 'Collection iteration') - @_populate_streams - def all_files(self): - for s in self.all_streams(): - for f in s.all_files(): - yield f - - -class CollectionWriter(CollectionBase): - """Create a new collection from scratch - - .. WARNING:: Deprecated - This class is deprecated. Prefer `arvados.collection.Collection` - instead. - """ - - @arvados.util._deprecated('3.0', 'arvados.collection.Collection') - def __init__(self, api_client=None, num_retries=0, replication=None): - """Instantiate a CollectionWriter. - - CollectionWriter lets you build a new Arvados Collection from scratch. - Write files to it. The CollectionWriter will upload data to Keep as - appropriate, and provide you with the Collection manifest text when - you're finished. - - Arguments: - * api_client: The API client to use to look up Collections. If not - provided, CollectionReader will build one from available Arvados - configuration. - * num_retries: The default number of times to retry failed - service requests. Default 0. You may change this value - after instantiation, but note those changes may not - propagate to related objects like the Keep client. - * replication: The number of copies of each block to store. - If this argument is None or not supplied, replication is - the server-provided default if available, otherwise 2. - """ - self._api_client = api_client - self.num_retries = num_retries - self.replication = (2 if replication is None else replication) - self._keep_client = None - self._data_buffer = [] - self._data_buffer_len = 0 - self._current_stream_files = [] - self._current_stream_length = 0 - self._current_stream_locators = [] - self._current_stream_name = '.' - self._current_file_name = None - self._current_file_pos = 0 - self._finished_streams = [] - self._close_file = None - self._queued_file = None - self._queued_dirents = deque() - self._queued_trees = deque() - self._last_open = None - - def __exit__(self, exc_type, exc_value, traceback): - if exc_type is None: - self.finish() - - def do_queued_work(self): - # The work queue consists of three pieces: - # * _queued_file: The file object we're currently writing to the - # Collection. - # * _queued_dirents: Entries under the current directory - # (_queued_trees[0]) that we want to write or recurse through. - # This may contain files from subdirectories if - # max_manifest_depth == 0 for this directory. 
- # * _queued_trees: Directories that should be written as separate - # streams to the Collection. - # This function handles the smallest piece of work currently queued - # (current file, then current directory, then next directory) until - # no work remains. The _work_THING methods each do a unit of work on - # THING. _queue_THING methods add a THING to the work queue. - while True: - if self._queued_file: - self._work_file() - elif self._queued_dirents: - self._work_dirents() - elif self._queued_trees: - self._work_trees() - else: - break - - def _work_file(self): - while True: - buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) - if not buf: - break - self.write(buf) - self.finish_current_file() - if self._close_file: - self._queued_file.close() - self._close_file = None - self._queued_file = None - - def _work_dirents(self): - path, stream_name, max_manifest_depth = self._queued_trees[0] - if stream_name != self.current_stream_name(): - self.start_new_stream(stream_name) - while self._queued_dirents: - dirent = self._queued_dirents.popleft() - target = os.path.join(path, dirent) - if os.path.isdir(target): - self._queue_tree(target, - os.path.join(stream_name, dirent), - max_manifest_depth - 1) - else: - self._queue_file(target, dirent) - break - if not self._queued_dirents: - self._queued_trees.popleft() - - def _work_trees(self): - path, stream_name, max_manifest_depth = self._queued_trees[0] - d = arvados.util.listdir_recursive( - path, max_depth = (None if max_manifest_depth == 0 else 0)) - if d: - self._queue_dirents(stream_name, d) - else: - self._queued_trees.popleft() - - def _queue_file(self, source, filename=None): - assert (self._queued_file is None), "tried to queue more than one file" - if not hasattr(source, 'read'): - source = open(source, 'rb') - self._close_file = True - else: - self._close_file = False - if filename is None: - filename = os.path.basename(source.name) - self.start_new_file(filename) - self._queued_file = source - - def _queue_dirents(self, stream_name, dirents): - assert (not self._queued_dirents), "tried to queue more than one tree" - self._queued_dirents = deque(sorted(dirents)) - - def _queue_tree(self, path, stream_name, max_manifest_depth): - self._queued_trees.append((path, stream_name, max_manifest_depth)) - - def write_file(self, source, filename=None): - self._queue_file(source, filename) - self.do_queued_work() - - def write_directory_tree(self, - path, stream_name='.', max_manifest_depth=-1): - self._queue_tree(path, stream_name, max_manifest_depth) - self.do_queued_work() - - def write(self, newdata): - if isinstance(newdata, bytes): - pass - elif isinstance(newdata, str): - newdata = newdata.encode() - elif hasattr(newdata, '__iter__'): - for s in newdata: - self.write(s) - return - self._data_buffer.append(newdata) - self._data_buffer_len += len(newdata) - self._current_stream_length += len(newdata) - while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: - self.flush_data() - - def open(self, streampath, filename=None): - """open(streampath[, filename]) -> file-like object - - Pass in the path of a file to write to the Collection, either as a - single string or as two separate stream name and file name arguments. - This method returns a file-like object you can write to add it to the - Collection. - - You may only have one file object from the Collection open at a time, - so be sure to close the object when you're done. 
Using the object in - a with statement makes that easy: - - with cwriter.open('./doc/page1.txt') as outfile: - outfile.write(page1_data) - with cwriter.open('./doc/page2.txt') as outfile: - outfile.write(page2_data) - """ - if filename is None: - streampath, filename = split(streampath) - if self._last_open and not self._last_open.closed: - raise errors.AssertionError( - u"can't open '{}' when '{}' is still open".format( - filename, self._last_open.name)) - if streampath != self.current_stream_name(): - self.start_new_stream(streampath) - self.set_current_file_name(filename) - self._last_open = _WriterFile(self, filename) - return self._last_open - - def flush_data(self): - data_buffer = b''.join(self._data_buffer) - if data_buffer: - self._current_stream_locators.append( - self._my_keep().put( - data_buffer[0:config.KEEP_BLOCK_SIZE], - copies=self.replication)) - self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] - self._data_buffer_len = len(self._data_buffer[0]) - - def start_new_file(self, newfilename=None): - self.finish_current_file() - self.set_current_file_name(newfilename) - - def set_current_file_name(self, newfilename): - if re.search(r'[\t\n]', newfilename): - raise errors.AssertionError( - "Manifest filenames cannot contain whitespace: %s" % - newfilename) - elif re.search(r'\x00', newfilename): - raise errors.AssertionError( - "Manifest filenames cannot contain NUL characters: %s" % - newfilename) - self._current_file_name = newfilename - - def current_file_name(self): - return self._current_file_name - - def finish_current_file(self): - if self._current_file_name is None: - if self._current_file_pos == self._current_stream_length: - return - raise errors.AssertionError( - "Cannot finish an unnamed file " + - "(%d bytes at offset %d in '%s' stream)" % - (self._current_stream_length - self._current_file_pos, - self._current_file_pos, - self._current_stream_name)) - self._current_stream_files.append([ - self._current_file_pos, - self._current_stream_length - self._current_file_pos, - self._current_file_name]) - self._current_file_pos = self._current_stream_length - self._current_file_name = None - - def start_new_stream(self, newstreamname='.'): - self.finish_current_stream() - self.set_current_stream_name(newstreamname) - - def set_current_stream_name(self, newstreamname): - if re.search(r'[\t\n]', newstreamname): - raise errors.AssertionError( - "Manifest stream names cannot contain whitespace: '%s'" % - (newstreamname)) - self._current_stream_name = '.' if newstreamname=='' else newstreamname - - def current_stream_name(self): - return self._current_stream_name - - def finish_current_stream(self): - self.finish_current_file() - self.flush_data() - if not self._current_stream_files: - pass - elif self._current_stream_name is None: - raise errors.AssertionError( - "Cannot finish an unnamed stream (%d bytes in %d files)" % - (self._current_stream_length, len(self._current_stream_files))) - else: - if not self._current_stream_locators: - self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) - self._finished_streams.append([self._current_stream_name, - self._current_stream_locators, - self._current_stream_files]) - self._current_stream_files = [] - self._current_stream_length = 0 - self._current_stream_locators = [] - self._current_stream_name = None - self._current_file_pos = 0 - self._current_file_name = None - - def finish(self): - """Store the manifest in Keep and return its locator. 
- - This is useful for storing manifest fragments (task outputs) - temporarily in Keep during a Crunch job. - - In other cases you should make a collection instead, by - sending manifest_text() to the API server's "create - collection" endpoint. - """ - return self._my_keep().put(self.manifest_text().encode(), - copies=self.replication) - - def portable_data_hash(self): - stripped = self.stripped_manifest().encode() - return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) - - def manifest_text(self): - self.finish_current_stream() - manifest = '' - - for stream in self._finished_streams: - if not re.search(r'^\.(/.*)?$', stream[0]): - manifest += './' - manifest += stream[0].replace(' ', '\\040') - manifest += ' ' + ' '.join(stream[1]) - manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) - manifest += "\n" - - return manifest - - def data_locators(self): - ret = [] - for name, locators, files in self._finished_streams: - ret += locators - return ret - - def save_new(self, name=None): - return self._api_client.collections().create( - ensure_unique_name=True, - body={ - 'name': name, - 'manifest_text': self.manifest_text(), - }).execute(num_retries=self.num_retries) - - -class ResumableCollectionWriter(CollectionWriter): - """CollectionWriter that can serialize internal state to disk - - .. WARNING:: Deprecated - This class is deprecated. Prefer `arvados.collection.Collection` - instead. - """ - - STATE_PROPS = ['_current_stream_files', '_current_stream_length', - '_current_stream_locators', '_current_stream_name', - '_current_file_name', '_current_file_pos', '_close_file', - '_data_buffer', '_dependencies', '_finished_streams', - '_queued_dirents', '_queued_trees'] - - @arvados.util._deprecated('3.0', 'arvados.collection.Collection') - def __init__(self, api_client=None, **kwargs): - self._dependencies = {} - super(ResumableCollectionWriter, self).__init__(api_client, **kwargs) - - @classmethod - def from_state(cls, state, *init_args, **init_kwargs): - # Try to build a new writer from scratch with the given state. - # If the state is not suitable to resume (because files have changed, - # been deleted, aren't predictable, etc.), raise a - # StaleWriterStateError. Otherwise, return the initialized writer. - # The caller is responsible for calling writer.do_queued_work() - # appropriately after it's returned. - writer = cls(*init_args, **init_kwargs) - for attr_name in cls.STATE_PROPS: - attr_value = state[attr_name] - attr_class = getattr(writer, attr_name).__class__ - # Coerce the value into the same type as the initial value, if - # needed. - if attr_class not in (type(None), attr_value.__class__): - attr_value = attr_class(attr_value) - setattr(writer, attr_name, attr_value) - # Check dependencies before we try to resume anything. 
- if any(KeepLocator(ls).permission_expired() - for ls in writer._current_stream_locators): - raise errors.StaleWriterStateError( - "locators include expired permission hint") - writer.check_dependencies() - if state['_current_file'] is not None: - path, pos = state['_current_file'] - try: - writer._queued_file = open(path, 'rb') - writer._queued_file.seek(pos) - except IOError as error: - raise errors.StaleWriterStateError( - u"failed to reopen active file {}: {}".format(path, error)) - return writer - - def check_dependencies(self): - for path, orig_stat in self._dependencies.items(): - if not S_ISREG(orig_stat[ST_MODE]): - raise errors.StaleWriterStateError(u"{} not file".format(path)) - try: - now_stat = tuple(os.stat(path)) - except OSError as error: - raise errors.StaleWriterStateError( - u"failed to stat {}: {}".format(path, error)) - if ((not S_ISREG(now_stat[ST_MODE])) or - (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or - (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): - raise errors.StaleWriterStateError(u"{} changed".format(path)) - - def dump_state(self, copy_func=lambda x: x): - state = {attr: copy_func(getattr(self, attr)) - for attr in self.STATE_PROPS} - if self._queued_file is None: - state['_current_file'] = None - else: - state['_current_file'] = (os.path.realpath(self._queued_file.name), - self._queued_file.tell()) - return state - - def _queue_file(self, source, filename=None): - try: - src_path = os.path.realpath(source) - except Exception: - raise errors.AssertionError(u"{} not a file path".format(source)) - try: - path_stat = os.stat(src_path) - except OSError as stat_error: - path_stat = None - super(ResumableCollectionWriter, self)._queue_file(source, filename) - fd_stat = os.fstat(self._queued_file.fileno()) - if not S_ISREG(fd_stat.st_mode): - # We won't be able to resume from this cache anyway, so don't - # worry about further checks. - self._dependencies[source] = tuple(fd_stat) - elif path_stat is None: - raise errors.AssertionError( - u"could not stat {}: {}".format(source, stat_error)) - elif path_stat.st_ino != fd_stat.st_ino: - raise errors.AssertionError( - u"{} changed between open and stat calls".format(source)) - else: - self._dependencies[src_path] = tuple(fd_stat) - - def write(self, data): - if self._queued_file is None: - raise errors.AssertionError( - "resumable writer can't accept unsourced data") - return super(ResumableCollectionWriter, self).write(data) diff --git a/sdk/python/arvados/crunch.py b/sdk/python/arvados/crunch.py deleted file mode 100644 index 57cf2e01ef..0000000000 --- a/sdk/python/arvados/crunch.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import json -import os - -from . import util - -class TaskOutputDir(object): - """Keep-backed directory for staging outputs of Crunch tasks. 
- - Example, in a crunch task whose output is a file called "out.txt" - containing "42": - - import arvados - import arvados.crunch - import os - - out = arvados.crunch.TaskOutputDir() - with open(os.path.join(out.path, 'out.txt'), 'w') as f: - f.write('42') - arvados.current_task().set_output(out.manifest_text()) - """ - @util._deprecated('3.0', 'arvados-cwl-runner or the containers API') - def __init__(self): - self.path = os.environ['TASK_KEEPMOUNT_TMP'] - - def __str__(self): - return self.path - - def manifest_text(self): - snapshot = os.path.join(self.path, '.arvados#collection') - return json.load(open(snapshot))['manifest_text'] diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py deleted file mode 100644 index ff541e5716..0000000000 --- a/sdk/python/arvados/stream.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import collections -import hashlib -import os -import re -import threading -import functools -import copy - -from ._ranges import locators_and_ranges, Range -from .arvfile import StreamFileReader -from arvados.retry import retry_method -from arvados.keep import * -from . import config -from . import errors -from . import util -from ._normalize_stream import normalize_stream - -class StreamReader(object): - @util._deprecated('3.0', 'arvados.collection.Collecttion') - def __init__(self, tokens, keep=None, debug=False, _empty=False, - num_retries=10): - self._stream_name = None - self._data_locators = [] - self._files = collections.OrderedDict() - self._keep = keep - self.num_retries = num_retries - - streamoffset = 0 - - # parse stream - for tok in tokens: - if debug: print('tok', tok) - if self._stream_name is None: - self._stream_name = tok.replace('\\040', ' ') - continue - - s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok) - if s: - blocksize = int(s.group(1)) - self._data_locators.append(Range(tok, streamoffset, blocksize, 0)) - streamoffset += blocksize - continue - - s = re.search(r'^(\d+):(\d+):(\S+)', tok) - if s: - pos = int(s.group(1)) - size = int(s.group(2)) - name = s.group(3).replace('\\040', ' ') - if name not in self._files: - self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name) - else: - filereader = self._files[name] - filereader.segments.append(Range(pos, filereader.size(), size)) - continue - - raise errors.SyntaxError("Invalid manifest format") - - def name(self): - return self._stream_name - - def files(self): - return self._files - - def all_files(self): - return list(self._files.values()) - - def size(self): - n = self._data_locators[-1] - return n.range_start + n.range_size - - def locators_and_ranges(self, range_start, range_size): - return locators_and_ranges(self._data_locators, range_start, range_size) - - @retry_method - def _keepget(self, locator, num_retries=None): - return self._keep.get(locator, num_retries=num_retries) - - @retry_method - def readfrom(self, start, size, num_retries=None): - """Read up to 'size' bytes from the stream, starting at 'start'""" - if size == 0: - return b'' - if self._keep is None: - self._keep = KeepClient(num_retries=self.num_retries) - data = [] - for lr in locators_and_ranges(self._data_locators, start, size): - data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size]) - return b''.join(data) - - def manifest_text(self, strip=False): - manifest_text = [self.name().replace(' ', '\\040')] - if strip: - for d in 
self._data_locators: - m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator) - manifest_text.append(m.group(0)) - else: - manifest_text.extend([d.locator for d in self._data_locators]) - manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040')) - for seg in f.segments]) - for f in self._files.values()]) - return ' '.join(manifest_text) + '\n' diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py index 050c67f68d..521bfdeefd 100644 --- a/sdk/python/arvados/util.py +++ b/sdk/python/arvados/util.py @@ -66,13 +66,6 @@ link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}') """Regular expression to match any Arvados link UUID""" user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}') """Regular expression to match any Arvados user UUID""" -job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}') -"""Regular expression to match any Arvados job UUID - -.. WARNING:: Deprecated - Arvados job resources are deprecated and will be removed in a future - release. Prefer the containers API instead. -""" def _deprecated(version=None, preferred=None): """Mark a callable as deprecated in the SDK @@ -395,341 +388,3 @@ def list_all(fn, num_retries=0, **kwargs): items_available = c['items_available'] offset = c['offset'] + len(c['items']) return items - -@_deprecated('3.0') -def clear_tmpdir(path=None): - """ - Ensure the given directory (or TASK_TMPDIR if none given) - exists and is empty. - """ - from arvados import current_task - if path is None: - path = current_task().tmpdir - if os.path.exists(path): - p = subprocess.Popen(['rm', '-rf', path]) - stdout, stderr = p.communicate(None) - if p.returncode != 0: - raise Exception('rm -rf %s: %s' % (path, stderr)) - os.mkdir(path) - -@_deprecated('3.0', 'subprocess.run') -def run_command(execargs, **kwargs): - kwargs.setdefault('stdin', subprocess.PIPE) - kwargs.setdefault('stdout', subprocess.PIPE) - kwargs.setdefault('stderr', sys.stderr) - kwargs.setdefault('close_fds', True) - kwargs.setdefault('shell', False) - p = subprocess.Popen(execargs, **kwargs) - stdoutdata, stderrdata = p.communicate(None) - if p.returncode != 0: - raise arvados.errors.CommandFailedError( - "run_command %s exit %d:\n%s" % - (execargs, p.returncode, stderrdata)) - return stdoutdata, stderrdata - -@_deprecated('3.0') -def git_checkout(url, version, path): - from arvados import current_job - if not re.search('^/', path): - path = os.path.join(current_job().tmpdir, path) - if not os.path.exists(path): - run_command(["git", "clone", url, path], - cwd=os.path.dirname(path)) - run_command(["git", "checkout", version], - cwd=path) - return path - -@_deprecated('3.0') -def tar_extractor(path, decompress_flag): - return subprocess.Popen(["tar", - "-C", path, - ("-x%sf" % decompress_flag), - "-"], - stdout=None, - stdin=subprocess.PIPE, stderr=sys.stderr, - shell=False, close_fds=True) - -@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module') -def tarball_extract(tarball, path): - """Retrieve a tarball from Keep and extract it to a local - directory. Return the absolute path where the tarball was - extracted. If the top level of the tarball contained just one - file or directory, return the absolute path of that single - item. 
- - tarball -- collection locator - path -- where to extract the tarball: absolute, or relative to job tmp - """ - from arvados import current_job - from arvados.collection import CollectionReader - if not re.search('^/', path): - path = os.path.join(current_job().tmpdir, path) - lockfile = open(path + '.lock', 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - try: - os.stat(path) - except OSError: - os.mkdir(path) - already_have_it = False - try: - if os.readlink(os.path.join(path, '.locator')) == tarball: - already_have_it = True - except OSError: - pass - if not already_have_it: - - # emulate "rm -f" (i.e., if the file does not exist, we win) - try: - os.unlink(os.path.join(path, '.locator')) - except OSError: - if os.path.exists(os.path.join(path, '.locator')): - os.unlink(os.path.join(path, '.locator')) - - for f in CollectionReader(tarball).all_files(): - f_name = f.name() - if f_name.endswith(('.tbz', '.tar.bz2')): - p = tar_extractor(path, 'j') - elif f_name.endswith(('.tgz', '.tar.gz')): - p = tar_extractor(path, 'z') - elif f_name.endswith('.tar'): - p = tar_extractor(path, '') - else: - raise arvados.errors.AssertionError( - "tarball_extract cannot handle filename %s" % f.name()) - while True: - buf = f.read(2**20) - if len(buf) == 0: - break - p.stdin.write(buf) - p.stdin.close() - p.wait() - if p.returncode != 0: - lockfile.close() - raise arvados.errors.CommandFailedError( - "tar exited %d" % p.returncode) - os.symlink(tarball, os.path.join(path, '.locator')) - tld_extracts = [f for f in os.listdir(path) if f != '.locator'] - lockfile.close() - if len(tld_extracts) == 1: - return os.path.join(path, tld_extracts[0]) - return path - -@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module') -def zipball_extract(zipball, path): - """Retrieve a zip archive from Keep and extract it to a local - directory. Return the absolute path where the archive was - extracted. If the top level of the archive contained just one - file or directory, return the absolute path of that single - item. 
- - zipball -- collection locator - path -- where to extract the archive: absolute, or relative to job tmp - """ - from arvados import current_job - from arvados.collection import CollectionReader - if not re.search('^/', path): - path = os.path.join(current_job().tmpdir, path) - lockfile = open(path + '.lock', 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - try: - os.stat(path) - except OSError: - os.mkdir(path) - already_have_it = False - try: - if os.readlink(os.path.join(path, '.locator')) == zipball: - already_have_it = True - except OSError: - pass - if not already_have_it: - - # emulate "rm -f" (i.e., if the file does not exist, we win) - try: - os.unlink(os.path.join(path, '.locator')) - except OSError: - if os.path.exists(os.path.join(path, '.locator')): - os.unlink(os.path.join(path, '.locator')) - - for f in CollectionReader(zipball).all_files(): - if not f.name().endswith('.zip'): - raise arvados.errors.NotImplementedError( - "zipball_extract cannot handle filename %s" % f.name()) - zip_filename = os.path.join(path, os.path.basename(f.name())) - zip_file = open(zip_filename, 'wb') - while True: - buf = f.read(2**20) - if len(buf) == 0: - break - zip_file.write(buf) - zip_file.close() - - p = subprocess.Popen(["unzip", - "-q", "-o", - "-d", path, - zip_filename], - stdout=None, - stdin=None, stderr=sys.stderr, - shell=False, close_fds=True) - p.wait() - if p.returncode != 0: - lockfile.close() - raise arvados.errors.CommandFailedError( - "unzip exited %d" % p.returncode) - os.unlink(zip_filename) - os.symlink(zipball, os.path.join(path, '.locator')) - tld_extracts = [f for f in os.listdir(path) if f != '.locator'] - lockfile.close() - if len(tld_extracts) == 1: - return os.path.join(path, tld_extracts[0]) - return path - -@_deprecated('3.0', 'arvados.collection.Collection') -def collection_extract(collection, path, files=[], decompress=True): - """Retrieve a collection from Keep and extract it to a local - directory. Return the absolute path where the collection was - extracted. 
- - collection -- collection locator - path -- where to extract: absolute, or relative to job tmp - """ - from arvados import current_job - from arvados.collection import CollectionReader - matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection) - if matches: - collection_hash = matches.group(1) - else: - collection_hash = hashlib.md5(collection).hexdigest() - if not re.search('^/', path): - path = os.path.join(current_job().tmpdir, path) - lockfile = open(path + '.lock', 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - try: - os.stat(path) - except OSError: - os.mkdir(path) - already_have_it = False - try: - if os.readlink(os.path.join(path, '.locator')) == collection_hash: - already_have_it = True - except OSError: - pass - - # emulate "rm -f" (i.e., if the file does not exist, we win) - try: - os.unlink(os.path.join(path, '.locator')) - except OSError: - if os.path.exists(os.path.join(path, '.locator')): - os.unlink(os.path.join(path, '.locator')) - - files_got = [] - for s in CollectionReader(collection).all_streams(): - stream_name = s.name() - for f in s.all_files(): - if (files == [] or - ((f.name() not in files_got) and - (f.name() in files or - (decompress and f.decompressed_name() in files)))): - outname = f.decompressed_name() if decompress else f.name() - files_got += [outname] - if os.path.exists(os.path.join(path, stream_name, outname)): - continue - mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname))) - outfile = open(os.path.join(path, stream_name, outname), 'wb') - for buf in (f.readall_decompressed() if decompress - else f.readall()): - outfile.write(buf) - outfile.close() - if len(files_got) < len(files): - raise arvados.errors.AssertionError( - "Wanted files %s but only got %s from %s" % - (files, files_got, - [z.name() for z in CollectionReader(collection).all_files()])) - os.symlink(collection_hash, os.path.join(path, '.locator')) - - lockfile.close() - return path - -@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)') -def mkdir_dash_p(path): - if not os.path.isdir(path): - try: - os.makedirs(path) - except OSError as e: - if e.errno == errno.EEXIST and os.path.isdir(path): - # It is not an error if someone else creates the - # directory between our exists() and makedirs() calls. - pass - else: - raise - -@_deprecated('3.0', 'arvados.collection.Collection') -def stream_extract(stream, path, files=[], decompress=True): - """Retrieve a stream from Keep and extract it to a local - directory. Return the absolute path where the stream was - extracted. 
- - stream -- StreamReader object - path -- where to extract: absolute, or relative to job tmp - """ - from arvados import current_job - if not re.search('^/', path): - path = os.path.join(current_job().tmpdir, path) - lockfile = open(path + '.lock', 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - try: - os.stat(path) - except OSError: - os.mkdir(path) - - files_got = [] - for f in stream.all_files(): - if (files == [] or - ((f.name() not in files_got) and - (f.name() in files or - (decompress and f.decompressed_name() in files)))): - outname = f.decompressed_name() if decompress else f.name() - files_got += [outname] - if os.path.exists(os.path.join(path, outname)): - os.unlink(os.path.join(path, outname)) - mkdir_dash_p(os.path.dirname(os.path.join(path, outname))) - outfile = open(os.path.join(path, outname), 'wb') - for buf in (f.readall_decompressed() if decompress - else f.readall()): - outfile.write(buf) - outfile.close() - if len(files_got) < len(files): - raise arvados.errors.AssertionError( - "Wanted files %s but only got %s from %s" % - (files, files_got, [z.name() for z in stream.all_files()])) - lockfile.close() - return path - -@_deprecated('3.0', 'os.walk') -def listdir_recursive(dirname, base=None, max_depth=None): - """listdir_recursive(dirname, base, max_depth) - - Return a list of file and directory names found under dirname. - - If base is not None, prepend "{base}/" to each returned name. - - If max_depth is None, descend into directories and return only the - names of files found in the directory tree. - - If max_depth is a non-negative integer, stop descending into - directories at the given depth, and at that point return directory - names instead. - - If max_depth==0 (and base is None) this is equivalent to - sorted(os.listdir(dirname)). - """ - allfiles = [] - for ent in sorted(os.listdir(dirname)): - ent_path = os.path.join(dirname, ent) - ent_base = os.path.join(base, ent) if base else ent - if os.path.isdir(ent_path) and max_depth != 0: - allfiles += listdir_recursive( - ent_path, base=ent_base, - max_depth=(max_depth-1 if max_depth else None)) - else: - allfiles += [ent_base] - return allfiles diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py index 1f1f796a9c..6c2e439c2a 100644 --- a/sdk/python/tests/arvados_testutil.py +++ b/sdk/python/tests/arvados_testutil.py @@ -139,6 +139,7 @@ class FakeCurl(object): return self._resp_code raise Exception + def mock_keep_responses(body, *codes, **headers): """Patch pycurl to return fake responses and raise exceptions. 
@@ -164,21 +165,6 @@ def mock_keep_responses(body, *codes, **headers): return mock.patch('pycurl.Curl', cm) -class MockStreamReader(object): - def __init__(self, name='.', *data): - self._name = name - self._data = b''.join([ - b if isinstance(b, bytes) else b.encode() - for b in data]) - self._data_locators = [str_keep_locator(d) for d in data] - self.num_retries = 0 - - def name(self): - return self._name - - def readfrom(self, start, size, num_retries=None): - return self._data[start:start + size] - class ApiClientMock(object): def api_client_mock(self): api_mock = mock.MagicMock(name='api_client_mock') diff --git a/sdk/python/tests/manifest_examples.py b/sdk/python/tests/manifest_examples.py index c1945d03d9..6f448c0e58 100644 --- a/sdk/python/tests/manifest_examples.py +++ b/sdk/python/tests/manifest_examples.py @@ -12,13 +12,13 @@ class ManifestExamples(object): blocks_per_file=1, files_per_stream=1, streams=1): - datablip = 'x' * bytes_per_block + datablip = b'x' * bytes_per_block data_loc = tutil.str_keep_locator(datablip) with tutil.mock_keep_responses(data_loc, 200): - coll = arvados.CollectionWriter() + coll = arvados.collection.Collection() for si in range(0, streams): for fi in range(0, files_per_stream): - with coll.open("stream{}/file{}.txt".format(si, fi)) as f: + with coll.open("stream{}/file{}.txt".format(si, fi), 'wb') as f: for bi in range(0, blocks_per_file): f.write(datablip) return coll.manifest_text() diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py index 7d7cc9ba59..2da05e4d99 100644 --- a/sdk/python/tests/test_api.py +++ b/sdk/python/tests/test_api.py @@ -26,7 +26,6 @@ from arvados.api import ( api_client, normalize_api_kwargs, api_kwargs_from_config, - OrderedJsonModel, _googleapiclient_log_lock, ) from .arvados_testutil import fake_httplib2_response, mock_api_responses, queue_with @@ -201,21 +200,6 @@ class ArvadosApiTest(run_test_server.TestCaseWithServers): self.assertEqual(response.status, code) self.assertEqual(response.get('status'), str(code)) - def test_ordered_json_model(self): - mock_responses = { - 'arvados.collections.get': ( - None, - json.dumps(collections.OrderedDict( - (c, int(c, 16)) for c in string.hexdigits - )).encode(), - ), - } - req_builder = apiclient_http.RequestMockBuilder(mock_responses) - api = arvados.api('v1', - requestBuilder=req_builder, model=OrderedJsonModel()) - result = api.collections().get(uuid='test').execute() - self.assertEqual(string.hexdigits, ''.join(list(result.keys()))) - def test_api_is_threadsafe(self): api_kwargs = { 'host': os.environ['ARVADOS_API_HOST'], diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index 6bcba9a81d..20e40527fd 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -16,7 +16,7 @@ from arvados.collection import Collection from arvados.arvfile import ArvadosFile, ArvadosFileReader from . 
import arvados_testutil as tutil -from .test_stream import StreamFileReaderTestCase, StreamRetryTestMixin +from .test_stream import StreamFileReaderTestMixin, StreamRetryTestMixin class ArvadosFileWriterTestCase(unittest.TestCase): class MockKeep(object): @@ -620,7 +620,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase): self.assertEqual(b"01234567", keep.get("2e9ec317e197819358fbc43afca7d837+8")) -class ArvadosFileReaderTestCase(StreamFileReaderTestCase): +class ArvadosFileReaderTestCase(unittest.TestCase, StreamFileReaderTestMixin): class MockParent(object): class MockBlockMgr(object): def __init__(self, blocks, nocache): @@ -649,6 +649,11 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache) + def make_file_reader(self, name='emptyfile', data='', nocache=False): + loc = tutil.str_keep_locator(data) + af = ArvadosFile(ArvadosFileReaderTestCase.MockParent({loc: data}, nocache=nocache), name, stream=[Range(loc, 0, len(data))], segments=[Range(0, len(data), len(data))]) + return ArvadosFileReader(af, mode='rb') + def make_count_reader(self, nocache=False): stream = [] n = 0 @@ -658,7 +663,21 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): blocks[loc] = d stream.append(Range(loc, n, len(d))) n += len(d) - af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)]) + af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache=nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)]) + return ArvadosFileReader(af, mode="rb") + + def make_newlines_reader(self, nocache=False): + stream = [] + segments = [] + n = 0 + blocks = {} + for d in [b'one\ntwo\n\nth', b'ree\nfour\n\n']: + loc = tutil.str_keep_locator(d) + blocks[loc] = d + stream.append(Range(loc, n, len(d))) + segments.append(Range(n, len(d), n+len(d))) + n += len(d) + af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache=nocache), "count.txt", stream=stream, segments=segments) return ArvadosFileReader(af, mode="rb") def test_read_block_crossing_behavior(self): @@ -667,16 +686,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): sfile = self.make_count_reader(nocache=True) self.assertEqual(b'12345678', sfile.read(8)) - def test_successive_reads(self): - # Override StreamFileReaderTestCase.test_successive_reads - sfile = self.make_count_reader(nocache=True) - self.assertEqual(b'1234', sfile.read(4)) - self.assertEqual(b'5678', sfile.read(4)) - self.assertEqual(b'9', sfile.read(4)) - self.assertEqual(b'', sfile.read(4)) - def test_tell_after_block_read(self): - # Override StreamFileReaderTestCase.test_tell_after_block_read sfile = self.make_count_reader(nocache=True) self.assertEqual(b'12345678', sfile.read(8)) self.assertEqual(8, sfile.tell()) diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index 5d574856dd..2f40c58bbb 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -17,17 +17,11 @@ import parameterized from unittest import mock from . import run_test_server -from arvados._ranges import Range, LocatorAndRange +from arvados._ranges import Range, LocatorAndRange, locators_and_ranges from arvados.collection import Collection, CollectionReader from . 
import arvados_testutil as tutil from .arvados_testutil import make_block_cache -class TestResumableWriter(arvados.ResumableCollectionWriter): - KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K. - - def current_state(self): - return self.dump_state(copy.deepcopy) - @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}]) class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, tutil.ArvadosBaseTestCase): @@ -45,22 +39,15 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, block_cache=make_block_cache(cls.disk_cache)) def write_foo_bar_baz(self): - cw = arvados.CollectionWriter(self.api_client) - self.assertEqual(cw.current_stream_name(), '.', - 'current_stream_name() should be "." now') - cw.set_current_file_name('foo.txt') - cw.write(b'foo') - self.assertEqual(cw.current_file_name(), 'foo.txt', - 'current_file_name() should be foo.txt now') - cw.start_new_file('bar.txt') - cw.write(b'bar') - cw.start_new_stream('baz') - cw.write(b'baz') - cw.set_current_file_name('baz.txt') - self.assertEqual(cw.manifest_text(), - ". 3858f62230ac3c915f300c664312c63f+6 0:3:foo.txt 3:3:bar.txt\n" + - "./baz 73feffa4b7f6bb68e44cf984c85f6e88+3 0:3:baz.txt\n", - "wrong manifest: got {}".format(cw.manifest_text())) + with arvados.collection.Collection(api_client=self.api_client).open('zzz', 'wb') as f: + f.write(b'foobar') + f.flush() + f.write(b'baz') + cw = arvados.collection.Collection( + api_client=self.api_client, + manifest_locator_or_text= + ". 3858f62230ac3c915f300c664312c63f+6 0:3:foo.txt 3:3:bar.txt\n" + + "./baz 73feffa4b7f6bb68e44cf984c85f6e88+3 0:3:baz.txt\n") cw.save_new() return cw.portable_data_hash() @@ -77,101 +64,34 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, '23ca013983d6239e98931cc779e68426+114', 'wrong locator hash: ' + self.write_foo_bar_baz()) - def test_local_collection_reader(self): - foobarbaz = self.write_foo_bar_baz() - cr = arvados.CollectionReader( - foobarbaz + '+Xzizzle', self.api_client) - got = [] - for s in cr.all_streams(): - for f in s.all_files(): - got += [[f.size(), f.stream_name(), f.name(), f.read(2**26)]] - expected = [[3, '.', 'foo.txt', b'foo'], - [3, '.', 'bar.txt', b'bar'], - [3, './baz', 'baz.txt', b'baz']] - self.assertEqual(got, - expected) - stream0 = cr.all_streams()[0] - self.assertEqual(stream0.readfrom(0, 0), - b'', - 'reading zero bytes should have returned empty string') - self.assertEqual(stream0.readfrom(0, 2**26), - b'foobar', - 'reading entire stream failed') - self.assertEqual(stream0.readfrom(2**26, 0), - b'', - 'reading zero bytes should have returned empty string') - self.assertEqual(3, len(cr)) - self.assertTrue(cr) - - def _test_subset(self, collection, expected): - cr = arvados.CollectionReader(collection, self.api_client) - for s in cr.all_streams(): - for ex in expected: - if ex[0] == s: - f = s.files()[ex[2]] - got = [f.size(), f.stream_name(), f.name(), "".join(f.readall(2**26))] - self.assertEqual(got, - ex, - 'all_files|as_manifest did not preserve manifest contents: got %s expected %s' % (got, ex)) - - def test_collection_manifest_subset(self): - foobarbaz = self.write_foo_bar_baz() - self._test_subset(foobarbaz, - [[3, '.', 'bar.txt', b'bar'], - [3, '.', 'foo.txt', b'foo'], - [3, './baz', 'baz.txt', b'baz']]) - self._test_subset((". %s %s 0:3:foo.txt 3:3:bar.txt\n" % - (self.keep_client.put(b"foo"), - self.keep_client.put(b"bar"))), - [[3, '.', 'bar.txt', b'bar'], - [3, '.', 'foo.txt', b'foo']]) - self._test_subset((". 
%s %s 0:2:fo.txt 2:4:obar.txt\n" % - (self.keep_client.put(b"foo"), - self.keep_client.put(b"bar"))), - [[2, '.', 'fo.txt', b'fo'], - [4, '.', 'obar.txt', b'obar']]) - self._test_subset((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" % - (self.keep_client.put(b"foo"), - self.keep_client.put(b"bar"))), - [[2, '.', 'ar.txt', b'ar'], - [2, '.', 'fo.txt', b'fo'], - [2, '.', 'ob.txt', b'ob'], - [0, '.', 'zero.txt', b'']]) - def test_collection_empty_file(self): - cw = arvados.CollectionWriter(self.api_client) - cw.start_new_file('zero.txt') - cw.write(b'') + cw = arvados.collection.Collection(api_client=self.api_client) + with cw.open('zero.txt', 'wb') as f: + pass self.assertEqual(cw.manifest_text(), ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:zero.txt\n") self.check_manifest_file_sizes(cw.manifest_text(), [0]) - cw = arvados.CollectionWriter(self.api_client) - cw.start_new_file('zero.txt') - cw.write(b'') - cw.start_new_file('one.txt') - cw.write(b'1') - cw.start_new_stream('foo') - cw.start_new_file('zero.txt') - cw.write(b'') - self.check_manifest_file_sizes(cw.manifest_text(), [0,1,0]) - - def test_no_implicit_normalize(self): - cw = arvados.CollectionWriter(self.api_client) - cw.start_new_file('b') - cw.write(b'b') - cw.start_new_file('a') - cw.write(b'') - self.check_manifest_file_sizes(cw.manifest_text(), [1,0]) - self.check_manifest_file_sizes( - arvados.CollectionReader( - cw.manifest_text()).manifest_text(normalize=True), - [0,1]) + + cw = arvados.collection.Collection(api_client=self.api_client) + with cw.open('zero.txt', 'wb') as f: + pass + with cw.open('one.txt', 'wb') as f: + f.write(b'1') + with cw.open('foo/zero.txt', 'wb') as f: + pass + # sorted, that's: [./one.txt, ./zero.txt, foo/zero.txt] + self.check_manifest_file_sizes(cw.manifest_text(), [1,0,0]) def check_manifest_file_sizes(self, manifest_text, expect_sizes): - cr = arvados.CollectionReader(manifest_text, self.api_client) got_sizes = [] - for f in cr.all_files(): - got_sizes += [f.size()] + def walk(subdir): + for fnm in subdir: + if isinstance(subdir[fnm], arvados.arvfile.ArvadosFile): + got_sizes.append(subdir[fnm].size()) + else: + walk(subdir[fnm]) + cr = arvados.CollectionReader(manifest_text, self.api_client) + walk(cr) self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes)) def test_normalized_collection(self): @@ -233,30 +153,30 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, Range('e', 40, 10), Range('f', 50, 10)] - self.assertEqual(arvados.locators_and_ranges(blocks2, 2, 2), [LocatorAndRange('a', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 12, 2), [LocatorAndRange('b', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 22, 2), [LocatorAndRange('c', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 32, 2), [LocatorAndRange('d', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 42, 2), [LocatorAndRange('e', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 52, 2), [LocatorAndRange('f', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 62, 2), []) - self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), []) - - self.assertEqual(arvados.locators_and_ranges(blocks2, 0, 2), [LocatorAndRange('a', 10, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 10, 2), [LocatorAndRange('b', 10, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 20, 2), [LocatorAndRange('c', 10, 0, 2)]) - 
self.assertEqual(arvados.locators_and_ranges(blocks2, 30, 2), [LocatorAndRange('d', 10, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 40, 2), [LocatorAndRange('e', 10, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 50, 2), [LocatorAndRange('f', 10, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 60, 2), []) - self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), []) - - self.assertEqual(arvados.locators_and_ranges(blocks2, 9, 2), [LocatorAndRange('a', 10, 9, 1), LocatorAndRange('b', 10, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 19, 2), [LocatorAndRange('b', 10, 9, 1), LocatorAndRange('c', 10, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 29, 2), [LocatorAndRange('c', 10, 9, 1), LocatorAndRange('d', 10, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 39, 2), [LocatorAndRange('d', 10, 9, 1), LocatorAndRange('e', 10, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [LocatorAndRange('e', 10, 9, 1), LocatorAndRange('f', 10, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [LocatorAndRange('f', 10, 9, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 2, 2), [LocatorAndRange('a', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 12, 2), [LocatorAndRange('b', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 22, 2), [LocatorAndRange('c', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 32, 2), [LocatorAndRange('d', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 42, 2), [LocatorAndRange('e', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 52, 2), [LocatorAndRange('f', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 62, 2), []) + self.assertEqual(locators_and_ranges(blocks2, -2, 2), []) + + self.assertEqual(locators_and_ranges(blocks2, 0, 2), [LocatorAndRange('a', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 10, 2), [LocatorAndRange('b', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 20, 2), [LocatorAndRange('c', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 30, 2), [LocatorAndRange('d', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 40, 2), [LocatorAndRange('e', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 50, 2), [LocatorAndRange('f', 10, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks2, 60, 2), []) + self.assertEqual(locators_and_ranges(blocks2, -2, 2), []) + + self.assertEqual(locators_and_ranges(blocks2, 9, 2), [LocatorAndRange('a', 10, 9, 1), LocatorAndRange('b', 10, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 19, 2), [LocatorAndRange('b', 10, 9, 1), LocatorAndRange('c', 10, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 29, 2), [LocatorAndRange('c', 10, 9, 1), LocatorAndRange('d', 10, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 39, 2), [LocatorAndRange('d', 10, 9, 1), LocatorAndRange('e', 10, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 49, 2), [LocatorAndRange('e', 10, 9, 1), LocatorAndRange('f', 10, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks2, 59, 2), [LocatorAndRange('f', 10, 9, 1)]) blocks3 = [Range('a', 0, 10), @@ -267,56 +187,56 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, Range('f', 50, 10), Range('g', 60, 10)] - self.assertEqual(arvados.locators_and_ranges(blocks3, 2, 2), [LocatorAndRange('a', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks3, 12, 2), [LocatorAndRange('b', 10, 2, 2)]) - 
self.assertEqual(arvados.locators_and_ranges(blocks3, 22, 2), [LocatorAndRange('c', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks3, 32, 2), [LocatorAndRange('d', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks3, 42, 2), [LocatorAndRange('e', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks3, 52, 2), [LocatorAndRange('f', 10, 2, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks3, 62, 2), [LocatorAndRange('g', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 2, 2), [LocatorAndRange('a', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 12, 2), [LocatorAndRange('b', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 22, 2), [LocatorAndRange('c', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 32, 2), [LocatorAndRange('d', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 42, 2), [LocatorAndRange('e', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 52, 2), [LocatorAndRange('f', 10, 2, 2)]) + self.assertEqual(locators_and_ranges(blocks3, 62, 2), [LocatorAndRange('g', 10, 2, 2)]) blocks = [Range('a', 0, 10), Range('b', 10, 15), Range('c', 25, 5)] - self.assertEqual(arvados.locators_and_ranges(blocks, 1, 0), []) - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 5), [LocatorAndRange('a', 10, 0, 5)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 3, 5), [LocatorAndRange('a', 10, 3, 5)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 10), [LocatorAndRange('a', 10, 0, 10)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 11), [LocatorAndRange('a', 10, 0, 10), - LocatorAndRange('b', 15, 0, 1)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 1, 11), [LocatorAndRange('a', 10, 1, 9), - LocatorAndRange('b', 15, 0, 2)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 25), [LocatorAndRange('a', 10, 0, 10), - LocatorAndRange('b', 15, 0, 15)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 30), [LocatorAndRange('a', 10, 0, 10), - LocatorAndRange('b', 15, 0, 15), - LocatorAndRange('c', 5, 0, 5)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 1, 30), [LocatorAndRange('a', 10, 1, 9), - LocatorAndRange('b', 15, 0, 15), - LocatorAndRange('c', 5, 0, 5)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 0, 31), [LocatorAndRange('a', 10, 0, 10), - LocatorAndRange('b', 15, 0, 15), - LocatorAndRange('c', 5, 0, 5)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 15, 5), [LocatorAndRange('b', 15, 5, 5)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 8, 17), [LocatorAndRange('a', 10, 8, 2), - LocatorAndRange('b', 15, 0, 15)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 8, 20), [LocatorAndRange('a', 10, 8, 2), - LocatorAndRange('b', 15, 0, 15), - LocatorAndRange('c', 5, 0, 3)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 26, 2), [LocatorAndRange('c', 5, 1, 2)]) - - self.assertEqual(arvados.locators_and_ranges(blocks, 9, 15), [LocatorAndRange('a', 10, 9, 1), - LocatorAndRange('b', 15, 0, 14)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 10, 15), [LocatorAndRange('b', 15, 0, 15)]) - self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [LocatorAndRange('b', 15, 1, 14), - LocatorAndRange('c', 5, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks, 1, 0), []) + self.assertEqual(locators_and_ranges(blocks, 0, 5), [LocatorAndRange('a', 10, 0, 5)]) + self.assertEqual(locators_and_ranges(blocks, 3, 5), [LocatorAndRange('a', 10, 3, 5)]) + 
self.assertEqual(locators_and_ranges(blocks, 0, 10), [LocatorAndRange('a', 10, 0, 10)]) + + self.assertEqual(locators_and_ranges(blocks, 0, 11), [LocatorAndRange('a', 10, 0, 10), + LocatorAndRange('b', 15, 0, 1)]) + self.assertEqual(locators_and_ranges(blocks, 1, 11), [LocatorAndRange('a', 10, 1, 9), + LocatorAndRange('b', 15, 0, 2)]) + self.assertEqual(locators_and_ranges(blocks, 0, 25), [LocatorAndRange('a', 10, 0, 10), + LocatorAndRange('b', 15, 0, 15)]) + + self.assertEqual(locators_and_ranges(blocks, 0, 30), [LocatorAndRange('a', 10, 0, 10), + LocatorAndRange('b', 15, 0, 15), + LocatorAndRange('c', 5, 0, 5)]) + self.assertEqual(locators_and_ranges(blocks, 1, 30), [LocatorAndRange('a', 10, 1, 9), + LocatorAndRange('b', 15, 0, 15), + LocatorAndRange('c', 5, 0, 5)]) + self.assertEqual(locators_and_ranges(blocks, 0, 31), [LocatorAndRange('a', 10, 0, 10), + LocatorAndRange('b', 15, 0, 15), + LocatorAndRange('c', 5, 0, 5)]) + + self.assertEqual(locators_and_ranges(blocks, 15, 5), [LocatorAndRange('b', 15, 5, 5)]) + + self.assertEqual(locators_and_ranges(blocks, 8, 17), [LocatorAndRange('a', 10, 8, 2), + LocatorAndRange('b', 15, 0, 15)]) + + self.assertEqual(locators_and_ranges(blocks, 8, 20), [LocatorAndRange('a', 10, 8, 2), + LocatorAndRange('b', 15, 0, 15), + LocatorAndRange('c', 5, 0, 3)]) + + self.assertEqual(locators_and_ranges(blocks, 26, 2), [LocatorAndRange('c', 5, 1, 2)]) + + self.assertEqual(locators_and_ranges(blocks, 9, 15), [LocatorAndRange('a', 10, 9, 1), + LocatorAndRange('b', 15, 0, 14)]) + self.assertEqual(locators_and_ranges(blocks, 10, 15), [LocatorAndRange('b', 15, 0, 15)]) + self.assertEqual(locators_and_ranges(blocks, 11, 15), [LocatorAndRange('b', 15, 1, 14), + LocatorAndRange('c', 5, 0, 1)]) class MockKeep(object): def __init__(self, content, num_retries=0): @@ -326,32 +246,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, def get(self, locator, num_retries=0, prefetch=False): return self.content[locator] - def test_stream_reader(self): - keepblocks = { - 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': b'abcdefghij', - 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': b'klmnopqrstuvwxy', - 'cccccccccccccccccccccccccccccccc+5': b'z0123', - } - mk = self.MockKeep(keepblocks) - - sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk) - - content = b'abcdefghijklmnopqrstuvwxyz0123456789' - - self.assertEqual(sr.readfrom(0, 30), content[0:30]) - self.assertEqual(sr.readfrom(2, 30), content[2:30]) - - self.assertEqual(sr.readfrom(2, 8), content[2:10]) - self.assertEqual(sr.readfrom(0, 10), content[0:10]) - - self.assertEqual(sr.readfrom(0, 5), content[0:5]) - self.assertEqual(sr.readfrom(5, 5), content[5:10]) - self.assertEqual(sr.readfrom(10, 5), content[10:15]) - self.assertEqual(sr.readfrom(15, 5), content[15:20]) - self.assertEqual(sr.readfrom(20, 5), content[20:25]) - self.assertEqual(sr.readfrom(25, 5), content[25:30]) - self.assertEqual(sr.readfrom(30, 5), b'') - def test_extract_file(self): m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt . 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt @@ -359,156 +253,19 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, . 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 47:80:md8sum.txt . 
085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt """ - - m2 = arvados.CollectionReader(m1, self.api_client).manifest_text(normalize=True) - + coll = arvados.CollectionReader(m1, self.api_client) + m2 = coll.manifest_text(normalize=True) self.assertEqual(m2, ". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt 43:41:md6sum.txt 84:43:md7sum.txt 6:37:md8sum.txt 84:43:md8sum.txt 83:1:md9sum.txt 0:43:md9sum.txt 84:36:md9sum.txt\n") - files = arvados.CollectionReader( - m2, self.api_client).all_streams()[0].files() - - self.assertEqual(files['md5sum.txt'].as_manifest(), + self.assertEqual(coll['md5sum.txt'].manifest_text(), ". 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt\n") - self.assertEqual(files['md6sum.txt'].as_manifest(), + self.assertEqual(coll['md6sum.txt'].manifest_text(), ". 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt\n") - self.assertEqual(files['md7sum.txt'].as_manifest(), + self.assertEqual(coll['md7sum.txt'].manifest_text(), ". 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md7sum.txt\n") - self.assertEqual(files['md9sum.txt'].as_manifest(), + self.assertEqual(coll['md9sum.txt'].manifest_text(), ". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n") - def test_write_directory_tree(self): - cwriter = arvados.CollectionWriter(self.api_client) - cwriter.write_directory_tree(self.build_directory_tree( - ['basefile', 'subdir/subfile'])) - self.assertEqual(cwriter.manifest_text(), - """. c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile -./subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""") - - def test_write_named_directory_tree(self): - cwriter = arvados.CollectionWriter(self.api_client) - cwriter.write_directory_tree(self.build_directory_tree( - ['basefile', 'subdir/subfile']), 'root') - self.assertEqual( - cwriter.manifest_text(), - """./root c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile -./root/subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""") - - def test_write_directory_tree_in_one_stream(self): - cwriter = arvados.CollectionWriter(self.api_client) - cwriter.write_directory_tree(self.build_directory_tree( - ['basefile', 'subdir/subfile']), max_manifest_depth=0) - self.assertEqual(cwriter.manifest_text(), - """. 4ace875ffdc6824a04950f06858f4465+22 0:8:basefile 8:14:subdir/subfile\n""") - - def test_write_directory_tree_with_limited_recursion(self): - cwriter = arvados.CollectionWriter(self.api_client) - cwriter.write_directory_tree( - self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']), - max_manifest_depth=1) - self.assertEqual(cwriter.manifest_text(), - """. bd19836ddb62c11c55ab251ccaca5645+2 0:2:f1 -./d1 50170217e5b04312024aa5cd42934494+13 0:8:d2/f3 8:5:f2\n""") - - def test_write_directory_tree_with_zero_recursion(self): - cwriter = arvados.CollectionWriter(self.api_client) - content = 'd1/d2/f3d1/f2f1' - blockhash = tutil.str_keep_locator(content) - cwriter.write_directory_tree( - self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']), - max_manifest_depth=0) - self.assertEqual( - cwriter.manifest_text(), - ". {} 0:8:d1/d2/f3 8:5:d1/f2 13:2:f1\n".format(blockhash)) - - def test_write_one_file(self): - cwriter = arvados.CollectionWriter(self.api_client) - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name) - self.assertEqual( - cwriter.manifest_text(), - ". 
098f6bcd4621d373cade4e832627b4f6+4 0:4:{}\n".format( - os.path.basename(testfile.name))) - - def test_write_named_file(self): - cwriter = arvados.CollectionWriter(self.api_client) - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'foo') - self.assertEqual(cwriter.manifest_text(), - ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:foo\n") - - def test_write_multiple_files(self): - cwriter = arvados.CollectionWriter(self.api_client) - for letter in 'ABC': - with self.make_test_file(letter.encode()) as testfile: - cwriter.write_file(testfile.name, letter) - self.assertEqual( - cwriter.manifest_text(), - ". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n") - - def test_basic_resume(self): - cwriter = TestResumableWriter() - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - resumed = TestResumableWriter.from_state(cwriter.current_state()) - self.assertEqual(cwriter.manifest_text(), resumed.manifest_text(), - "resumed CollectionWriter had different manifest") - - def test_resume_fails_when_missing_dependency(self): - cwriter = TestResumableWriter() - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - self.assertRaises(arvados.errors.StaleWriterStateError, - TestResumableWriter.from_state, - cwriter.current_state()) - - def test_resume_fails_when_dependency_mtime_changed(self): - cwriter = TestResumableWriter() - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - os.utime(testfile.name, (0, 0)) - self.assertRaises(arvados.errors.StaleWriterStateError, - TestResumableWriter.from_state, - cwriter.current_state()) - - def test_resume_fails_when_dependency_is_nonfile(self): - cwriter = TestResumableWriter() - cwriter.write_file('/dev/null', 'empty') - self.assertRaises(arvados.errors.StaleWriterStateError, - TestResumableWriter.from_state, - cwriter.current_state()) - - def test_resume_fails_when_dependency_size_changed(self): - cwriter = TestResumableWriter() - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - orig_mtime = os.fstat(testfile.fileno()).st_mtime - testfile.write(b'extra') - testfile.flush() - os.utime(testfile.name, (orig_mtime, orig_mtime)) - self.assertRaises(arvados.errors.StaleWriterStateError, - TestResumableWriter.from_state, - cwriter.current_state()) - - def test_resume_fails_with_expired_locator(self): - cwriter = TestResumableWriter() - state = cwriter.current_state() - # Add an expired locator to the state. 
- state['_current_stream_locators'].append(''.join([ - 'a' * 32, '+1+A', 'b' * 40, '@', '10000000'])) - self.assertRaises(arvados.errors.StaleWriterStateError, - TestResumableWriter.from_state, state) - - def test_arbitrary_objects_not_resumable(self): - cwriter = TestResumableWriter() - with open('/dev/null') as badfile: - self.assertRaises(arvados.errors.AssertionError, - cwriter.write_file, badfile) - - def test_arbitrary_writes_not_resumable(self): - cwriter = TestResumableWriter() - self.assertRaises(arvados.errors.AssertionError, - cwriter.write, "badtext") - class CollectionTestMixin(tutil.ApiClientMock): API_COLLECTIONS = run_test_server.fixture('collections') @@ -578,8 +335,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin): reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client, num_retries=3) with tutil.mock_keep_responses('foo', 500, 500, 200): - self.assertEqual(b'foo', - b''.join(f.read(9) for f in reader.all_files())) + self.assertEqual('foo', reader.open('foo', 'r').read()) def test_read_nonnormalized_manifest_with_collection_reader(self): # client should be able to use CollectionReader on a manifest without normalizing it @@ -595,12 +351,6 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin): reader.stripped_manifest()) # Ensure stripped_manifest() didn't mutate our reader. self.assertEqual(nonnormal, reader.manifest_text()) - # Ensure the files appear in the order given in the manifest. - self.assertEqual( - [[6, '.', 'foo.txt'], - [0, '.', 'bar.txt']], - [[f.size(), f.stream_name(), f.name()] - for f in reader.all_streams()[0].all_files()]) def test_read_empty_collection(self): client = self.api_client_mock(200) @@ -649,140 +399,6 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin): self.assertRaises(IOError, reader.open, 'nonexistent') -@tutil.skip_sleep -class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin): - def mock_keep(self, body, *codes, **headers): - headers.setdefault('x-keep-replicas-stored', 2) - return tutil.mock_keep_responses(body, *codes, **headers) - - def foo_writer(self, **kwargs): - kwargs.setdefault('api_client', self.api_client_mock()) - writer = arvados.CollectionWriter(**kwargs) - writer.start_new_file('foo') - writer.write(b'foo') - return writer - - def test_write_whole_collection(self): - writer = self.foo_writer() - with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200): - self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish()) - - def test_write_no_default(self): - writer = self.foo_writer() - with self.mock_keep(None, 500): - with self.assertRaises(arvados.errors.KeepWriteError): - writer.finish() - - def test_write_insufficient_replicas_via_proxy(self): - writer = self.foo_writer(replication=3) - with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}): - with self.assertRaises(arvados.errors.KeepWriteError): - writer.manifest_text() - - def test_write_insufficient_replicas_via_disks(self): - client = mock.MagicMock(name='api_client') - with self.mock_keep( - None, 200, 200, - **{'x-keep-replicas-stored': 1}) as keepmock: - self.mock_keep_services(client, status=200, service_type='disk', count=2) - writer = self.foo_writer(api_client=client, replication=3) - with self.assertRaises(arvados.errors.KeepWriteError): - writer.manifest_text() - - def test_write_three_replicas(self): - client = mock.MagicMock(name='api_client') - with self.mock_keep( - "", 500, 500, 500, 200, 200, 200, - **{'x-keep-replicas-stored': 1}) as keepmock: - 
self.mock_keep_services(client, status=200, service_type='disk', count=6) - writer = self.foo_writer(api_client=client, replication=3) - writer.manifest_text() - self.assertEqual(6, keepmock.call_count) - - def test_write_whole_collection_through_retries(self): - writer = self.foo_writer(num_retries=2) - with self.mock_keep(self.DEFAULT_DATA_HASH, - 500, 500, 200, 500, 500, 200): - self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish()) - - def test_flush_data_retries(self): - writer = self.foo_writer(num_retries=2) - foo_hash = self.DEFAULT_MANIFEST.split()[1] - with self.mock_keep(foo_hash, 500, 200): - writer.flush_data() - self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text()) - - def test_one_open(self): - client = self.api_client_mock() - writer = arvados.CollectionWriter(client) - with writer.open('out') as out_file: - self.assertEqual('.', writer.current_stream_name()) - self.assertEqual('out', writer.current_file_name()) - out_file.write(b'test data') - data_loc = tutil.str_keep_locator('test data') - self.assertTrue(out_file.closed, "writer file not closed after context") - self.assertRaises(ValueError, out_file.write, 'extra text') - with self.mock_keep(data_loc, 200) as keep_mock: - self.assertEqual(". {} 0:9:out\n".format(data_loc), - writer.manifest_text()) - - def test_open_writelines(self): - client = self.api_client_mock() - writer = arvados.CollectionWriter(client) - with writer.open('six') as out_file: - out_file.writelines(['12', '34', '56']) - data_loc = tutil.str_keep_locator('123456') - with self.mock_keep(data_loc, 200) as keep_mock: - self.assertEqual(". {} 0:6:six\n".format(data_loc), - writer.manifest_text()) - - def test_open_flush(self): - client = self.api_client_mock() - data_loc1 = tutil.str_keep_locator('flush1') - data_loc2 = tutil.str_keep_locator('flush2') - with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock: - writer = arvados.CollectionWriter(client) - with writer.open('flush_test') as out_file: - out_file.write(b'flush1') - out_file.flush() - out_file.write(b'flush2') - self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1, - data_loc2), - writer.manifest_text()) - - def test_two_opens_same_stream(self): - client = self.api_client_mock() - writer = arvados.CollectionWriter(client) - with writer.open('.', '1') as out_file: - out_file.write(b'1st') - with writer.open('.', '2') as out_file: - out_file.write(b'2nd') - data_loc = tutil.str_keep_locator('1st2nd') - with self.mock_keep(data_loc, 200) as keep_mock: - self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc), - writer.manifest_text()) - - def test_two_opens_two_streams(self): - client = self.api_client_mock() - data_loc1 = tutil.str_keep_locator('file') - data_loc2 = tutil.str_keep_locator('indir') - with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock: - writer = arvados.CollectionWriter(client) - with writer.open('file') as out_file: - out_file.write(b'file') - with writer.open('./dir', 'indir') as out_file: - out_file.write(b'indir') - expected = ". 
{} 0:4:file\n./dir {} 0:5:indir\n".format( - data_loc1, data_loc2) - self.assertEqual(expected, writer.manifest_text()) - - def test_dup_open_fails(self): - client = self.api_client_mock() - writer = arvados.CollectionWriter(client) - file1 = writer.open('one') - self.assertRaises(arvados.errors.AssertionError, writer.open, 'two') - - class CollectionMethods(run_test_server.TestCaseWithServers): def test_keys_values_items_support_indexing(self): diff --git a/sdk/python/tests/test_crunch.py b/sdk/python/tests/test_crunch.py deleted file mode 100644 index 809e229b20..0000000000 --- a/sdk/python/tests/test_crunch.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados.crunch -import os -import shutil -import tempfile -import unittest - -class TaskOutputDirTest(unittest.TestCase): - def setUp(self): - self.tmp = tempfile.mkdtemp() - os.environ['TASK_KEEPMOUNT_TMP'] = self.tmp - - def tearDown(self): - os.environ.pop('TASK_KEEPMOUNT_TMP') - shutil.rmtree(self.tmp) - - def test_env_var(self): - out = arvados.crunch.TaskOutputDir() - self.assertEqual(out.path, self.tmp) - - with open(os.path.join(self.tmp, '.arvados#collection'), 'w') as f: - f.write('{\n "manifest_text":"",\n "uuid":null\n}\n') - self.assertEqual(out.manifest_text(), '') - - # Special file must be re-read on each call to manifest_text(). - with open(os.path.join(self.tmp, '.arvados#collection'), 'w') as f: - f.write(r'{"manifest_text":". unparsed 0:3:foo\n","uuid":null}') - self.assertEqual(out.manifest_text(), ". unparsed 0:3:foo\n") diff --git a/sdk/python/tests/test_sdk.py b/sdk/python/tests/test_sdk.py deleted file mode 100644 index 4ef81c53d8..0000000000 --- a/sdk/python/tests/test_sdk.py +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import os -import unittest - -from unittest import mock - -import arvados -import arvados.collection - -class TestSDK(unittest.TestCase): - - @mock.patch('arvados.current_task') - @mock.patch('arvados.current_job') - def test_one_task_per_input_file_normalize(self, mock_job, mock_task): - mock_api = mock.MagicMock() - - # This manifest will be reduced from three lines to one when it is - # normalized. - nonnormalized_manifest = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt -. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt -. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt -""" - dummy_hash = 'ffffffffffffffffffffffffffffffff+0' - - mock_job.return_value = { - 'uuid': 'none', - 'script_parameters': { - 'input': dummy_hash - } - } - mock_task.return_value = { - 'uuid': 'none', - 'sequence': 0, - } - # mock the API client to return a collection with a nonnormalized manifest. - mock_api.collections().get().execute.return_value = { - 'uuid': 'zzzzz-4zz18-mockcollection0', - 'portable_data_hash': dummy_hash, - 'manifest_text': nonnormalized_manifest, - } - - # Because one_task_per_input_file normalizes this collection, - # it should now create only one job task and not three. 
- arvados.job_setup.one_task_per_input_file(and_end_task=False, api_client=mock_api) - mock_api.job_tasks().create().execute.assert_called_once_with() diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py index a3f5d9ff63..7e6e1f55a3 100644 --- a/sdk/python/tests/test_stream.py +++ b/sdk/python/tests/test_stream.py @@ -12,18 +12,12 @@ import hashlib from unittest import mock import arvados -from arvados import StreamReader, StreamFileReader from arvados._ranges import Range from . import arvados_testutil as tutil from . import run_test_server -class StreamFileReaderTestCase(unittest.TestCase): - def make_count_reader(self): - stream = tutil.MockStreamReader('.', '01234', '34567', '67890') - return StreamFileReader(stream, [Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)], - 'count.txt') - +class StreamFileReaderTestMixin(object): def test_read_block_crossing_behavior(self): # read() calls will be aligned on block boundaries - see #3663. sfile = self.make_count_reader() @@ -35,8 +29,8 @@ class StreamFileReaderTestCase(unittest.TestCase): def test_successive_reads(self): sfile = self.make_count_reader() - for expect in [b'123', b'456', b'789', b'']: - self.assertEqual(expect, sfile.read(10)) + for expect in [b'1234', b'5678', b'9', b'']: + self.assertEqual(expect, sfile.read(4)) def test_readfrom_spans_blocks(self): sfile = self.make_count_reader() @@ -87,11 +81,6 @@ class StreamFileReaderTestCase(unittest.TestCase): def test_size(self): self.assertEqual(9, self.make_count_reader().size()) - def test_tell_after_block_read(self): - sfile = self.make_count_reader() - sfile.read(5) - self.assertEqual(3, sfile.tell()) - def test_tell_after_small_read(self): sfile = self.make_count_reader() sfile.read(1) @@ -108,10 +97,6 @@ class StreamFileReaderTestCase(unittest.TestCase): self.assertEqual(b'12', sfile.read(2)) self.assertTrue(sfile.closed, "reader is open after context") - def make_newlines_reader(self): - stream = tutil.MockStreamReader('.', 'one\ntwo\n\nth', 'ree\nfour\n\n') - return StreamFileReader(stream, [Range(0, 0, 11), Range(11, 11, 10)], 'count.txt') - def check_lines(self, actual): self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'], actual) @@ -142,19 +127,14 @@ class StreamFileReaderTestCase(unittest.TestCase): def test_readlines_sizehint(self): result = self.make_newlines_reader().readlines(8) - self.assertEqual(['one\n', 'two\n'], result[:2]) - self.assertNotIn('three\n', result) + self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'], result) def test_name_attribute(self): - # Test both .name and .name() (for backward compatibility) - stream = tutil.MockStreamReader() - sfile = StreamFileReader(stream, [Range(0, 0, 0)], 'nametest') + sfile = self.make_file_reader(name='nametest') self.assertEqual('nametest', sfile.name) - self.assertEqual('nametest', sfile.name()) def check_decompressed_name(self, filename, expect): - stream = tutil.MockStreamReader('.', '') - reader = StreamFileReader(stream, [Range(0, 0, 0)], filename) + reader = self.make_file_reader(name=filename) self.assertEqual(expect, reader.decompressed_name()) def test_decompressed_name_uncompressed_file(self): @@ -169,9 +149,7 @@ class StreamFileReaderTestCase(unittest.TestCase): def check_decompression(self, compress_ext, compress_func): test_text = b'decompression\ntest\n' test_data = compress_func(test_text) - stream = tutil.MockStreamReader('.', test_data) - reader = StreamFileReader(stream, [Range(0, 0, len(test_data))], - 'test.' 
+ compress_ext) + reader = self.make_file_reader(name='test.'+compress_ext, data=test_data) self.assertEqual(test_text, b''.join(reader.readall_decompressed())) @staticmethod @@ -257,48 +235,5 @@ class StreamRetryTestMixin(object): self.read_for_test(reader, 10, num_retries=1) -class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin): - def reader_for(self, coll_name, **kwargs): - return StreamReader(self.manifest_for(coll_name).split(), - self.keep_client(), **kwargs) - - def read_for_test(self, reader, byte_count, **kwargs): - return reader.readfrom(0, byte_count, **kwargs) - - def test_manifest_text_without_keep_client(self): - mtext = self.manifest_for('multilevel_collection_1') - for line in mtext.rstrip('\n').split('\n'): - reader = StreamReader(line.split()) - self.assertEqual(line + '\n', reader.manifest_text()) - - -class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin): - def reader_for(self, coll_name, **kwargs): - return StreamReader(self.manifest_for(coll_name).split(), - self.keep_client(), **kwargs).all_files()[0] - - def read_for_test(self, reader, byte_count, **kwargs): - return reader.read(byte_count, **kwargs) - - -class StreamFileReadFromTestCase(StreamFileReadTestCase): - def read_for_test(self, reader, byte_count, **kwargs): - return reader.readfrom(0, byte_count, **kwargs) - - -class StreamFileReadAllTestCase(StreamFileReadTestCase): - def read_for_test(self, reader, byte_count, **kwargs): - return b''.join(reader.readall(**kwargs)) - - -class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase): - def read_for_test(self, reader, byte_count, **kwargs): - return b''.join(reader.readall_decompressed(**kwargs)) - - -class StreamFileReadlinesTestCase(StreamFileReadTestCase): - def read_for_test(self, reader, byte_count, **kwargs): - return ''.join(reader.readlines(**kwargs)).encode() - if __name__ == '__main__': unittest.main() diff --git a/sdk/python/tests/test_util.py b/sdk/python/tests/test_util.py index 75d4a89e30..3055599ca5 100644 --- a/sdk/python/tests/test_util.py +++ b/sdk/python/tests/test_util.py @@ -13,36 +13,6 @@ from unittest import mock import arvados import arvados.util -class MkdirDashPTest(unittest.TestCase): - def setUp(self): - try: - os.path.mkdir('./tmp') - except: - pass - def tearDown(self): - try: - os.unlink('./tmp/bar') - os.rmdir('./tmp/foo') - os.rmdir('./tmp') - except: - pass - def runTest(self): - arvados.util.mkdir_dash_p('./tmp/foo') - with open('./tmp/bar', 'wb') as f: - f.write(b'bar') - self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar') - - -class RunCommandTestCase(unittest.TestCase): - def test_success(self): - stdout, stderr = arvados.util.run_command(['echo', 'test'], - stderr=subprocess.PIPE) - self.assertEqual("test\n".encode(), stdout) - self.assertEqual("".encode(), stderr) - - def test_failure(self): - with self.assertRaises(arvados.errors.CommandFailedError): - arvados.util.run_command(['false']) class KeysetTestHelper: def __init__(self, expect): diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py index f5f61baeb3..53e29ec087 100644 --- a/services/fuse/tests/test_mount.py +++ b/services/fuse/tests/test_mount.py @@ -59,44 +59,38 @@ class FuseMountTest(MountTestBase): def setUp(self): super(FuseMountTest, self).setUp() - cw = arvados.CollectionWriter() - - cw.start_new_file('thing1.txt') - cw.write("data 1") - cw.start_new_file('thing2.txt') - cw.write("data 2") - - cw.start_new_stream('dir1') - cw.start_new_file('thing3.txt') - cw.write("data 
3") - cw.start_new_file('thing4.txt') - cw.write("data 4") - - cw.start_new_stream('dir2') - cw.start_new_file('thing5.txt') - cw.write("data 5") - cw.start_new_file('thing6.txt') - cw.write("data 6") - - cw.start_new_stream('dir2/dir3') - cw.start_new_file('thing7.txt') - cw.write("data 7") - - cw.start_new_file('thing8.txt') - cw.write("data 8") - - cw.start_new_stream('edgecases') - for f in ":/.../-/*/ ".split("/"): - cw.start_new_file(f) - cw.write('x') - - for f in ":/.../-/*/ ".split("/"): - cw.start_new_stream('edgecases/dirs/' + f) - cw.start_new_file('x/x') - cw.write('x') - - self.testcollection = cw.finish() - self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute() + cw = arvados.collection.Collection() + with cw.open('thing1.txt', 'w') as f: + f.write('data 1') + with cw.open('thing2.txt', 'w') as f: + f.write('data 2') + + with cw.open('dir1/thing3.txt', 'w') as f: + f.write('data 3') + with cw.open('dir1/thing4.txt', 'w') as f: + f.write('data 4') + + with cw.open('dir2/thing5.txt', 'w') as f: + f.write('data 5') + with cw.open('dir2/thing6.txt', 'w') as f: + f.write('data 6') + + with cw.open('dir2/dir3/thing7.txt', 'w') as f: + f.write('data 7') + with cw.open('dir2/dir3/thing8.txt', 'w') as f: + f.write('data 8') + + for fnm in ":/.../-/*/ ".split("/"): + with cw.open('edgecases/'+fnm, 'w') as f: + f.write('x') + + for fnm in ":/.../-/*/ ".split("/"): + with cw.open('edgecases/dirs/'+fnm+'/x/x', 'w') as f: + f.write('x') + + self.testcollection = cw.portable_data_hash() + self.test_manifest = cw.manifest_text() + self.api.collections().create(body={"manifest_text": self.test_manifest}).execute() def runTest(self): self.make_mount(fuse.CollectionDirectory, collection_record=self.testcollection) @@ -136,12 +130,11 @@ class FuseMagicTest(MountTestBase): self.collection_in_test_project = run_test_server.fixture('collections')['foo_collection_in_aproject']['name'] self.collection_in_filter_group = run_test_server.fixture('collections')['baz_file']['name'] - cw = arvados.CollectionWriter() - - cw.start_new_file('thing1.txt') - cw.write("data 1") + cw = arvados.collection.Collection() + with cw.open('thing1.txt', 'w') as f: + f.write('data 1') - self.testcollection = cw.finish() + self.testcollection = cw.portable_data_hash() self.test_manifest = cw.manifest_text() coll = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute() self.test_manifest_pdh = coll['portable_data_hash'] @@ -1160,12 +1153,11 @@ class FuseMagicTestPDHOnly(MountTestBase): def setUp(self, api=None): super(FuseMagicTestPDHOnly, self).setUp(api=api) - cw = arvados.CollectionWriter() - - cw.start_new_file('thing1.txt') - cw.write("data 1") + cw = arvados.collection.Collection() + with cw.open('thing1.txt', 'w') as f: + f.write('data 1') - self.testcollection = cw.finish() + self.testcollection = cw.portable_data_hash() self.test_manifest = cw.manifest_text() created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute() self.testcollectionuuid = str(created['uuid']) -- 2.30.2