class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._binary = 'b' in mode
- if sys.version_info >= (3, 0) and not self._binary:
- raise NotImplementedError("text mode {!r} is not implemented".format(mode))
self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
def stream_name(self):
return self.arvadosfile.parent.stream_name()
+ def readinto(self, b):
+ data = self.read(len(b))
+ b[:len(data)] = data
+ return len(data)
+
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
if not self.closed:
self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+ """An interface to an Arvados file that's compatible with io wrappers.
+
+ """
+ def __init__(self, f):
+ self.f = f
+ self.closed = False
+ def close(self):
+ self.closed = True
+ return self.f.close()
+ def flush(self):
+ return self.f.flush()
+ def read(self, *args, **kwargs):
+ return self.f.read(*args, **kwargs)
+ def readable(self):
+ return self.f.readable()
+ def readinto(self, *args, **kwargs):
+ return self.f.readinto(*args, **kwargs)
+ def seek(self, *args, **kwargs):
+ return self.f.seek(*args, **kwargs)
+ def seekable(self):
+ return self.f.seekable()
+ def tell(self):
+ return self.f.tell()
+ def writable(self):
+ return self.f.writable()
+ def write(self, *args, **kwargs):
+ return self.f.write(*args, **kwargs)
from builtins import str
from past.builtins import basestring
from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
_logger = logging.getLogger('arvados.collection')
+
+if sys.version_info >= (3, 0):
+ TextIOWrapper = io.TextIOWrapper
+else:
+ class TextIOWrapper(io.TextIOWrapper):
+ """To maintain backward compatibility, cast str to unicode in
+ write('foo').
+
+ """
+ def write(self, data):
+ if isinstance(data, basestring):
+ data = unicode(data)
+ return super(TextIOWrapper, self).write(data)
+
+
class CollectionBase(object):
"""Abstract base class for Collection classes."""
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
"""
if not re.search(r'^[rwa][bt]?\+?$', mode):
if mode[0] == 'w':
arvfile.truncate(0)
- return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
return prefix+fn
def write_file(collection, pathprefix, fn, flush=False):
- with open(os.path.join(pathprefix, fn)) as src:
- dst = collection.open(fn, "w")
+ with open(os.path.join(pathprefix, fn), "rb") as src:
+ dst = collection.open(fn, "wb")
r = src.read(1024*128)
while r:
dst.write(r)
keep = ArvadosFileWriterTestCase.MockKeep({
"781e5e245d69b566979b86e28d23f2c7+10": b"0123456789",
})
- c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
- writer = c.open("count.txt", "ab+")
- self.assertEqual(writer.read(20), b"0123456789")
-
- writer.seek(0, os.SEEK_SET)
- writer.write("hello")
- self.assertEqual(writer.read(), b"")
- writer.seek(-5, os.SEEK_CUR)
- self.assertEqual(writer.read(3), b"hel")
- self.assertEqual(writer.read(), b"lo")
- writer.seek(0, os.SEEK_SET)
- self.assertEqual(writer.read(), b"0123456789hello")
-
- writer.seek(0)
- writer.write("world")
- self.assertEqual(writer.read(), b"")
- writer.seek(0)
- self.assertEqual(writer.read(), b"0123456789helloworld")
-
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
+ for (mode, convert) in (
+ ('a+', lambda data: data.decode(encoding='utf-8')),
+ ('at+', lambda data: data.decode(encoding='utf-8')),
+ ('ab+', lambda data: data)):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
+ writer = c.open("count.txt", mode)
+ self.assertEqual(writer.read(20), convert(b"0123456789"))
+
+ writer.seek(0, os.SEEK_SET)
+ writer.write(convert(b"hello"))
+ self.assertEqual(writer.read(), convert(b""))
+ if 'b' in mode:
+ writer.seek(-5, os.SEEK_CUR)
+ self.assertEqual(writer.read(3), convert(b"hel"))
+ self.assertEqual(writer.read(), convert(b"lo"))
+ else:
+ with self.assertRaises(IOError):
+ writer.seek(-5, os.SEEK_CUR)
+ with self.assertRaises(IOError):
+ writer.seek(-3, os.SEEK_END)
+ writer.seek(0, os.SEEK_SET)
+ writer.read(7)
+ self.assertEqual(7, writer.tell())
+ self.assertEqual(7, writer.seek(7, os.SEEK_SET))
+
+ writer.seek(0, os.SEEK_SET)
+ self.assertEqual(writer.read(), convert(b"0123456789hello"))
+
+ writer.seek(0)
+ writer.write(convert(b"world"))
+ self.assertEqual(writer.read(), convert(b""))
+ writer.seek(0)
+ self.assertEqual(writer.read(), convert(b"0123456789helloworld"))
+
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
def test_write_at_beginning(self):
keep = ArvadosFileWriterTestCase.MockKeep({
with c.open('foo', 'wb') as f:
f.write('foo')
for mode in ['r', 'rt', 'r+', 'rt+', 'w', 'wt', 'a', 'at']:
- if sys.version_info >= (3, 0):
- with self.assertRaises(NotImplementedError):
- c.open('foo', mode)
- else:
- with c.open('foo', mode) as f:
- if mode[0] == 'r' and '+' not in mode:
- self.assertEqual('foo', f.read(3))
- else:
- f.write('bar')
- f.seek(-3, os.SEEK_CUR)
- self.assertEqual('bar', f.read(3))
+ with c.open('foo', mode) as f:
+ if mode[0] == 'r' and '+' not in mode:
+ self.assertEqual('foo', f.read(3))
+ else:
+ f.write('bar')
+ f.seek(0, os.SEEK_SET)
+ self.assertEqual('bar', f.read(3))
+
+
+class TextModes(run_test_server.TestCaseWithServers):
+
+ def setUp(self):
+ arvados.config.KEEP_BLOCK_SIZE = 4
+ if sys.version_info < (3, 0):
+ import unicodedata
+ self.sailboat = unicodedata.lookup('SAILBOAT')
+ self.snowman = unicodedata.lookup('SNOWMAN')
+ else:
+ self.sailboat = '\N{SAILBOAT}'
+ self.snowman = '\N{SNOWMAN}'
+
+ def tearDown(self):
+ arvados.config.KEEP_BLOCK_SIZE = 2 ** 26
+
+ def test_read_sailboat_across_block_boundary(self):
+ c = Collection()
+ f = c.open('sailboats', 'wb')
+ data = self.sailboat.encode('utf-8')
+ f.write(data)
+ f.write(data[:1])
+ f.write(data[1:])
+ f.write(b'\n')
+ f.close()
+ self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+3 ')
+
+ f = c.open('sailboats', 'r')
+ string = f.readline()
+ self.assertEqual(string, self.sailboat+self.sailboat+'\n')
+ f.close()
+
+ def test_write_snowman_across_block_boundary(self):
+ c = Collection()
+ f = c.open('snowmany', 'w')
+ data = self.snowman
+ f.write(data+data+'\n'+data+'\n')
+ f.close()
+ self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+4 .*\+3 ')
+
+ f = c.open('snowmany', 'r')
+ self.assertEqual(f.readline(), self.snowman+self.snowman+'\n')
+ self.assertEqual(f.readline(), self.snowman+'\n')
+ f.close()
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):