+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
+from future.utils import listitems, listvalues, viewkeys
+from builtins import str
+from past.builtins import basestring
+from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import arvados.config as config
_logger = logging.getLogger('arvados.collection')
+
+# Choose the wrapper class used for text-mode file handles.  Python 3
+# uses io.TextIOWrapper unchanged; older interpreters get a subclass
+# whose write() also accepts native (byte) strings.
+if sys.version_info < (3, 0):
+    class TextIOWrapper(io.TextIOWrapper):
+        """Backward-compatible wrapper: cast str to unicode in write().
+
+        This keeps calls such as write('foo') working when the caller
+        passes a native string.
+        """
+        def write(self, data):
+            text = unicode(data) if isinstance(data, basestring) else data
+            return super(TextIOWrapper, self).write(text)
+else:
+    TextIOWrapper = io.TextIOWrapper
+
+
class CollectionBase(object):
+ """Abstract base class for Collection classes."""
+
def __enter__(self):
return self
class CollectionWriter(CollectionBase):
+ """Deprecated, use Collection instead."""
+
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
self.do_queued_work()
def write(self, newdata):
- if hasattr(newdata, '__iter__'):
+ if isinstance(newdata, bytes):
+ pass
+ elif isinstance(newdata, str):
+ newdata = newdata.encode()
+ elif hasattr(newdata, '__iter__'):
for s in newdata:
self.write(s)
return
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
return self._last_open
def flush_data(self):
- data_buffer = ''.join(self._data_buffer)
+ data_buffer = b''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
self._my_keep().put(
sending manifest_text() to the API server's "create
collection" endpoint.
"""
- return self._my_keep().put(self.manifest_text(), copies=self.replication)
+ return self._my_keep().put(self.manifest_text().encode(),
+ copies=self.replication)
def portable_data_hash(self):
- stripped = self.stripped_manifest()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ stripped = self.stripped_manifest().encode()
+ return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
def manifest_text(self):
self.finish_current_stream()
ret += locators
return ret
+    def save_new(self, name=None):
+        """Create a new collection record from this writer's manifest.
+
+        Issues a `collections().create` request through the writer's
+        API client with `ensure_unique_name=True`.  The request body
+        carries `name` and the current `manifest_text()`.  Returns
+        whatever the request's `execute()` returns.
+        """
+        create = self._api_client.collections().create
+        record = {
+            'name': name,
+            'manifest_text': self.manifest_text(),
+        }
+        request = create(ensure_unique_name=True, body=record)
+        return request.execute(num_retries=self.num_retries)
+
class ResumableCollectionWriter(CollectionWriter):
+ """Deprecated, use Collection instead."""
+
STATE_PROPS = ['_current_stream_files', '_current_stream_length',
'_current_stream_locators', '_current_stream_name',
'_current_file_name', '_current_file_pos', '_close_file',
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
- for path, orig_stat in self._dependencies.items():
+ for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
def __init__(self, parent=None):
self.parent = parent
self._committed = False
+ self._has_remote_blocks = False
self._callback = None
self._items = {}
def stream_name(self):
raise NotImplementedError()
+
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature.
+
+        Returns True when this collection's own flag is set, or when
+        any item it contains reports remote blocks; otherwise False.
+        """
+        if self._has_remote_blocks:
+            return True
+        # Fall back to asking each contained item; stops at the first hit.
+        return any(self[name].has_remote_blocks() for name in self)
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        """Store the remote-blocks flag and pass it up to the parent.
+
+        Sets this collection's flag to `val`, then forwards the same
+        value to `self.parent` when that attribute is truthy.
+        """
+        self._has_remote_blocks = val
+        owner = self.parent
+        if not owner:
+            return
+        owner.set_has_remote_blocks(val)
+
@must_be_writable
@synchronized
def find_or_create(self, path, create_type):
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
path to a file in the collection
:mode:
- one of "r", "r+", "w", "w+", "a", "a+"
+ a string consisting of "r", "w", or "a", optionally followed
+ by "b" or "t", optionally followed by "+".
+ :"b":
+ binary mode: write() accepts bytes, read() returns bytes.
+ :"t":
+ text mode (default): write() accepts strings, read() returns strings.
:"r":
opens for reading
:"r+":
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
"""
- mode = mode.replace("b", "")
- if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
- raise errors.ArgumentError("Bad mode '%s'" % mode)
- create = (mode != "r")
- if create and not self.writable():
- raise IOError(errno.EROFS, "Collection is read only")
+ if not re.search(r'^[rwa][bt]?\+?$', mode):
+ raise errors.ArgumentError("Invalid mode {!r}".format(mode))
- if create:
- arvfile = self.find_or_create(path, FILE)
- else:
+ if mode[0] == 'r' and '+' not in mode:
+ fclass = ArvadosFileReader
arvfile = self.find(path)
+ elif not self.writable():
+ raise IOError(errno.EROFS, "Collection is read only")
+ else:
+ fclass = ArvadosFileWriter
+ arvfile = self.find_or_create(path, FILE)
if arvfile is None:
raise IOError(errno.ENOENT, "File not found", path)
if not isinstance(arvfile, ArvadosFile):
raise IOError(errno.EISDIR, "Is a directory", path)
- if mode[0] == "w":
+ if mode[0] == 'w':
arvfile.truncate(0)
- name = os.path.basename(path)
-
- if mode == "r":
- return ArvadosFileReader(arvfile, num_retries=self.num_retries)
- else:
- return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
if value == self._committed:
return
if value:
- for k,v in self._items.items():
+ for k,v in listitems(self._items):
v.set_committed(True)
self._committed = True
else:
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return iter(self._items.keys())
+ return iter(viewkeys(self._items))
@synchronized
def __getitem__(self, k):
@synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
- return self._items.values()
+ return listvalues(self._items)
@synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
- return self._items.items()
+ return listitems(self._items)
def exists(self, path):
"""Test if there is a file or collection at `path`."""
self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- item.remove(pathcomponents[1])
+ item.remove(pathcomponents[1], recursive=recursive)
def _clonefrom(self, source):
- for k,v in source.items():
+ for k,v in listitems(source):
self._items[k] = v.clone(self, k)
def clone(self):
self._items[target_name] = item
self.set_committed(False)
+ if not self._has_remote_blocks and source_obj.has_remote_blocks():
+ self.set_has_remote_blocks(True)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+ buf.append(self[dirname].manifest_text(
+ stream_name=os.path.join(stream_name, dirname),
+ strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
else:
return self._manifest_text
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks=None):
+        """Scan through the entire collection and ask Keep to copy remote blocks.
+
+        When accessing a remote collection, blocks will have a remote signature
+        (+R instead of +A). Collect these signatures and request Keep to copy the
+        blocks to the local cluster, returning local (+A) signatures.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories. If None (the default), a new empty cache
+          is created for this call.
+
+        Returns the cache as handed back by the last contained item (or the
+        cache passed in, when the collection has no items).
+
+        """
+        # A literal {} default is evaluated once at definition time and
+        # would be shared by every call that omits the argument, leaking
+        # mappings from one save into the next.  Build it per call instead.
+        if remote_blocks is None:
+            remote_blocks = {}
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
+        return remote_blocks
+
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""Generate list of add/modify/delete actions.
# then return API server's PDH response.
return self._portable_data_hash
else:
- stripped = self.portable_manifest_text()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ stripped = self.portable_manifest_text().encode()
+ return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
@synchronized
def subscribe(self, callback):
@synchronized
def flush(self):
"""Flush bufferblocks to Keep."""
- for e in self.values():
+ for e in listvalues(self):
e.flush()
apiconfig=None,
block_manager=None,
replication_desired=None,
- put_threads=None):
+ storage_classes_desired=None,
+ put_threads=None,
+ get_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
- One of Arvados collection UUID, block locator of
- a manifest, raw manifest text, or None (to create an empty collection).
+ An Arvados collection UUID, portable data hash, raw manifest
+ text, or (if creating an empty collection) None.
+
:parent:
the parent Collection, may be None.
configuration applies. If not None, this value will also be used
for determining the number of block copies being written.
+ :storage_classes_desired:
+ A list of storage class names where to upload the data. If None,
+ the keep client is expected to store the data into the cluster's
+ default storage class(es).
+
"""
+
+ if storage_classes_desired and type(storage_classes_desired) is not list:
+ raise errors.ArgumentError("storage_classes_desired must be list type.")
+
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
+
+ # Use the keep client from ThreadSafeApiCache
+ if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+ self._keep_client = self._api_client.keep
+
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
+ self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
self._manifest_locator = manifest_locator_or_text
elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
+ if not self._has_local_collection_uuid():
+ self._has_remote_blocks = True
elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
+ if '+R' in self._manifest_text:
+ self._has_remote_blocks = True
else:
raise errors.ArgumentError(
"Argument to CollectionReader is not a manifest or a collection UUID")
try:
self._populate()
- except (IOError, errors.SyntaxError) as e:
- raise errors.ArgumentError("Error processing manifest text: %s", e)
+ except errors.SyntaxError as e:
+ raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
+
+    def storage_classes_desired(self):
+        """Return the desired storage classes, or an empty list if unset.
+
+        Any falsy stored value (None or an empty list) yields a new
+        empty list; otherwise the stored value itself is returned.
+        """
+        classes = self._storage_classes_desired
+        if not classes:
+            return []
+        return classes
def root_collection(self):
return self
+    def get_properties(self):
+        """Return the "properties" entry of the remembered API response.
+
+        Returns an empty dict when no API response is stored or when
+        its "properties" entry is falsy.
+        """
+        response = self._api_response
+        if not response:
+            return {}
+        props = response["properties"]
+        return props if props else {}
+
+    def get_trash_at(self):
+        """Return the record's "trash_at" time as parsed by ciso8601, or None.
+
+        None is returned when no API response is stored, when its
+        "trash_at" entry is falsy, or when parsing raises ValueError.
+        """
+        response = self._api_response
+        if not (response and response["trash_at"]):
+            return None
+        try:
+            return ciso8601.parse_datetime(response["trash_at"])
+        except ValueError:
+            return None
+
def stream_name(self):
return "."
# our tokens.
return
else:
- self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+ self._remember_api_response(response)
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
@synchronized
def _my_api(self):
if self._api_client is None:
- self._api_client = ThreadSafeApiCache(self._config)
+ self._api_client = ThreadSafeApiCache(self._config, version='v1')
if self._keep_client is None:
self._keep_client = self._api_client.keep
return self._api_client
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired,
+ get_threads=self.get_threads,)
return self._block_manager
def _remember_api_response(self, response):
# it. If instantiation fails, we'll fall back to the except
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
- try:
- self._remember_api_response(self._my_api().collections().get(
- uuid=self._manifest_locator).execute(
- num_retries=self.num_retries))
- self._manifest_text = self._api_response['manifest_text']
- self._portable_data_hash = self._api_response['portable_data_hash']
- # If not overriden via kwargs, we should try to load the
- # replication_desired from the API server
- if self.replication_desired is None:
- self.replication_desired = self._api_response.get('replication_desired', None)
- return None
- except Exception as e:
- return e
-
- def _populate_from_keep(self):
- # Retrieve a manifest directly from Keep. This has a chance of
- # working if [a] the locator includes a permission signature
- # or [b] the Keep services are operating in world-readable
- # mode. Return an exception, or None if successful.
- try:
- self._manifest_text = self._my_keep().get(
- self._manifest_locator, num_retries=self.num_retries)
- except Exception as e:
- return e
+ self._remember_api_response(self._my_api().collections().get(
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries))
+ self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
+ # If not overriden via kwargs, we should try to load the
+ # replication_desired and storage_classes_desired from the API server
+ if self.replication_desired is None:
+ self.replication_desired = self._api_response.get('replication_desired', None)
+ if self._storage_classes_desired is None:
+ self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
def _populate(self):
- if self._manifest_locator is None and self._manifest_text is None:
- return
- error_via_api = None
- error_via_keep = None
- should_try_keep = ((self._manifest_text is None) and
- arvados.util.keep_locator_pattern.match(
- self._manifest_locator))
- if ((self._manifest_text is None) and
- arvados.util.signed_locator_pattern.match(self._manifest_locator)):
- error_via_keep = self._populate_from_keep()
- if self._manifest_text is None:
- error_via_api = self._populate_from_api_server()
- if error_via_api is not None and not should_try_keep:
- raise error_via_api
- if ((self._manifest_text is None) and
- not error_via_keep and
- should_try_keep):
- # Looks like a keep locator, and we didn't already try keep above
- error_via_keep = self._populate_from_keep()
if self._manifest_text is None:
- # Nothing worked!
- raise errors.NotFoundError(
- ("Failed to retrieve collection '{}' " +
- "from either API server ({}) or Keep ({})."
- ).format(
- self._manifest_locator,
- error_via_api,
- error_via_keep))
- # populate
+ if self._manifest_locator is None:
+ return
+ else:
+ self._populate_from_api_server()
self._baseline_manifest = self._manifest_text
self._import_manifest(self._manifest_text)
-
def _has_collection_uuid(self):
return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
+    def _has_local_collection_uuid(self):
+        """Tell whether the manifest locator is a collection UUID on this cluster.
+
+        Returns False when `_has_collection_uuid()` reports that the
+        locator is not a collection UUID.  Otherwise returns True only
+        if the part of the locator before the first "-" equals the
+        API server's `uuidPrefix`.
+        """
+        # The check must be *called*: a bare method reference is always
+        # truthy, which would let a None locator reach .split() below.
+        if not self._has_collection_uuid():
+            return False
+        local_prefix = self._my_api()._rootDesc['uuidPrefix']
+        return local_prefix == self._manifest_locator.split('-')[0]
+
def __enter__(self):
return self
@must_be_writable
@synchronized
@retry_method
- def save(self, merge=True, num_retries=None):
+ def save(self,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
+ merge=True,
+ num_retries=None,
+ preserve_version=False):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), and update the collection record. Returns
+ merge=True, the default), and update the collection record. Returns
the current manifest text.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
`save_new()`.
+ :properties:
+ Additional properties of collection. This value will replace any existing
+ properties of collection.
+
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:merge:
Update and merge remote changes before saving. Otherwise, any
remote changes will be ignored and overwritten.
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled, if not, setting this will
+ raise an exception.
+
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
+ body={}
+ if properties:
+ body["properties"] = properties
+ if self.storage_classes_desired():
+ body["storage_classes_desired"] = self.storage_classes_desired()
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
+
if not self.committed():
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+ elif not self._has_local_collection_uuid():
+ raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
self._my_block_manager().commit_all()
self.update()
text = self.manifest_text(strip=False)
+ body['manifest_text'] = text
+
self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
- body={'manifest_text': text}
- ).execute(
- num_retries=num_retries))
+ body=body
+ ).execute(num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
self._portable_data_hash = self._api_response["portable_data_hash"]
self.set_committed(True)
+ elif body:
+ self._remember_api_response(self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body=body
+ ).execute(num_retries=num_retries))
return self._manifest_text
def save_new(self, name=None,
create_collection_record=True,
owner_uuid=None,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
ensure_unique_name=False,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
is True (default), create a new collection record. After creating a
new collection record, this Collection object will be associated with
- the new record used by `save()`. Returns the current manifest text.
+ the new record used by `save()`. Returns the current manifest text.
:name:
The collection name.
the user, or project uuid that will own this collection.
If None, defaults to the current user.
+ :properties:
+ Additional properties of collection. This value will replace any existing
+ properties of collection.
+
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:ensure_unique_name:
If True, ask the API server to rename the collection
if it conflicts with a collection with the same name and owner. If
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled, if not, setting this will
+ raise an exception.
+
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
+
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
"replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
+ if properties:
+ body["properties"] = properties
+ if self.storage_classes_desired():
+ body["storage_classes_desired"] = self.storage_classes_desired()
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
return text
+ _token_re = re.compile(r'(\S+)(\s+|$)')
+ _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+ _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
+    def _unescape_manifest_path(self, path):
+        """Decode backslash-octal escapes in a manifest path token.
+
+        Each backslash followed by three octal digits (the first being
+        0-3) is replaced by the character with that code, so "\\040"
+        becomes a space.  All other text is returned unchanged.
+        """
+        def decode(match):
+            return chr(int(match.group(1), 8))
+
+        return re.sub('\\\\([0-3][0-7][0-7])', decode, path)
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
if state == STREAM_NAME:
# starting a new stream
- stream_name = tok.replace('\\040', ' ')
+ stream_name = self._unescape_manifest_path(tok)
blocks = []
segments = []
streamoffset = 0
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
- blocksize = long(block_locator.group(1))
+ blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
- pos = long(file_segment.group(1))
- size = long(file_segment.group(2))
- name = file_segment.group(3).replace('\\040', ' ')
- filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
- if isinstance(afile, ArvadosFile):
- afile.add_segment(blocks, pos, size)
+ pos = int(file_segment.group(1))
+ size = int(file_segment.group(2))
+ name = self._unescape_manifest_path(file_segment.group(3))
+ if name.split('/')[-1] == '.':
+ # placeholder for persisting an empty directory, not a real file
+ if len(name) > 2:
+ self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
- raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+ filepath = os.path.join(stream_name, name)
+ try:
+ afile = self.find_or_create(filepath, FILE)
+ except IOError as e:
+ if e.errno == errno.ENOTDIR:
+ raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
+ else:
+ raise e from None
+ if isinstance(afile, ArvadosFile):
+ afile.add_segment(blocks, pos, size)
+ else:
+ raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
else:
# error!
raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
self.name = newname
self.lock = self.parent.root_collection().lock
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Return manifest text for this subcollection, keeping empty directories.
+
+        A subcollection with no items is written as one stream line
+        holding a zero-length file named "\\056" (an escaped ".") on
+        the empty block locator.  Otherwise the superclass
+        implementation produces the text.
+        """
+        if self._items:
+            return super(Subcollection, self)._get_manifest_text(
+                stream_name, strip, normalize, only_committed)
+        # Nothing inside: emit the placeholder entry for the directory.
+        placeholder = "%s %s 0:0:\\056\n" % (
+            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+        return placeholder
+
class CollectionReader(Collection):
"""A read-only collection object.
- Initialize from an api collection record locator, a portable data hash of a
- manifest, or raw manifest text. See `Collection` constructor for detailed
- options.
+ Initialize from a collection UUID or portable data hash, or raw
+ manifest text. See `Collection` constructor for detailed options.
"""
def __init__(self, manifest_locator_or_text, *args, **kwargs):