_logger = logging.getLogger('arvados.collection')
-
-if sys.version_info >= (3, 0):
- TextIOWrapper = io.TextIOWrapper
-else:
- class TextIOWrapper(io.TextIOWrapper):
- """To maintain backward compatibility, cast str to unicode in
- write('foo').
-
- """
- def write(self, data):
- if isinstance(data, basestring):
- data = unicode(data)
- return super(TextIOWrapper, self).write(data)
-
-
class CollectionBase(object):
"""Abstract base class for Collection classes."""
class CollectionWriter(CollectionBase):
"""Deprecated, use Collection instead."""
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, **kwargs):
self._dependencies = {}
super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
if 'b' not in mode:
bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
- f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
return f
def modified(self):
self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- item.remove(pathcomponents[1])
+ item.remove(pathcomponents[1], recursive=recursive)
def _clonefrom(self, source):
for k,v in listitems(source):
def __init__(self, manifest_locator_or_text=None,
api_client=None,
keep_client=None,
- num_retries=None,
+ num_retries=10,
parent=None,
apiconfig=None,
block_manager=None,
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
+
+ # Use the keep client from ThreadSafeApiCache
+ if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+ self._keep_client = self._api_client.keep
+
self._block_manager = block_manager
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
else:
self._config = config.settings()
- self.num_retries = num_retries if num_retries is not None else 0
+ self.num_retries = num_retries
self._manifest_locator = None
self._manifest_text = None
self._portable_data_hash = None
@synchronized
def _my_api(self):
if self._api_client is None:
- self._api_client = ThreadSafeApiCache(self._config)
+ self._api_client = ThreadSafeApiCache(self._config, version='v1')
if self._keep_client is None:
self._keep_client = self._api_client.keep
return self._api_client
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired)
return self._block_manager
def _remember_api_response(self, response):
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
def _unescape_manifest_path(self, path):
- return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+ return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
@synchronized
def _import_manifest(self, manifest_text):
self._streams = [normalize_stream(s, streams[s])
for s in sorted(streams)]
+
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_streams(self):
return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
for s in self._streams]
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_files(self):
for s in self.all_streams():