from builtins import str
from past.builtins import basestring
from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import arvados.config as config
_logger = logging.getLogger('arvados.collection')
+
+# NOTE(review): this module also uses `raise ... from None`, which is Python
+# 3-only syntax, so the Python 2 branch below may be unreachable; confirm
+# whether Python 2 is still supported before relying on it.
+if sys.version_info < (3, 0):
+    class TextIOWrapper(io.TextIOWrapper):
+        """Python 2 text wrapper whose write() also accepts native str.
+
+        For backward compatibility, calls such as write('foo') keep working:
+        any basestring argument is cast to unicode before being handed to
+        io.TextIOWrapper.write().
+        """
+        def write(self, data):
+            text = unicode(data) if isinstance(data, basestring) else data
+            return super(TextIOWrapper, self).write(text)
+else:
+    # Python 3: the standard wrapper already accepts str.
+    TextIOWrapper = io.TextIOWrapper
+
+
class CollectionBase(object):
"""Abstract base class for Collection classes."""
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
+ # For every (path, stat tuple) recorded in self._dependencies, raise
+ # StaleWriterStateError if the recorded stat is not a regular file, the
+ # path can no longer be stat'ed, or the path is now not a regular file or
+ # its mtime or size differs from the recorded values.
for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
+ # Only file type, mtime and size are compared; other stat fields may
+ # change without the dependency being reported as stale.
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
def stream_name(self):
+ # Abstract hook: this implementation always raises NotImplementedError;
+ # presumably subclasses override it.
raise NotImplementedError()
+
@synchronized
def has_remote_blocks(self):
"""Recursively check for a +R segment locator signature."""
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
"""
if not re.search(r'^[rwa][bt]?\+?$', mode):
if mode[0] == 'w':
arvfile.truncate(0)
- return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- item.remove(pathcomponents[1])
+ item.remove(pathcomponents[1], recursive=recursive)
def _clonefrom(self, source):
for k,v in listitems(source):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+ buf.append(self[dirname].manifest_text(
+ stream_name=os.path.join(stream_name, dirname),
+ strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
apiconfig=None,
block_manager=None,
replication_desired=None,
- put_threads=None):
+ storage_classes_desired=None,
+ put_threads=None,
+ get_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
configuration applies. If not None, this value will also be used
for determining the number of block copies being written.
+ :storage_classes_desired:
+ A list of storage class names where to upload the data. If None,
+ the keep client is expected to store the data into the cluster's
+ default storage class(es).
+
"""
+
+ if storage_classes_desired and type(storage_classes_desired) is not list:
+ raise errors.ArgumentError("storage_classes_desired must be list type.")
+
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
+ self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
try:
self._populate()
- except (IOError, errors.SyntaxError) as e:
- raise errors.ArgumentError("Error processing manifest text: %s", e)
+ except errors.SyntaxError as e:
+ # Interpolate explicitly: handing the format string and value to the
+ # constructor as separate args (as a plain Exception subclass would
+ # store them) never applies the %s.
+ raise errors.ArgumentError("Error processing manifest text: %s" % str(e)) from None
+
+    def storage_classes_desired(self):
+        """Return the storage classes requested for this collection.
+
+        Returns an empty list when none are set (the stored value is None
+        or empty); otherwise returns the stored list itself.
+        """
+        wanted = self._storage_classes_desired
+        return wanted if wanted else []
def root_collection(self):
+ # Returns self: this object acts as the root of its own collection tree.
return self
def get_trash_at(self):
+ # Return the record's trash_at parsed into a datetime, or None when there
+ # is no API response, trash_at is empty, or the timestamp cannot be parsed.
+ # NOTE(review): "trash_at" is indexed directly, so a response lacking the
+ # key raises KeyError -- confirm API responses always include it.
if self._api_response and self._api_response["trash_at"]:
- return ciso8601.parse_datetime(self._api_response["trash_at"])
+ try:
+ return ciso8601.parse_datetime(self._api_response["trash_at"])
+ except ValueError:
+ # Treat an unparseable timestamp as "no trash time" rather than raising.
+ return None
else:
return None
# our tokens.
return
else:
- self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+ self._remember_api_response(response)
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired,
+ get_threads=self.get_threads,)
return self._block_manager
def _remember_api_response(self, response):
self._manifest_text = self._api_response['manifest_text']
self._portable_data_hash = self._api_response['portable_data_hash']
# If not overriden via kwargs, we should try to load the
- # replication_desired from the API server
+ # replication_desired and storage_classes_desired from the API server
if self.replication_desired is None:
self.replication_desired = self._api_response.get('replication_desired', None)
+ if self._storage_classes_desired is None:
+ self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
def _populate(self):
if self._manifest_text is None:
storage_classes=None,
trash_at=None,
merge=True,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled, if not, setting this will
+ raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if storage_classes and type(storage_classes) is not list:
raise errors.ArgumentError("storage_classes must be list type.")
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
body={}
if properties:
body["properties"] = properties
- if storage_classes:
- body["storage_classes_desired"] = storage_classes
+ if self.storage_classes_desired():
+ body["storage_classes_desired"] = self.storage_classes_desired()
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
if not self.committed():
if self._has_remote_blocks:
storage_classes=None,
trash_at=None,
ensure_unique_name=False,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled, if not, setting this will
+ raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
if self._has_remote_blocks:
# Copy any remote blocks to the local cluster.
self._copy_remote_blocks(remote_blocks={})
self._has_remote_blocks = False
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
body["owner_uuid"] = owner_uuid
if properties:
body["properties"] = properties
- if storage_classes:
- body["storage_classes_desired"] = storage_classes
+ if self.storage_classes_desired():
+ body["storage_classes_desired"] = self.storage_classes_desired()
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
+ try:
+ afile = self.find_or_create(filepath, FILE)
+ except IOError as e:
+ if e.errno == errno.ENOTDIR:
+ # A directory component of filepath is already a file: report it as a
+ # manifest syntax error. Interpolate explicitly -- a plain Exception
+ # subclass would keep the format string and path as separate args.
+ raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % (filepath,)) from None
+ else:
+ # Other I/O errors propagate unchanged, keeping their original context.
+ raise
if isinstance(afile, ArvadosFile):
afile.add_segment(blocks, pos, size)
else:
self.name = newname
self.lock = self.parent.root_collection().lock
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Return this subcollection's manifest text.
+
+        An empty directory is encoded as a stream holding one zero-length
+        file named "\\056" (the escaped form of "."), backed by
+        config.EMPTY_BLOCK_LOCATOR, so the directory is still represented
+        in the manifest. Non-empty subcollections are rendered by the
+        base-class implementation.
+        """
+        if len(self._items) == 0:
+            # "\\056" in the source yields the literal text \056 in the output.
+            return "%s %s 0:0:\\056\n" % (
+                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+        return super(Subcollection, self)._get_manifest_text(
+            stream_name, strip, normalize, only_committed)
+
class CollectionReader(Collection):
"""A read-only collection object.