from builtins import str
from past.builtins import basestring
from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
_logger = logging.getLogger('arvados.collection')
+
if sys.version_info < (3, 0):
    class TextIOWrapper(io.TextIOWrapper):
        """Python 2 text wrapper whose write() also accepts native ``str``.

        Python 2's ``io.TextIOWrapper.write`` only accepts ``unicode``.
        To keep callers that do ``f.write('foo')`` working, any
        ``basestring`` argument is coerced with ``unicode()`` (Python 2's
        default codec, normally ASCII) before being passed to the stdlib
        implementation. Other arguments are forwarded untouched.
        """
        def write(self, data):
            text = unicode(data) if isinstance(data, basestring) else data
            return super(TextIOWrapper, self).write(text)
else:
    # On Python 3 every str is already text, so no shim is needed.
    TextIOWrapper = io.TextIOWrapper
+
+
class CollectionBase(object):
"""Abstract base class for Collection classes."""
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
"""
if not re.search(r'^[rwa][bt]?\+?$', mode):
if mode[0] == 'w':
arvfile.truncate(0)
- return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
_block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+ def _unescape_manifest_path(self, path):
+ return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
if state == STREAM_NAME:
# starting a new stream
- stream_name = tok.replace('\\040', ' ')
+ stream_name = self._unescape_manifest_path(tok)
blocks = []
segments = []
streamoffset = 0
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))
- name = file_segment.group(3).replace('\\040', ' ')
- filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
- if isinstance(afile, ArvadosFile):
- afile.add_segment(blocks, pos, size)
+ name = self._unescape_manifest_path(file_segment.group(3))
+ if name.split('/')[-1] == '.':
+ # placeholder for persisting an empty directory, not a real file
+ if len(name) > 2:
+ self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
- raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+ filepath = os.path.join(stream_name, name)
+ afile = self.find_or_create(filepath, FILE)
+ if isinstance(afile, ArvadosFile):
+ afile.add_segment(blocks, pos, size)
+ else:
+ raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
else:
# error!
raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)