+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from future.utils import listitems, listvalues, viewkeys
+from builtins import str
+from past.builtins import basestring
+from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
+from .keep import KeepLocator, KeepClient
from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
from arvados.retry import retry_method
_logger = logging.getLogger('arvados.collection')
+
+# Python 2/3 compatibility shim: on Python 3 use io.TextIOWrapper as-is; on
+# Python 2 wrap write() so that callers may pass byte strings (str), which are
+# coerced to unicode before being handed to io.TextIOWrapper.
+if sys.version_info >= (3, 0):
+    TextIOWrapper = io.TextIOWrapper
+else:
+    class TextIOWrapper(io.TextIOWrapper):
+        """To maintain backward compatibility, cast str to unicode in
+        write('foo').
+
+        """
+        def write(self, data):
+            # io.TextIOWrapper on py2 only accepts unicode; coerce str/bytes.
+            if isinstance(data, basestring):
+                data = unicode(data)
+            return super(TextIOWrapper, self).write(data)
+
+
class CollectionBase(object):
+ """Abstract base class for Collection classes."""
+
def __enter__(self):
return self
if fields:
clean_fields = fields[:1] + [
(re.sub(r'\+[^\d][^\+]*', '', x)
- if re.match(util.keep_locator_pattern, x)
+ if re.match(arvados.util.keep_locator_pattern, x)
else x)
for x in fields[1:]]
clean += [' '.join(clean_fields), "\n"]
class CollectionWriter(CollectionBase):
+ """Deprecated, use Collection instead."""
+
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
def _work_trees(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = util.listdir_recursive(
+ d = arvados.util.listdir_recursive(
path, max_depth = (None if max_manifest_depth == 0 else 0))
if d:
self._queue_dirents(stream_name, d)
self.do_queued_work()
def write(self, newdata):
- if hasattr(newdata, '__iter__'):
+ if isinstance(newdata, bytes):
+ pass
+ elif isinstance(newdata, str):
+ newdata = newdata.encode()
+ elif hasattr(newdata, '__iter__'):
for s in newdata:
self.write(s)
return
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
return self._last_open
def flush_data(self):
- data_buffer = ''.join(self._data_buffer)
+ data_buffer = b''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
self._my_keep().put(
sending manifest_text() to the API server's "create
collection" endpoint.
"""
- return self._my_keep().put(self.manifest_text(), copies=self.replication)
+ return self._my_keep().put(self.manifest_text().encode(),
+ copies=self.replication)
def portable_data_hash(self):
- stripped = self.stripped_manifest()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ stripped = self.stripped_manifest().encode()
+ return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
def manifest_text(self):
self.finish_current_stream()
ret += locators
return ret
+    def save_new(self, name=None):
+        """Create a new collection record from this writer's manifest.
+
+        Sends the current manifest_text() to the API server's collections
+        create endpoint (with ensure_unique_name=True, so the server renames
+        on a name conflict) and returns the API response dict.
+
+        :name:
+          Optional display name for the new collection record.
+        """
+        return self._api_client.collections().create(
+            ensure_unique_name=True,
+            body={
+                'name': name,
+                'manifest_text': self.manifest_text(),
+            }).execute(num_retries=self.num_retries)
+
class ResumableCollectionWriter(CollectionWriter):
+ """Deprecated, use Collection instead."""
+
STATE_PROPS = ['_current_stream_files', '_current_stream_length',
'_current_stream_locators', '_current_stream_name',
'_current_file_name', '_current_file_pos', '_close_file',
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
- for path, orig_stat in self._dependencies.items():
+ for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
def __init__(self, parent=None):
self.parent = parent
self._committed = False
+ self._has_remote_blocks = False
self._callback = None
self._items = {}
def stream_name(self):
raise NotImplementedError()
+
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature."""
+
+        # Fast path: the cached flag is set as soon as any remote block is
+        # known; otherwise fall back to asking every child item.
+        if self._has_remote_blocks:
+            return True
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        """Set the remote-blocks flag, propagating it up to all ancestors.
+
+        Parents are updated too so that a root-level has_remote_blocks()
+        check can short-circuit without re-scanning the whole tree.
+        """
+        self._has_remote_blocks = val
+        if self.parent:
+            self.parent.set_has_remote_blocks(val)
+
@must_be_writable
@synchronized
def find_or_create(self, path, create_type):
else:
item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
return item
else:
# create new collection
item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
def find(self, path):
"""Recursively search the specified file path.
- May return either a Collection or ArvadosFile. Return None if not
+ May return either a Collection or ArvadosFile. Return None if not
found.
+ If path is invalid (ex: starts with '/'), an IOError exception will be
+ raised.
"""
if not path:
raise errors.ArgumentError("Parameter 'path' is empty.")
pathcomponents = path.split("/", 1)
+ if pathcomponents[0] == '':
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
item = self._items.get(pathcomponents[0])
- if len(pathcomponents) == 1:
+ if item is None:
+ return None
+ elif len(pathcomponents) == 1:
return item
else:
if isinstance(item, RichCollectionBase):
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
path to a file in the collection
:mode:
- one of "r", "r+", "w", "w+", "a", "a+"
+ a string consisting of "r", "w", or "a", optionally followed
+ by "b" or "t", optionally followed by "+".
+ :"b":
+ binary mode: write() accepts bytes, read() returns bytes.
+ :"t":
+ text mode (default): write() accepts strings, read() returns strings.
:"r":
opens for reading
:"r+":
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
"""
- mode = mode.replace("b", "")
- if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
- raise errors.ArgumentError("Bad mode '%s'" % mode)
- create = (mode != "r")
- if create and not self.writable():
- raise IOError(errno.EROFS, "Collection is read only")
+ if not re.search(r'^[rwa][bt]?\+?$', mode):
+ raise errors.ArgumentError("Invalid mode {!r}".format(mode))
- if create:
- arvfile = self.find_or_create(path, FILE)
- else:
+ if mode[0] == 'r' and '+' not in mode:
+ fclass = ArvadosFileReader
arvfile = self.find(path)
+ elif not self.writable():
+ raise IOError(errno.EROFS, "Collection is read only")
+ else:
+ fclass = ArvadosFileWriter
+ arvfile = self.find_or_create(path, FILE)
if arvfile is None:
raise IOError(errno.ENOENT, "File not found", path)
if not isinstance(arvfile, ArvadosFile):
raise IOError(errno.EISDIR, "Is a directory", path)
- if mode[0] == "w":
+ if mode[0] == 'w':
arvfile.truncate(0)
- name = os.path.basename(path)
-
- if mode == "r":
- return ArvadosFileReader(arvfile, num_retries=self.num_retries)
- else:
- return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
@synchronized
def committed(self):
"""Determine if the collection has been committed to the API server."""
-
- if self._committed is False:
- return False
- for v in self._items.values():
- if v.committed() is False:
- return False
- return True
+ return self._committed
@synchronized
- def set_committed(self):
- """Recursively set committed flag to True."""
- self._committed = True
- for k,v in self._items.items():
- v.set_committed()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        # No-op when the flag already has the requested value; this also
+        # stops the upward/downward recursion from revisiting settled nodes.
+        if value == self._committed:
+            return
+        if value:
+            # Committing: everything below this node is now in sync.
+            for k,v in listitems(self._items):
+                v.set_committed(True)
+            self._committed = True
+        else:
+            # Dirtying: every ancestor now contains uncommitted changes.
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return iter(self._items.keys())
+ return iter(viewkeys(self._items))
@synchronized
def __getitem__(self, k):
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
del self._items[p]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, p, None)
@synchronized
@synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
- return self._items.values()
+ return listvalues(self._items)
@synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
- return self._items.items()
+ return listitems(self._items)
def exists(self, path):
"""Test if there is a file or collection at `path`."""
raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
item.remove(pathcomponents[1])
def _clonefrom(self, source):
- for k,v in source.items():
+ for k,v in listitems(source):
self._items[k] = v.clone(self, k)
def clone(self):
item = source_obj.clone(self, target_name)
self._items[target_name] = item
- self._committed = False
+ self.set_committed(False)
+ if not self._has_remote_blocks and source_obj.has_remote_blocks():
+ self.set_has_remote_blocks(True)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
if target_dir is None:
raise IOError(errno.ENOENT, "Target directory not found", target_name)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
target_name = sourcecomponents[-1]
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ def manifest_text(self, stream_name=".", strip=False, normalize=False,
+ only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
This method will flush outstanding blocks to Keep. By default, it will
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, don't commit pending blocks.
+
"""
- self._my_block_manager().commit_all()
- return self._get_manifest_text(stream_name, strip, normalize)
+ if not only_committed:
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize,
+ only_committed=only_committed)
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ buf.append(self[dirname].manifest_text(
+ stream_name=os.path.join(stream_name, dirname),
+ strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
else:
return self._manifest_text
+ @synchronized
+ def _copy_remote_blocks(self, remote_blocks={}):
+ """Scan through the entire collection and ask Keep to copy remote blocks.
+
+ When accessing a remote collection, blocks will have a remote signature
+ (+R instead of +A). Collect these signatures and request Keep to copy the
+ blocks to the local cluster, returning local (+A) signatures.
+
+ :remote_blocks:
+ Shared cache of remote to local block mappings. This is used to avoid
+ doing extra work when blocks are shared by more than one file in
+ different subdirectories.
+
+ """
+ for item in self:
+ remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
+ return remote_blocks
+
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""Generate list of add/modify/delete actions.
"""
if changes:
- self._committed = False
+ self.set_committed(False)
for change in changes:
event_type = change[0]
path = change[1]
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.portable_manifest_text()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ if self._manifest_locator and self.committed():
+ # If the collection is already saved on the API server, and it's committed
+ # then return API server's PDH response.
+ return self._portable_data_hash
+ else:
+ stripped = self.portable_manifest_text().encode()
+ return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
@synchronized
def subscribe(self, callback):
@synchronized
def flush(self):
"""Flush bufferblocks to Keep."""
- for e in self.values():
+ for e in listvalues(self):
e.flush()
parent=None,
apiconfig=None,
block_manager=None,
- replication_desired=None):
+ replication_desired=None,
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
- One of Arvados collection UUID, block locator of
- a manifest, raw manifest text, or None (to create an empty collection).
+ An Arvados collection UUID, portable data hash, raw manifest
+ text, or (if creating an empty collection) None.
+
:parent:
the parent Collection, may be None.
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self.put_threads = put_threads
if apiconfig:
self._config = apiconfig
self.num_retries = num_retries if num_retries is not None else 0
self._manifest_locator = None
self._manifest_text = None
+ self._portable_data_hash = None
self._api_response = None
self._past_versions = set()
self.events = None
if manifest_locator_or_text:
- if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+ if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.manifest_pattern, manifest_locator_or_text):
+ if not self._has_local_collection_uuid():
+ self._has_remote_blocks = True
+ elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
+ if '+R' in self._manifest_text:
+ self._has_remote_blocks = True
else:
raise errors.ArgumentError(
"Argument to CollectionReader is not a manifest or a collection UUID")
def root_collection(self):
return self
+ def get_properties(self):
+ if self._api_response and self._api_response["properties"]:
+ return self._api_response["properties"]
+ else:
+ return {}
+
+ def get_trash_at(self):
+ if self._api_response and self._api_response["trash_at"]:
+ try:
+ return ciso8601.parse_datetime(self._api_response["trash_at"])
+ except ValueError:
+ return None
+ else:
+ return None
+
def stream_name(self):
return "."
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
return self._block_manager
def _remember_api_response(self, response):
# it. If instantiation fails, we'll fall back to the except
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
- try:
- self._remember_api_response(self._my_api().collections().get(
- uuid=self._manifest_locator).execute(
- num_retries=self.num_retries))
- self._manifest_text = self._api_response['manifest_text']
- # If not overriden via kwargs, we should try to load the
- # replication_desired from the API server
- if self.replication_desired is None:
- self.replication_desired = self._api_response.get('replication_desired', None)
- return None
- except Exception as e:
- return e
-
- def _populate_from_keep(self):
- # Retrieve a manifest directly from Keep. This has a chance of
- # working if [a] the locator includes a permission signature
- # or [b] the Keep services are operating in world-readable
- # mode. Return an exception, or None if successful.
- try:
- self._manifest_text = self._my_keep().get(
- self._manifest_locator, num_retries=self.num_retries)
- except Exception as e:
- return e
+ self._remember_api_response(self._my_api().collections().get(
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries))
+ self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
+ # If not overriden via kwargs, we should try to load the
+ # replication_desired from the API server
+ if self.replication_desired is None:
+ self.replication_desired = self._api_response.get('replication_desired', None)
def _populate(self):
- if self._manifest_locator is None and self._manifest_text is None:
- return
- error_via_api = None
- error_via_keep = None
- should_try_keep = ((self._manifest_text is None) and
- util.keep_locator_pattern.match(
- self._manifest_locator))
- if ((self._manifest_text is None) and
- util.signed_locator_pattern.match(self._manifest_locator)):
- error_via_keep = self._populate_from_keep()
- if self._manifest_text is None:
- error_via_api = self._populate_from_api_server()
- if error_via_api is not None and not should_try_keep:
- raise error_via_api
- if ((self._manifest_text is None) and
- not error_via_keep and
- should_try_keep):
- # Looks like a keep locator, and we didn't already try keep above
- error_via_keep = self._populate_from_keep()
if self._manifest_text is None:
- # Nothing worked!
- raise errors.NotFoundError(
- ("Failed to retrieve collection '{}' " +
- "from either API server ({}) or Keep ({})."
- ).format(
- self._manifest_locator,
- error_via_api,
- error_via_keep))
- # populate
+ if self._manifest_locator is None:
+ return
+ else:
+ self._populate_from_api_server()
self._baseline_manifest = self._manifest_text
self._import_manifest(self._manifest_text)
-
def _has_collection_uuid(self):
- return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+ return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
+
+ def _has_local_collection_uuid(self):
+ return self._has_collection_uuid and \
+ self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
def __enter__(self):
return self
@must_be_writable
@synchronized
@retry_method
- def save(self, merge=True, num_retries=None):
+ def save(self,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
+ merge=True,
+ num_retries=None):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), and update the collection record. Returns
+ merge=True, the default), and update the collection record. Returns
the current manifest text.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
`save_new()`.
+ :properties:
+ Additional properties of collection. This value will replace any existing
+ properties of collection.
+
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:merge:
Update and merge remote changes before saving. Otherwise, any
remote changes will be ignored and overwritten.
Retry count on API calls (if None, use the collection default)
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
+ body={}
+ if properties:
+ body["properties"] = properties
+ if storage_classes:
+ body["storage_classes_desired"] = storage_classes
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
+
if not self.committed():
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+ elif not self._has_local_collection_uuid():
+ raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
self._my_block_manager().commit_all()
self.update()
text = self.manifest_text(strip=False)
+ body['manifest_text'] = text
+
self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
- body={'manifest_text': text}
- ).execute(
- num_retries=num_retries))
+ body=body
+ ).execute(num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
- self.set_committed()
+ self._portable_data_hash = self._api_response["portable_data_hash"]
+ self.set_committed(True)
+ elif body:
+ self._remember_api_response(self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body=body
+ ).execute(num_retries=num_retries))
return self._manifest_text
def save_new(self, name=None,
create_collection_record=True,
owner_uuid=None,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
ensure_unique_name=False,
num_retries=None):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
is True (default), create a new collection record. After creating a
new collection record, this Collection object will be associated with
- the new record used by `save()`. Returns the current manifest text.
+ the new record used by `save()`. Returns the current manifest text.
:name:
The collection name.
the user, or project uuid that will own this collection.
If None, defaults to the current user.
+ :properties:
+ Additional properties of collection. This value will replace any existing
+ properties of collection.
+
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:ensure_unique_name:
If True, ask the API server to rename the collection
if it conflicts with a collection with the same name and owner. If
Retry count on API calls (if None, use the collection default)
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
"replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
+ if properties:
+ body["properties"] = properties
+ if storage_classes:
+ body["storage_classes_desired"] = storage_classes
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
+ self._portable_data_hash = self._api_response["portable_data_hash"]
self._manifest_text = text
- self.set_committed()
+ self.set_committed(True)
return text
+    # Manifest grammar pieces, compiled once at class level:
+    #   _token_re   — one whitespace-delimited token plus its trailing separator
+    #   _block_re   — a block locator: 32 hex digits, +size, optional +hints
+    #   _segment_re — a file segment: position:size:filename
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
+    def _unescape_manifest_path(self, path):
+        # Decode manifest \NNN octal escapes (e.g. \040 -> space) back to
+        # their literal characters. The pattern matches a literal backslash
+        # followed by three octal digits.
+        return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
if state == STREAM_NAME:
# starting a new stream
- stream_name = tok.replace('\\040', ' ')
+ stream_name = self._unescape_manifest_path(tok)
blocks = []
segments = []
- streamoffset = 0L
+ streamoffset = 0
state = BLOCKS
self.find_or_create(stream_name, COLLECTION)
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
- blocksize = long(block_locator.group(1))
+ blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
- pos = long(file_segment.group(1))
- size = long(file_segment.group(2))
- name = file_segment.group(3).replace('\\040', ' ')
- filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
- if isinstance(afile, ArvadosFile):
- afile.add_segment(blocks, pos, size)
+ pos = int(file_segment.group(1))
+ size = int(file_segment.group(2))
+ name = self._unescape_manifest_path(file_segment.group(3))
+ if name.split('/')[-1] == '.':
+ # placeholder for persisting an empty directory, not a real file
+ if len(name) > 2:
+ self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
- raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+ filepath = os.path.join(stream_name, name)
+ afile = self.find_or_create(filepath, FILE)
+ if isinstance(afile, ArvadosFile):
+ afile.add_segment(blocks, pos, size)
+ else:
+ raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
else:
# error!
raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
stream_name = None
state = STREAM_NAME
- self.set_committed()
+ self.set_committed(True)
@synchronized
def notify(self, event, collection, name, item):
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush()
self.parent.remove(self.name, recursive=True)
self.parent = newparent
self.name = newname
self.lock = self.parent.root_collection().lock
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Encode empty directories by using an \056-named (".") empty file"""
+        # An empty subcollection has no real files to list, so emit a
+        # placeholder zero-length segment named "\056" (octal escape for ".")
+        # against the well-known empty block, which _import_manifest
+        # recognizes and turns back into an empty directory.
+        if len(self._items) == 0:
+            return "%s %s 0:0:\\056\n" % (
+                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+        # Non-empty: defer to the generic RichCollectionBase rendering.
+        return super(Subcollection, self)._get_manifest_text(stream_name,
+                                                             strip, normalize,
+                                                             only_committed)
+
class CollectionReader(Collection):
"""A read-only collection object.
- Initialize from an api collection record locator, a portable data hash of a
- manifest, or raw manifest text. See `Collection` constructor for detailed
- options.
+ Initialize from a collection UUID or portable data hash, or raw
+ manifest text. See `Collection` constructor for detailed options.
"""
def __init__(self, manifest_locator_or_text, *args, **kwargs):