+from __future__ import absolute_import
+from builtins import str
+from past.builtins import basestring
+from builtins import object
import functools
import logging
import os
from stat import *
from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
from arvados.retry import retry_method
_logger = logging.getLogger('arvados.collection')
if fields:
clean_fields = fields[:1] + [
(re.sub(r'\+[^\d][^\+]*', '', x)
- if re.match(util.keep_locator_pattern, x)
+ if re.match(arvados.util.keep_locator_pattern, x)
else x)
for x in fields[1:]]
clean += [' '.join(clean_fields), "\n"]
def _work_trees(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = util.listdir_recursive(
+ d = arvados.util.listdir_recursive(
path, max_depth = (None if max_manifest_depth == 0 else 0))
if d:
self._queue_dirents(stream_name, d)
return writer
def check_dependencies(self):
- for path, orig_stat in self._dependencies.items():
+ for path, orig_stat in list(self._dependencies.items()):
if not S_ISREG(orig_stat[ST_MODE]):
raise errors.StaleWriterStateError("{} not file".format(path))
try:
else:
item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
return item
else:
# create new collection
item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
@synchronized
def committed(self):
"""Determine if the collection has been committed to the API server."""
-
- if self._committed is False:
- return False
- for v in self._items.values():
- if v.committed() is False:
- return False
- return True
+ return self._committed
@synchronized
- def set_committed(self):
- """Recursively set committed flag to True."""
- self._committed = True
- for k,v in self._items.items():
- v.set_committed()
+ def set_committed(self, value=True):
+ """Recursively set committed flag.
+
+ If value is True, set committed to be True for this and all children.
+
+ If value is False, set committed to be False for this and all parents.
+ """
+ if value == self._committed:
+ return
+ if value:
+ for k,v in list(self._items.items()):
+ v.set_committed(True)
+ self._committed = True
+ else:
+ self._committed = False
+ if self.parent is not None:
+ self.parent.set_committed(False)
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return iter(self._items.keys())
+ return iter(list(self._items.keys()))
@synchronized
def __getitem__(self, k):
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
del self._items[p]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, p, None)
@synchronized
def keys(self):
"""Get a list of names of files and collections directly contained in this collection."""
- return self._items.keys()
+ return list(self._items.keys())
@synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
- return self._items.values()
+ return list(self._items.values())
@synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
- return self._items.items()
+ return list(self._items.items())
def exists(self, path):
"""Test if there is a file or collection at `path`."""
raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
item.remove(pathcomponents[1])
def _clonefrom(self, source):
- for k,v in source.items():
+ for k,v in list(source.items()):
self._items[k] = v.clone(self, k)
def clone(self):
item = source_obj.clone(self, target_name)
self._items[target_name] = item
- self._committed = False
+ self.set_committed(False)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ def manifest_text(self, stream_name=".", strip=False, normalize=False,
+ only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
This method will flush outstanding blocks to Keep. By default, it will
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, don't commit pending blocks.
+
"""
- self._my_block_manager().commit_all()
- return self._get_manifest_text(stream_name, strip, normalize)
+ if not only_committed:
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize,
+ only_committed=only_committed)
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
"""
if changes:
- self._committed = False
+ self.set_committed(False)
for change in changes:
event_type = change[0]
path = change[1]
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.portable_manifest_text()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ if self._manifest_locator and self.committed():
+ # If the collection is already saved on the API server, and it's committed
+ # then return API server's PDH response.
+ return self._portable_data_hash
+ else:
+ stripped = self.portable_manifest_text()
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@synchronized
def subscribe(self, callback):
@synchronized
def flush(self):
"""Flush bufferblocks to Keep."""
- for e in self.values():
+ for e in list(self.values()):
e.flush()
parent=None,
apiconfig=None,
block_manager=None,
- replication_desired=None):
+ replication_desired=None,
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self.put_threads = put_threads
if apiconfig:
self._config = apiconfig
self.num_retries = num_retries if num_retries is not None else 0
self._manifest_locator = None
self._manifest_text = None
+ self._portable_data_hash = None
self._api_response = None
self._past_versions = set()
self.events = None
if manifest_locator_or_text:
- if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+ if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.manifest_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
else:
raise errors.ArgumentError(
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
return self._block_manager
def _remember_api_response(self, response):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
# If not overriden via kwargs, we should try to load the
# replication_desired from the API server
if self.replication_desired is None:
error_via_api = None
error_via_keep = None
should_try_keep = ((self._manifest_text is None) and
- util.keep_locator_pattern.match(
+ arvados.util.keep_locator_pattern.match(
self._manifest_locator))
if ((self._manifest_text is None) and
- util.signed_locator_pattern.match(self._manifest_locator)):
+ arvados.util.signed_locator_pattern.match(self._manifest_locator)):
error_via_keep = self._populate_from_keep()
if self._manifest_text is None:
error_via_api = self._populate_from_api_server()
def _has_collection_uuid(self):
- return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+ return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
def __enter__(self):
return self
).execute(
num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
- self.set_committed()
+ self._portable_data_hash = self._api_response["portable_data_hash"]
+ self.set_committed(True)
return self._manifest_text
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
+ self._portable_data_hash = self._api_response["portable_data_hash"]
self._manifest_text = text
- self.set_committed()
+ self.set_committed(True)
return text
stream_name = tok.replace('\\040', ' ')
blocks = []
segments = []
- streamoffset = 0L
+ streamoffset = 0
state = BLOCKS
self.find_or_create(stream_name, COLLECTION)
continue
if state == BLOCKS:
block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
if block_locator:
- blocksize = long(block_locator.group(1))
+ blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
if state == SEGMENTS:
file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
if file_segment:
- pos = long(file_segment.group(1))
- size = long(file_segment.group(2))
+ pos = int(file_segment.group(1))
+ size = int(file_segment.group(2))
name = file_segment.group(3).replace('\\040', ' ')
filepath = os.path.join(stream_name, name)
afile = self.find_or_create(filepath, FILE)
stream_name = None
state = STREAM_NAME
- self.set_committed()
+ self.set_committed(True)
@synchronized
def notify(self, event, collection, name, item):
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush()
self.parent.remove(self.name, recursive=True)
self.parent = newparent