+from __future__ import absolute_import
+from future.utils import listitems, listvalues, viewkeys
+from builtins import str
+from past.builtins import basestring
+from builtins import object
import functools
import logging
import os
from stat import *
from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
from arvados.retry import retry_method
_logger = logging.getLogger('arvados.collection')
if fields:
clean_fields = fields[:1] + [
(re.sub(r'\+[^\d][^\+]*', '', x)
- if re.match(util.keep_locator_pattern, x)
+ if re.match(arvados.util.keep_locator_pattern, x)
else x)
for x in fields[1:]]
clean += [' '.join(clean_fields), "\n"]
def _work_trees(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = util.listdir_recursive(
+ d = arvados.util.listdir_recursive(
path, max_depth = (None if max_manifest_depth == 0 else 0))
if d:
self._queue_dirents(stream_name, d)
self.do_queued_work()
def write(self, newdata):
- if hasattr(newdata, '__iter__'):
+ if isinstance(newdata, bytes):
+ pass
+ elif isinstance(newdata, str):
+ newdata = newdata.encode()
+ elif hasattr(newdata, '__iter__'):
for s in newdata:
self.write(s)
return
return self._last_open
def flush_data(self):
- data_buffer = ''.join(self._data_buffer)
+ data_buffer = b''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
self._my_keep().put(
sending manifest_text() to the API server's "create
collection" endpoint.
"""
- return self._my_keep().put(self.manifest_text(), copies=self.replication)
+ return self._my_keep().put(self.manifest_text().encode(),
+ copies=self.replication)
def portable_data_hash(self):
+ """Return the portable data hash of this writer's stripped manifest.
+
+ The result is the MD5 hex digest of the encoded stripped manifest,
+ followed by '+' and the encoded length in bytes.
+ """
- stripped = self.stripped_manifest()
+ # hashlib.md5() needs bytes on Python 3, hence the .encode().
+ stripped = self.stripped_manifest().encode()
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
def manifest_text(self):
return writer
def check_dependencies(self):
- for path, orig_stat in self._dependencies.items():
+ for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
raise errors.StaleWriterStateError("{} not file".format(path))
try:
else:
item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
return item
else:
# create new collection
item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
def find(self, path):
"""Recursively search the specified file path.
- May return either a Collection or ArvadosFile. Return None if not
+ May return either a Collection or ArvadosFile. Return None if not
found.
+ If path is invalid (ex: starts with '/'), an IOError exception will be
+ raised.
"""
if not path:
raise errors.ArgumentError("Parameter 'path' is empty.")
pathcomponents = path.split("/", 1)
+ if pathcomponents[0] == '':
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
item = self._items.get(pathcomponents[0])
- if len(pathcomponents) == 1:
+ if item is None:
+ return None
+ elif len(pathcomponents) == 1:
return item
else:
if isinstance(item, RichCollectionBase):
:path:
path to a file in the collection
:mode:
- one of "r", "r+", "w", "w+", "a", "a+"
+ a string consisting of "r", "w", or "a", optionally followed
+ by "b" or "t", optionally followed by "+".
+ :"b":
+ binary mode: write() accepts bytes, read() returns bytes.
+ :"t":
+ text mode (default): write() accepts strings, read() returns strings.
:"r":
opens for reading
:"r+":
the end of the file. Writing does not affect the file pointer for
reading.
"""
- mode = mode.replace("b", "")
- if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
- raise errors.ArgumentError("Bad mode '%s'" % mode)
- create = (mode != "r")
- if create and not self.writable():
- raise IOError(errno.EROFS, "Collection is read only")
+ if not re.search(r'^[rwa][bt]?\+?$', mode):
+ raise errors.ArgumentError("Invalid mode {!r}".format(mode))
- if create:
- arvfile = self.find_or_create(path, FILE)
- else:
+ if mode[0] == 'r' and '+' not in mode:
+ fclass = ArvadosFileReader
arvfile = self.find(path)
+ elif not self.writable():
+ raise IOError(errno.EROFS, "Collection is read only")
+ else:
+ fclass = ArvadosFileWriter
+ arvfile = self.find_or_create(path, FILE)
if arvfile is None:
raise IOError(errno.ENOENT, "File not found", path)
if not isinstance(arvfile, ArvadosFile):
raise IOError(errno.EISDIR, "Is a directory", path)
- if mode[0] == "w":
+ if mode[0] == 'w':
arvfile.truncate(0)
- name = os.path.basename(path)
-
- if mode == "r":
- return ArvadosFileReader(arvfile, num_retries=self.num_retries)
- else:
- return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+ return fclass(arvfile, mode=mode, num_retries=self.num_retries)
def modified(self):
"""Determine if the collection has been modified since last commited."""
@synchronized
def committed(self):
"""Determine if the collection has been committed to the API server."""
-
- if self._committed is False:
- return False
- for v in self._items.values():
- if v.committed() is False:
- return False
- return True
+ # No recursive scan of children: set_committed(False) on a collection
+ # also clears the flag on its parent, so this local flag reflects the
+ # whole subtree. Assumes ArvadosFile children propagate to their parent
+ # the same way -- TODO confirm in arvfile.py.
+ return self._committed
@synchronized
- def set_committed(self):
- """Recursively set committed flag to True."""
- self._committed = True
- for k,v in self._items.items():
- v.set_committed()
+ def set_committed(self, value=True):
+ """Recursively set committed flag.
+
+ If value is True, set committed to be True for this and all children.
+
+ If value is False, set committed to be False for this and all parents.
+ """
+ # Stop early when the flag already has the requested value. This is only
+ # correct under the invariant this method maintains: a committed node has
+ # only committed descendants, and an uncommitted node has only
+ # uncommitted ancestors.
+ if value == self._committed:
+ return
+ if value:
+ # Commit children first (iterating a list snapshot), then this node.
+ for k,v in listitems(self._items):
+ v.set_committed(True)
+ self._committed = True
+ else:
+ # Uncommitting walks upward so every ancestor reports uncommitted.
+ self._committed = False
+ if self.parent is not None:
+ self.parent.set_committed(False)
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return iter(self._items.keys())
+ # Iterate over a snapshot of the names, as the Python 2 keys() list did.
+ # The iterator is consumed after this call (and whatever lock
+ # @synchronized holds) has returned. Iterating the live dict instead would
+ # raise RuntimeError if items were added or removed meanwhile, e.g. by
+ # calling remove() inside a `for name in collection` loop.
+ return iter(list(self._items))
@synchronized
def __getitem__(self, k):
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
+ # Raises KeyError if `p` is not a direct child of this collection.
del self._items[p]
- self._committed = False
+ # Marks this collection, and through set_committed(False) its ancestors,
+ # as uncommitted before notifying subscribers of the deletion.
+ self.set_committed(False)
self.notify(DEL, self, p, None)
@synchronized
def keys(self):
"""Get a list of names of files and collections directly contained in this collection."""
- return self._items.keys()
+ # Return a list copy, as the docstring promises and as values() and
+ # items() do via listvalues()/listitems(). A live view of the internal
+ # dict would keep changing after the caller has left the lock.
+ return list(self._items)
@synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
- return self._items.values()
+ # listvalues() builds a new list, so the caller gets a snapshot rather
+ # than a live view of the internal dict.
+ return listvalues(self._items)
@synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
- return self._items.items()
+ # listitems() builds a new list, so the caller gets a snapshot rather
+ # than a live view of the internal dict.
+ return listitems(self._items)
def exists(self, path):
"""Test if there is a file or collection at `path`."""
raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
item.remove(pathcomponents[1])
def _clonefrom(self, source):
+ """Add a clone of every item in `source` to this collection.
+
+ Each item is cloned with ``v.clone(self, k)``, i.e. it is given this
+ collection and the item's existing name. Items are inserted directly
+ into self._items; neither set_committed() nor notify() is called here.
+ """
- for k,v in source.items():
+ for k,v in listitems(source):
self._items[k] = v.clone(self, k)
def clone(self):
item = source_obj.clone(self, target_name)
self._items[target_name] = item
- self._committed = False
+ self.set_committed(False)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
if target_dir is None:
raise IOError(errno.ENOENT, "Target directory not found", target_name)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
target_name = sourcecomponents[-1]
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ def manifest_text(self, stream_name=".", strip=False, normalize=False,
+ only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
This method will flush outstanding blocks to Keep. By default, it will
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, don't commit pending blocks.
+
"""
- self._my_block_manager().commit_all()
- return self._get_manifest_text(stream_name, strip, normalize)
+ if not only_committed:
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize,
+ only_committed=only_committed)
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
"""
if changes:
- self._committed = False
+ self.set_committed(False)
for change in changes:
event_type = change[0]
path = change[1]
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.portable_manifest_text()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ if (self._manifest_locator and self.committed() and
+ self._portable_data_hash is not None):
+ # The collection is saved and unmodified, so reuse the PDH the API
+ # server reported.
+ return self._portable_data_hash
+ else:
+ # _portable_data_hash is only filled in from an API server response.
+ # A manifest fetched straight from Keep leaves it None, so compute the
+ # hash locally: MD5 of the encoded portable manifest plus its length.
+ stripped = self.portable_manifest_text().encode()
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@synchronized
def subscribe(self, callback):
@synchronized
def flush(self):
"""Flush bufferblocks to Keep."""
+ # Flush every direct child (file or subcollection), iterating a list
+ # snapshot. Nested levels are presumably handled by each child's own
+ # flush() -- TODO confirm for ArvadosFile.
- for e in self.values():
+ for e in listvalues(self):
e.flush()
parent=None,
apiconfig=None,
block_manager=None,
- replication_desired=None):
+ replication_desired=None,
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self.put_threads = put_threads
if apiconfig:
self._config = apiconfig
self.num_retries = num_retries if num_retries is not None else 0
self._manifest_locator = None
self._manifest_text = None
+ self._portable_data_hash = None
self._api_response = None
self._past_versions = set()
self.events = None
if manifest_locator_or_text:
- if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+ if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.manifest_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
else:
raise errors.ArgumentError(
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
return self._block_manager
def _remember_api_response(self, response):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
# If not overriden via kwargs, we should try to load the
# replication_desired from the API server
if self.replication_desired is None:
# mode. Return an exception, or None if successful.
try:
self._manifest_text = self._my_keep().get(
- self._manifest_locator, num_retries=self.num_retries)
+ self._manifest_locator, num_retries=self.num_retries).decode()
except Exception as e:
return e
error_via_api = None
error_via_keep = None
should_try_keep = ((self._manifest_text is None) and
- util.keep_locator_pattern.match(
+ arvados.util.keep_locator_pattern.match(
self._manifest_locator))
if ((self._manifest_text is None) and
- util.signed_locator_pattern.match(self._manifest_locator)):
+ arvados.util.signed_locator_pattern.match(self._manifest_locator)):
error_via_keep = self._populate_from_keep()
if self._manifest_text is None:
error_via_api = self._populate_from_api_server()
def _has_collection_uuid(self):
+ """Return a truthy value if the manifest locator is a collection UUID.
+
+ Returns False when no locator is set; otherwise returns the result of
+ re.match() against arvados.util.collection_uuid_pattern (a match
+ object or None).
+ """
- return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+ return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
def __enter__(self):
+ """Enter a ``with`` block; the collection itself is the bound value."""
return self
).execute(
num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
- self.set_committed()
+ self._portable_data_hash = self._api_response["portable_data_hash"]
+ self.set_committed(True)
return self._manifest_text
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
+ self._portable_data_hash = self._api_response["portable_data_hash"]
self._manifest_text = text
- self.set_committed()
+ self.set_committed(True)
return text
stream_name = tok.replace('\\040', ' ')
blocks = []
segments = []
- streamoffset = 0L
+ streamoffset = 0
state = BLOCKS
self.find_or_create(stream_name, COLLECTION)
continue
if state == BLOCKS:
block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
if block_locator:
- blocksize = long(block_locator.group(1))
+ blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
if state == SEGMENTS:
file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
if file_segment:
- pos = long(file_segment.group(1))
- size = long(file_segment.group(2))
+ pos = int(file_segment.group(1))
+ size = int(file_segment.group(2))
name = file_segment.group(3).replace('\\040', ' ')
filepath = os.path.join(stream_name, name)
afile = self.find_or_create(filepath, FILE)
stream_name = None
state = STREAM_NAME
- self.set_committed()
+ self.set_committed(True)
@synchronized
def notify(self, event, collection, name, item):
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush()
self.parent.remove(self.name, recursive=True)
self.parent = newparent