pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0), self.size())
- if pos < 0L:
++ if pos < 0:
+ raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
+ self._filepos = pos
+ return self._filepos
def tell(self):
+ """Return the current file offset, as stored in self._filepos."""
return self._filepos
data_size += len(s)
if data_size >= sizehint:
break
- return ''.join(data).splitlines(True)
+ return b''.join(data).decode().splitlines(True)
def size(self):
+ """Unimplemented stub: always raises IOError with errno.ENOSYS.
+
+ NOTE(review): presumably overridden by concrete reader classes -- confirm.
+ """
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
+ """Unimplemented stub: always raises IOError with errno.ENOSYS.
+
+ NOTE(review): presumably overridden by concrete reader classes -- confirm.
+ """
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
+ """Unimplemented stub: always raises IOError with errno.ENOSYS.
+
+ NOTE(review): presumably overridden by concrete reader classes -- confirm.
+ """
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return ''.join(data)
+ return b''.join(data)
def _repack_writes(self, num_retries):
- """Test if the buffer block has more data than actual segments.
+ """Optimize buffer block by repacking segments in file sequence.
- This happens when a buffered write over-writes a file range written in
- a previous buffered write. Re-pack the buffer block for efficiency
- and to avoid leaking information.
+ When the client makes random writes, they appear in the buffer block in
+ the sequence they were written rather than the sequence they appear in
+ the file. This makes for inefficient, fragmented manifests. Attempt
+ to optimize by repacking writes in file sequence.
"""
segs = self._segments
"""
def __init__(self, arvadosfile, mode, num_retries=None):
+ """Initialize the writer and register it with the underlying file.
+
+ arvadosfile, mode and num_retries are forwarded to the parent
+ initializer; the new writer is then passed to arvadosfile.add_writer().
+ """
- super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
- self.mode = mode
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
self.arvadosfile.add_writer(self)
+ def writable(self):
+ """Report that this object can be written to; always returns True."""
+ return True
+
@_FileLikeObjectBase._before_close
@retry_method
def write(self, data, num_retries=None):
open_flags |= os.O_EXCL
try:
if args.destination == "-":
- stdout.write(reader.manifest_text().encode())
- stdout.write(reader.manifest_text(strip=args.strip_manifest))
++ stdout.write(reader.manifest_text(strip=args.strip_manifest).encode())
else:
out_fd = os.open(args.destination, open_flags)
with os.fdopen(out_fd, 'wb') as out_file:
- out_file.write(reader.manifest_text().encode())
- out_file.write(reader.manifest_text(strip=args.strip_manifest))
++ out_file.write(reader.manifest_text(strip=args.strip_manifest).encode())
except (IOError, OSError) as error:
logger.error("can't write to '{}': {}".format(args.destination, error))
return 1
for img in old_images:
i = uuid_to_collection[img["collection"]]
pdh = i["portable_data_hash"]
- if pdh not in already_migrated and (only_migrate is None or pdh in only_migrate):
+ if pdh not in already_migrated and pdh not in need_migrate and (only_migrate is None or pdh in only_migrate):
need_migrate[pdh] = img
with CollectionReader(i["manifest_text"]) as c:
- if list(c.values())[0].size() > biggest:
- biggest = list(c.values())[0].size()
- if c.values()[0].size() > biggest:
- biggest = c.values()[0].size()
++ size = list(c.values())[0].size()
++ if size > biggest:
++ biggest = size
+ biggest_pdh = pdh
- totalbytes += c.values()[0].size()
++ totalbytes += size
+
+
+ if args.storage_driver == "vfs":
+ will_need = (biggest*20)
+ else:
+ will_need = (biggest*2.5)
if args.print_unmigrated:
only_migrate = set()
logger.info("Already migrated %i images", len(already_migrated))
logger.info("Need to migrate %i images", len(need_migrate))
logger.info("Using tempdir %s", tempfile.gettempdir())
- logger.info("Biggest image is about %i MiB, tempdir needs at least %i MiB free", biggest>>20, biggest>>19)
- logger.info("Biggest image %s is about %i MiB", biggest_pdh, biggest/(2**20))
- logger.info("Total data to migrate about %i MiB", totalbytes/(2**20))
++ logger.info("Biggest image %s is about %i MiB", biggest_pdh, biggest>>20)
++ logger.info("Total data to migrate about %i MiB", totalbytes>>20)
+
+ df_out = subprocess.check_output(["df", "-B1", tempfile.gettempdir()])
+ ln = df_out.splitlines()[1]
+ filesystem, blocks, used, available, use_pct, mounted = re.match(r"^([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+)", ln).groups(1)
+ if int(available) <= will_need:
- logger.warn("Temp filesystem mounted at %s does not have enough space for biggest image (has %i MiB, needs %i MiB)", mounted, int(available)/(2**20), will_need/(2**20))
++ logger.warn("Temp filesystem mounted at %s does not have enough space for biggest image (has %i MiB, needs %i MiB)", mounted, int(available)>>20, will_need>>20)
+ if not args.force:
+ exit(1)
+ else:
+ logger.warn("--force provided, will migrate anyway")
if args.dry_run:
return
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
+from __future__ import absolute_import
+from future.utils import listitems
import io
+ import os
+ import re
import shutil
import tempfile
'subdir/baz.txt' : 'baz',
}):
c = collection.Collection()
- for path, data in contents.items():
- with c.open(path, 'w') as f:
+ for path, data in listitems(contents):
+ with c.open(path, 'wb') as f:
f.write(data)
c.save_new()
- return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
++
+ return (c.manifest_locator(),
+ c.portable_data_hash(),
+ c.manifest_text(strip=strip_manifest))
-
+
def run_get(self, args):
+ """Run arv_get.main(args) with fresh capture buffers and return its result.
+
+ self.stdout is replaced with a tutil.BytesIO and self.stderr with a
+ tutil.StringIO before the call, so each run starts with empty output.
+ """
- self.stdout = io.BytesIO()
- self.stderr = io.BytesIO()
+ self.stdout = tutil.BytesIO()
+ self.stderr = tutil.StringIO()
return arv_get.main(args, self.stdout, self.stderr)
def test_version_argument(self):
self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
self.assertFalse(c.modified())
+
+ def test_truncate2(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+ api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate2",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+ "replication_desired":None},
+ {"uuid":"zzzzz-4zz18-mockcollection0",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+ "portable_data_hash":"272da898abdf86ddc71994835e3155f8+95"})
+ with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("0123456789", writer.read(12))
+
+ # extend file size
+ writer.truncate(12)
+
+ self.assertEqual(writer.size(), 12)
+ writer.seek(0, os.SEEK_SET)
+ self.assertEqual(b"0123456789\x00\x00", writer.read(12))
+
+ self.assertIsNone(c.manifest_locator())
+ self.assertTrue(c.modified())
+ c.save_new("test_truncate2")
+ self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+ self.assertFalse(c.modified())
+
+ def test_truncate3(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789",
+ "a925576942e94b2ef57a066101b48876+10": "abcdefghij"})
+ api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+ "replication_desired":None},
+ {"uuid":"zzzzz-4zz18-mockcollection0",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+ "portable_data_hash":"7fcd0eaac3aad4c31a6a0e756475da92+52"})
+ with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 20)
+ self.assertEqual("0123456789ab", writer.read(12))
+ self.assertEqual(12, writer.tell())
+
+ writer.truncate(8)
+
+ # Make sure reading off the end doesn't break
+ self.assertEqual(12, writer.tell())
+ self.assertEqual("", writer.read(12))
+
+ self.assertEqual(writer.size(), 8)
+ self.assertEqual(2, writer.seek(-10, os.SEEK_CUR))
+ self.assertEqual("234567", writer.read(12))
+
+ self.assertIsNone(c.manifest_locator())
+ self.assertTrue(c.modified())
+ c.save_new("test_truncate")
+ self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+ self.assertFalse(c.modified())
+
+
+
def test_write_to_end(self):
- keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
- api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
- "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
- "replication_desired":None},
- {"uuid":"zzzzz-4zz18-mockcollection0",
- "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
- "portable_data_hash":"c5c3af76565c8efb6a806546bcf073f3+88"})
+ keep = ArvadosFileWriterTestCase.MockKeep({
+ "781e5e245d69b566979b86e28d23f2c7+10": b"0123456789",
+ })
+ api = ArvadosFileWriterTestCase.MockApi({
+ "name": "test_append",
+ "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
+ "replication_desired": None,
+ }, {
+ "uuid": "zzzzz-4zz18-mockcollection0",
+ "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
+ "portable_data_hash": "c5c3af76565c8efb6a806546bcf073f3+88",
+ })
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
api_client=api, keep_client=keep) as c:
- writer = c.open("count.txt", "r+")
+ writer = c.open("count.txt", "rb+")
self.assertEqual(writer.size(), 10)
- writer.seek(5, os.SEEK_SET)
+ self.assertEqual(5, writer.seek(5, os.SEEK_SET))
- self.assertEqual("56789", writer.read(8))
+ self.assertEqual(b"56789", writer.read(8))
writer.seek(10, os.SEEK_SET)
writer.write("foo")