from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import arvados.config as config
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
def stream_name(self):
raise NotImplementedError()
+
@synchronized
def has_remote_blocks(self):
"""Recursively check for a +R segment locator signature."""
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+ buf.append(self[dirname].manifest_text(
+ stream_name=os.path.join(stream_name, dirname),
+ strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
def get_trash_at(self):
if self._api_response and self._api_response["trash_at"]:
- return ciso8601.parse_datetime(self._api_response["trash_at"])
+ try:
+ return ciso8601.parse_datetime(self._api_response["trash_at"])
+ except ValueError:
+ return None
else:
return None
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries)
return self._block_manager
def _remember_api_response(self, response):
self.name = newname
self.lock = self.parent.root_collection().lock
+ @synchronized
+ def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+ """Encode empty directories by using an \056-named (".") empty file"""
+ if len(self._items) == 0:
+ return "%s %s 0:0:\\056\n" % (
+ escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+ return super(Subcollection, self)._get_manifest_text(stream_name,
+ strip, normalize,
+ only_committed)
+
class CollectionReader(Collection):
"""A read-only collection object.