+from __future__ import absolute_import
+from builtins import str
+from past.builtins import basestring
+from builtins import object
import functools
import logging
import os
from stat import *
from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
from arvados.retry import retry_method
_logger = logging.getLogger('arvados.collection')
if fields:
clean_fields = fields[:1] + [
(re.sub(r'\+[^\d][^\+]*', '', x)
- if re.match(util.keep_locator_pattern, x)
+ if re.match(arvados.util.keep_locator_pattern, x)
else x)
for x in fields[1:]]
clean += [' '.join(clean_fields), "\n"]
def _work_trees(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = util.listdir_recursive(
+ d = arvados.util.listdir_recursive(
path, max_depth = (None if max_manifest_depth == 0 else 0))
if d:
self._queue_dirents(stream_name, d)
return writer
def check_dependencies(self):
- for path, orig_stat in self._dependencies.items():
+ for path, orig_stat in list(self._dependencies.items()):
if not S_ISREG(orig_stat[ST_MODE]):
raise errors.StaleWriterStateError("{} not file".format(path))
try:
if value == self._committed:
return
if value:
- for k,v in self._items.items():
+ for k,v in list(self._items.items()):
v.set_committed(True)
self._committed = True
else:
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
# Patch note: the iterator is now built over a list copy of the keys instead
# of over self._items.keys() directly. On Python 3, keys() is a live view,
# and iterating a view while the dict changes size raises RuntimeError; the
# copy makes the returned iterator independent of later changes to _items.
- return iter(self._items.keys())
+ return iter(list(self._items.keys()))
@synchronized
def __getitem__(self, k):
@synchronized
def keys(self):
"""Get a list of names of files and collections directly contained in this collection."""
# Patch note: wrapped in list() so the documented "list" return type also
# holds on Python 3, where dict.keys() returns a view object, not a list.
- return self._items.keys()
+ return list(self._items.keys())
@synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
# Patch note: wrapped in list() so the documented "list" return type also
# holds on Python 3, where dict.values() returns a view object, not a list.
- return self._items.values()
+ return list(self._items.values())
@synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
# Patch note: wrapped in list() so the documented "list" return type also
# holds on Python 3, where dict.items() returns a view object, not a list.
- return self._items.items()
+ return list(self._items.items())
def exists(self, path):
"""Test if there is a file or collection at `path`."""
item.remove(pathcomponents[1])
def _clonefrom(self, source):
# For every (name, entry) pair yielded by source.items(), store the result
# of entry.clone(self, name) in self._items under the same name.
# Patch note: list(...) takes a snapshot of the pairs before the loop starts,
# so the loop does not iterate a live Python 3 items() view.
# NOTE(review): if `source` is a collection whose items() already returns a
# list (as the items() method in this patch does), the extra list() is a
# redundant but harmless copy -- confirm against callers.
- for k,v in source.items():
+ for k,v in list(source.items()):
self._items[k] = v.clone(self, k)
def clone(self):
@synchronized
def flush(self):
"""Flush bufferblocks to Keep."""
# Calls flush() on each object returned by self.values().
# NOTE(review): if self.values() is the values() method changed in this patch
# to return list(...), the list() added here makes a second copy; harmless,
# but it could be dropped -- confirm both methods live on the same class.
- for e in self.values():
+ for e in list(self.values()):
e.flush()
self.events = None
if manifest_locator_or_text:
- if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+ if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
- elif re.match(util.manifest_pattern, manifest_locator_or_text):
+ elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
else:
raise errors.ArgumentError(
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
# If not overridden via kwargs, we should try to load the
# replication_desired from the API server
if self.replication_desired is None:
error_via_api = None
error_via_keep = None
should_try_keep = ((self._manifest_text is None) and
- util.keep_locator_pattern.match(
+ arvados.util.keep_locator_pattern.match(
self._manifest_locator))
if ((self._manifest_text is None) and
- util.signed_locator_pattern.match(self._manifest_locator)):
+ arvados.util.signed_locator_pattern.match(self._manifest_locator)):
error_via_keep = self._populate_from_keep()
if self._manifest_text is None:
error_via_api = self._populate_from_api_server()
def _has_collection_uuid(self):
# Report whether self._manifest_locator is set and matches the collection
# UUID pattern. The result is truthy/falsy rather than a strict bool: False
# when the locator is None, otherwise whatever re.match() returns (a match
# object on success, None on failure).
# Patch note: the pattern is now reached through the fully qualified
# arvados.util module, matching the `import arvados.util` introduced at the
# top of this patch in place of the bare `import util`.
- return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+ return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
def __enter__(self):
# Context-manager entry: hands back this same object as the `with` target.
return self
stream_name = tok.replace('\\040', ' ')
blocks = []
segments = []
- streamoffset = 0L
+ streamoffset = 0
state = BLOCKS
self.find_or_create(stream_name, COLLECTION)
continue
if state == BLOCKS:
block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
if block_locator:
- blocksize = long(block_locator.group(1))
+ blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
if state == SEGMENTS:
file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
if file_segment:
- pos = long(file_segment.group(1))
- size = long(file_segment.group(2))
+ pos = int(file_segment.group(1))
+ size = int(file_segment.group(2))
name = file_segment.group(3).replace('\\040', ' ')
filepath = os.path.join(stream_name, name)
afile = self.find_or_create(filepath, FILE)