_logger = logging.getLogger('arvados.collection')
-
-if sys.version_info >= (3, 0):
- TextIOWrapper = io.TextIOWrapper
-else:
- class TextIOWrapper(io.TextIOWrapper):
- """To maintain backward compatibility, cast str to unicode in
- write('foo').
-
- """
- def write(self, data):
- if isinstance(data, basestring):
- data = unicode(data)
- return super(TextIOWrapper, self).write(data)
-
-
class CollectionBase(object):
"""Abstract base class for Collection classes."""
class CollectionWriter(CollectionBase):
"""Deprecated, use Collection instead."""
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, **kwargs):
self._dependencies = {}
super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
if 'b' not in mode:
bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
- f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
return f
def modified(self):
self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- item.remove(pathcomponents[1])
+ item.remove(pathcomponents[1], recursive=recursive)
def _clonefrom(self, source):
for k,v in listitems(source):
def __init__(self, manifest_locator_or_text=None,
api_client=None,
keep_client=None,
- num_retries=None,
+ num_retries=10,
parent=None,
apiconfig=None,
block_manager=None,
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
+
+ # Use the keep client from ThreadSafeApiCache
+ if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+ self._keep_client = self._api_client.keep
+
self._block_manager = block_manager
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
else:
self._config = config.settings()
- self.num_retries = num_retries if num_retries is not None else 0
+ self.num_retries = num_retries
self._manifest_locator = None
self._manifest_text = None
self._portable_data_hash = None
# our tokens.
return
else:
- self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+ self._remember_api_response(response)
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
@synchronized
def _my_api(self):
if self._api_client is None:
- self._api_client = ThreadSafeApiCache(self._config)
+ self._api_client = ThreadSafeApiCache(self._config, version='v1')
if self._keep_client is None:
self._keep_client = self._api_client.keep
return self._api_client
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired)
return self._block_manager
def _remember_api_response(self, response):
storage_classes=None,
trash_at=None,
merge=True,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled; if it is not, setting this
+ will raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
body={}
if properties:
body["properties"] = properties
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
if not self.committed():
if self._has_remote_blocks:
storage_classes=None,
trash_at=None,
ensure_unique_name=False,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled; if it is not, setting this
+ will raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
if self._has_remote_blocks:
# Copy any remote blocks to the local cluster.
self._copy_remote_blocks(remote_blocks={})
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
def _unescape_manifest_path(self, path):
- return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+ return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
@synchronized
def _import_manifest(self, manifest_text):
self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
+ try:
+ afile = self.find_or_create(filepath, FILE)
+ except IOError as e:
+ if e.errno == errno.ENOTDIR:  # reported below as a directory-part/file name conflict in the manifest
+ raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % filepath) from None  # interpolate the path: exception constructors do not %-format extra positional args
+ else:
+ raise e from None
if isinstance(afile, ArvadosFile):
afile.add_segment(blocks, pos, size)
else:
self._streams = [normalize_stream(s, streams[s])
for s in sorted(streams)]
+
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_streams(self):
return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
for s in self._streams]
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_files(self):
for s in self.all_streams():