+from __future__ import division
+from builtins import object
import logging
_logger = logging.getLogger('arvados.ranges')
self.segment_offset == other.segment_offset)
def first_block(data_locators, range_start):
- block_start = 0L
+ block_start = 0
# range_start/block_start is the inclusive lower bound
# range_end/block_end is the exclusive upper bound
hi = len(data_locators)
lo = 0
- i = int((hi + lo) / 2)
+ i = (hi + lo) // 2
block_size = data_locators[i].range_size
block_start = data_locators[i].range_start
block_end = block_start + block_size
lo = i
else:
hi = i
- i = int((hi + lo) / 2)
+ i = (hi + lo) // 2
block_size = data_locators[i].range_size
block_start = data_locators[i].range_start
block_end = block_start + block_size
# range ends before this segment starts, so don't look at any more locators
break
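# Illustrative sketch of the bisection above: floor division ("//") keeps
# the midpoint an int on both Python 2 and 3. The Range tuple and
# find_block helper below are hypothetical stand-ins for this module's
# locator objects, not part of the patch:
from collections import namedtuple
Range = namedtuple('Range', ['locator', 'range_start', 'range_size'])

def find_block(data_locators, offset):
    lo, hi = 0, len(data_locators)
    while lo < hi - 1:
        i = (lo + hi) // 2  # midpoint stays an int under "true division"
        if data_locators[i].range_start <= offset:
            lo = i
        else:
            hi = i
    return lo

locs = [Range('a', 0, 10), Range('b', 10, 15), Range('c', 25, 5)]
assert find_block(locs, 12) == 1  # offset 12 falls in block 'b'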
- if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+ if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
# new range starts and ends in old segment
# split segment into up to 3 pieces
if (new_range_start-old_segment_start) > 0:
-import functools
-import os
-import zlib
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import listitems, listvalues
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
import bz2
-import config
-import hashlib
-import threading
-import Queue
+import collections
import copy
import errno
-import re
+import functools
+import hashlib
import logging
-import collections
+import os
+import queue
+import re
+import sys
+import threading
import uuid
+import zlib
+from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
- from ._ranges import locators_and_ranges, replace_range, Range
+ from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
MOD = "mod"
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._filepos = 0L
+ self._binary = 'b' in mode
+ if sys.version_info >= (3, 0) and not self._binary:
+ raise NotImplementedError("text mode {!r} is not implemented".format(mode))
+ self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
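# With this check in place, collection files on Python 3 must be opened
# in binary mode and the read side deals in bytes; e.g. (hypothetical
# reader object):
#
#   with collection.open('foo.txt', 'rb') as f:
#       chunk = f.read(128)   # returns bytes, not str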
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- if pos < 0L:
+ if pos < 0:
raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
self._filepos = pos
return self._filepos
def readall(self, size=2**20, num_retries=None):
while True:
data = self.read(size, num_retries=num_retries)
- if data == '':
+ if len(data) == 0:
break
yield data
data = [cache_data]
self._filepos += len(cache_data)
else:
- data = ['']
+ data = [b'']
data_size = len(data[-1])
- while (data_size < size) and ('\n' not in data[-1]):
+ while (data_size < size) and (b'\n' not in data[-1]):
next_read = self.read(2 ** 20, num_retries=num_retries)
if not next_read:
break
data.append(next_read)
data_size += len(next_read)
- data = ''.join(data)
+ data = b''.join(data)
try:
- nextline_index = data.index('\n') + 1
+ nextline_index = data.index(b'\n') + 1
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
- return data[:nextline_index]
+ return data[:nextline_index].decode()
@_FileLikeObjectBase._before_close
@retry_method
data_size += len(s)
if data_size >= sizehint:
break
- return ''.join(data).splitlines(True)
+ return b''.join(data).decode().splitlines(True)
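# After these changes the low-level read paths return bytes while the
# line-oriented helpers decode to str, e.g. (hypothetical file object):
#
#   f.read(4)       # -> b'foo\n'   (bytes)
#   f.readline()    # -> 'bar\n'    (str, decoded)
#   f.readlines()   # -> ['baz\n']  (list of str)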
def size(self):
raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
- return ''
+ return b''
- data = ''
+ data = b''
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
lr = available_chunks[0]
data = self._stream.readfrom(lr.locator+lr.segment_offset,
- lr.segment_size,
- num_retries=num_retries)
+ lr.segment_size,
+ num_retries=num_retries)
self._filepos += len(data)
return data
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
- return ''
+ return b''
data = []
for lr in locators_and_ranges(self.segments, start, size):
data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
num_retries=num_retries))
- return ''.join(data)
+ return b''.join(data)
def as_manifest(self):
segs = []
PENDING = 1
COMMITTED = 2
ERROR = 3
+ DELETED = 4
def __init__(self, blockid, starting_capacity, owner):
"""
"""
if self._state == _BufferBlock.WRITABLE:
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
while (self.write_pointer+len(data)) > len(self.buffer_block):
new_buffer_block = bytearray(len(self.buffer_block) * 2)
new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
@synchronized
def clear(self):
+ self._state = _BufferBlock.DELETED
self.owner = None
self.buffer_block = None
self.buffer_view = None
+ @synchronized
+ def repack_writes(self):
+ """Optimize buffer block by repacking segments in file sequence.
+
+ When the client makes random writes, they appear in the buffer block in
+ the sequence they were written rather than the sequence they appear in
+ the file. This makes for inefficient, fragmented manifests. Attempt
+ to optimize by repacking writes in file sequence.
+
+ """
+ if self._state != _BufferBlock.WRITABLE:
+ raise AssertionError("Cannot repack non-writable block")
+
+ segs = self.owner.segments()
+
+ # Collect the segments that reference the buffer block.
+ bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+ # Collect total data referenced by segments (could be smaller than
+ # bufferblock size if a portion of the file was written and
+ # then overwritten).
+ write_total = sum([s.range_size for s in bufferblock_segs])
+
+ if write_total < self.size() or len(bufferblock_segs) > 1:
+ # If there's more than one segment referencing this block, it is
+ # due to out-of-order writes and will produce a fragmented
+ # manifest, so try to optimize by re-packing into a new buffer.
+ contents = self.buffer_view[0:self.write_pointer].tobytes()
+ new_bb = _BufferBlock(None, write_total, None)
+ for t in bufferblock_segs:
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+ t.segment_offset = new_bb.size() - t.range_size
+
+ self.buffer_block = new_bb.buffer_block
+ self.buffer_view = new_bb.buffer_view
+ self.write_pointer = new_bb.write_pointer
+ self._locator = None
+ new_bb.clear()
+ self.owner.set_segments(segs)
+
+ def __repr__(self):
+ return "<BufferBlock %s>" % (self.blockid)
+
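# A self-contained sketch of the repacking idea above; the repack helper
# and the (file_offset, buf_offset, length) segment tuples are
# hypothetical simplifications of _BufferBlock and Range:
def repack(buf, segments):
    """Rebuild a buffer so its segments appear in file order."""
    packed = bytearray()
    out = []
    for file_off, buf_off, length in sorted(segments):
        out.append((file_off, len(packed), length))
        packed += buf[buf_off:buf_off + length]
    return bytes(packed), out

# Out-of-order writes: b"BB" landed in the buffer first, then b"AA" was
# written at file offset 0, leaving two fragmented segments.
buf = b"BBAA"
segs = [(2, 0, 2), (0, 2, 2)]
assert repack(buf, segs) == (b"AABB", [(0, 0, 2), (2, 2, 2)])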
class NoopLock(object):
def __enter__(self):
def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
- blockid = "%s" % uuid.uuid4()
+ blockid = str(uuid.uuid4())
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
ArvadosFile that owns the new block
"""
- new_blockid = "bufferblock%i" % len(self._bufferblocks)
+ new_blockid = str(uuid.uuid4())
bufferblock = block.clone(new_blockid, owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
# blocks pending. If they are full 64 MiB blocks, that means up to
# 256 MiB of internal buffering, which is the same size as the
# default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
+ self._put_queue = queue.Queue(maxsize=2)
self._put_threads = []
- for i in xrange(0, self.num_put_threads):
+ for i in range(0, self.num_put_threads):
thread = threading.Thread(target=self._commit_bufferblock_worker)
self._put_threads.append(thread)
thread.daemon = True
@synchronized
def start_get_threads(self):
if self._prefetch_threads is None:
- self._prefetch_queue = Queue.Queue()
+ self._prefetch_queue = queue.Queue()
self._prefetch_threads = []
- for i in xrange(0, self.num_get_threads):
+ for i in range(0, self.num_get_threads):
thread = threading.Thread(target=self._block_prefetch_worker)
self._prefetch_threads.append(thread)
thread.daemon = True
@synchronized
def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
+
self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- try:
- small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- except AttributeError:
- # Writable blocks without owner shouldn't exist.
- raise UnownedBlockError()
- # Search blocks ready for getting packed together before being committed to Keep.
++ # Search blocks ready for getting packed together before being
++ # committed to Keep.
+ # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
++ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ try:
- small_blocks = [b for b in self._bufferblocks.values()
++ small_blocks = [b for b in listvalues(self._bufferblocks)
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
+ for bb in small_blocks:
+ bb.repack_writes()
- # Update the pending write size count with its true value, just in case
- # some small file was opened, written and closed several times.
- self._pending_write_size = sum([b.size() for b in small_blocks])
- if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
- return
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
- new_bb = self._alloc_bufferblock()
- while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
- bb = small_blocks.pop(0)
- arvfile = bb.owner
- self._pending_write_size -= bb.size()
- new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
- arvfile.set_segments([Range(new_bb.blockid,
- 0,
- bb.size(),
- new_bb.write_pointer - bb.size())])
- self._delete_bufferblock(bb.blockid)
- self.commit_bufferblock(new_bb, sync=sync)
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
+ new_bb = self._alloc_bufferblock()
+ files = []
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ self._pending_write_size -= bb.size()
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ files.append((bb, new_bb.write_pointer - bb.size()))
+
+ self.commit_bufferblock(new_bb, sync=sync)
+
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.locator()
+ s.segment_offset = new_bb_segment_offset+s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)
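# Sketch of the segment rebasing done in the loop above, with plain
# (locator, segment_offset, size) tuples standing in for Range objects;
# the rebase helper is hypothetical:
def rebase(segments, old_loc, new_loc, base):
    return [(new_loc, base + off, size) if loc == old_loc
            else (loc, off, size)
            for loc, off, size in segments]

# A small file's buffer landed at offset 3 of the combined block:
segs = [('bufferblock0', 0, 10)]
assert rebase(segs, 'bufferblock0', 'combined', 3) == [('combined', 3, 10)]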
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
self.repack_small_blocks(force=True, sync=True)
with self.lock:
- items = self._bufferblocks.items()
+ items = listitems(self._bufferblocks)
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
with self.lock:
if len(self._segments) != len(othersegs):
return False
- for i in xrange(0, len(othersegs)):
+ for i in range(0, len(othersegs)):
seg1 = self._segments[i]
seg2 = othersegs[i]
loc1 = seg1.locator
"""
self._writers.remove(writer)
- if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
# File writer closed, not small enough for repacking
self.flush()
elif self.closed():
with self.lock:
if size == 0 or offset >= self.size():
- return ''
+ return b''
readsegs = locators_and_ranges(self._segments, offset, size)
prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return ''.join(data)
+ return b''.join(data)
- def _repack_writes(self, num_retries):
- """Optimize buffer block by repacking segments in file sequence.
-
- When the client makes random writes, they appear in the buffer block in
- the sequence they were written rather than the sequence they appear in
- the file. This makes for inefficient, fragmented manifests. Attempt
- to optimize by repacking writes in file sequence.
-
- """
- segs = self._segments
-
- # Collect the segments that reference the buffer block.
- bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
- # Collect total data referenced by segments (could be smaller than
- # bufferblock size if a portion of the file was written and
- # then overwritten).
- write_total = sum([s.range_size for s in bufferblock_segs])
-
- if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
- # If there's more than one segment referencing this block, it is
- # due to out-of-order writes and will produce a fragmented
- # manifest, so try to optimize by re-packing into a new buffer.
- contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
- new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
- for t in bufferblock_segs:
- new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
- t.segment_offset = new_bb.size() - t.range_size
-
- self._current_bblock = new_bb
-
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
necessary.
"""
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
if len(data) == 0:
return
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes(num_retries)
+ self._current_bblock.repack_writes()
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
if self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes(num_retries)
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+ self._current_bblock.repack_writes()
+ if self._current_bblock.state() != _BufferBlock.DELETED:
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
if sync:
to_delete = set()
normalize=False, only_committed=False):
buf = ""
filestream = []
- for segment in self.segments:
+ for segment in self._segments:
loc = segment.locator
if self.parent._my_block_manager().is_bufferblock(loc):
if only_committed:
continue
- loc = self._bufferblocks[loc].calculate_locator()
+ loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
- filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+ filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
segment.segment_offset, segment.range_size))
- buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+ buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
buf += "\n"
return buf
"""
- def __init__(self, arvadosfile, num_retries=None):
- super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
+ def __init__(self, arvadosfile, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
data.append(rd)
self._filepos += len(rd)
rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
- return ''.join(data)
+ return b''.join(data)
else:
data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
self._filepos += len(data)
"""
def __init__(self, arvadosfile, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
- self.mode = mode
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
self.arvadosfile.add_writer(self)
def writable(self):
@retry_method
def write(self, data, num_retries=None):
if self.mode[0] == "a":
- self.arvadosfile.writeto(self.size(), data, num_retries)
- else:
- self.arvadosfile.writeto(self._filepos, data, num_retries)
- self._filepos += len(data)
+ self._filepos = self.size()
+ self.arvadosfile.writeto(self._filepos, data, num_retries)
+ self._filepos += len(data)
return len(data)
@_FileLikeObjectBase._before_close
-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
+from __future__ import division
+from future.utils import listitems, listvalues
+from builtins import str
+from builtins import object
import argparse
import arvados
import arvados.collection
Do not continue interrupted uploads from cached state.
""")
+ _group = run_opts.add_mutually_exclusive_group()
+ _group.add_argument('--follow-links', action='store_true', default=True,
+ dest='follow_links', help="""
+ Follow file and directory symlinks (default).
+ """)
+ _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+ help="""
+ Do not follow file and directory symlinks.
+ """)
+
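# Typical invocations of the new flags (illustrative, assuming the usual
# arv-put entry point):
#
#   arv-put dir/                    # symlinks followed (default)
#   arv-put --no-follow-links dir/  # symlinked files/dirs skipped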
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
help="""
if len(args.paths) == 0:
args.paths = ['-']
- args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
+ args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
if args.filename:
return args
+ class PathDoesNotExistError(Exception):
+ pass
+
+
class CollectionUpdateError(Exception):
pass
@classmethod
def make_path(cls, args):
md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
realpaths = sorted(os.path.realpath(path) for path in args.paths)
- md5.update('\0'.join(realpaths))
+ md5.update(b'\0'.join([p.encode() for p in realpaths]))
if any(os.path.isdir(path) for path in realpaths):
- md5.update("-1")
+ md5.update(b'-1')
elif args.filename:
- md5.update(args.filename)
+ md5.update(args.filename.encode())
return os.path.join(
arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
md5.hexdigest())
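# Python 3's hashlib only accepts bytes, hence the .encode() calls
# above; a quick check of the pattern:
import hashlib
assert (hashlib.md5('abc'.encode()).hexdigest()
        == hashlib.md5(b'abc').hexdigest())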
ensure_unique_name=False, num_retries=None,
put_threads=None, replication_desired=None,
filename=None, update_time=60.0, update_collection=None,
- logger=logging.getLogger('arvados.arv_put'), dry_run=False):
+ logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+ follow_links=True):
self.paths = paths
self.resume = resume
self.use_cache = use_cache
self.logger = logger
self.dry_run = dry_run
self._checkpoint_before_quit = True
+ self.follow_links = follow_links
if not self.use_cache and self.resume:
raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
if self.dry_run:
raise ArvPutUploadIsPending()
self._write_stdin(self.filename or 'stdin')
+ elif not os.path.exists(path):
+ raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
elif os.path.isdir(path):
# Use absolute paths on cache index so CWD doesn't interfere
# with the caching logic.
prefixdir = path = os.path.abspath(path)
if prefixdir != '/':
prefixdir += '/'
- for root, dirs, files in os.walk(path):
+ for root, dirs, files in os.walk(path, followlinks=self.follow_links):
# Make os.walk()'s dir traversing order deterministic
dirs.sort()
files.sort()
except (SystemExit, Exception) as e:
self._checkpoint_before_quit = False
# Log stack trace only when Ctrl-C isn't pressed (SIGINT)
-- # Note: We're expecting SystemExit instead of KeyboardInterrupt because
-- # we have a custom signal handler in place that raises SystemExit with
-- # the catched signal's code.
- if not isinstance(e, SystemExit) or e.code != -2:
++ # Note: We're expecting SystemExit instead of
++ # KeyboardInterrupt because we have a custom signal
++ # handler in place that raises SystemExit with the caught
++ # signal's code.
+ if isinstance(e, PathDoesNotExistError):
+ # We aren't interested in the traceback for this case
+ pass
+ elif not isinstance(e, SystemExit) or e.code != -2:
- self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
+ self.logger.warning("Abnormal termination:\n{}".format(
+ traceback.format_exc()))
raise
finally:
if not self.dry_run:
Recursively get the total size of the collection
"""
size = 0
- for item in collection.values():
+ for item in listvalues(collection):
if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
size += self._collection_size(item)
else:
self.reporter(self.bytes_written, self.bytes_expected)
def _write_stdin(self, filename):
- output = self._local_collection.open(filename, 'w')
+ output = self._local_collection.open(filename, 'wb')
self._write(sys.stdin, output)
output.close()
def _check_file(self, source, filename):
- """Check if this file needs to be uploaded"""
+ """
+ Check if this file needs to be uploaded
+ """
+ # Ignore symlinks when requested
+ if (not self.follow_links) and os.path.islink(source):
+ return
resume_offset = 0
should_upload = False
new_file_in_cache = False
def _upload_files(self):
for source, resume_offset, filename in self._files_to_upload:
- with open(source, 'r') as source_fd:
+ with open(source, 'rb') as source_fd:
with self._state_lock:
self._state['files'][source]['mtime'] = os.path.getmtime(source)
self._state['files'][source]['size'] = os.path.getsize(source)
if resume_offset > 0:
# Start upload where we left off
- output = self._local_collection.open(filename, 'a')
+ output = self._local_collection.open(filename, 'ab')
source_fd.seek(resume_offset)
else:
# Start from scratch
- output = self._local_collection.open(filename, 'w')
+ output = self._local_collection.open(filename, 'wb')
self._write(source_fd, output)
output.close(flush=False)
if self.use_cache:
# Set up cache file name from input paths.
md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update('\0'.join(realpaths))
+ md5.update(b'\0'.join([p.encode() for p in realpaths]))
if self.filename:
- md5.update(self.filename)
+ md5.update(self.filename.encode())
cache_filename = md5.hexdigest()
cache_filepath = os.path.join(
arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
file_paths = []
- for name, item in col.items():
+ for name, item in listitems(col):
if isinstance(item, arvados.arvfile.ArvadosFile):
file_paths.append(os.path.join(path_prefix, name))
elif isinstance(item, arvados.collection.Subcollection):
state = json.dumps(self._state)
try:
new_cache = tempfile.NamedTemporaryFile(
+ mode='w+',
dir=os.path.dirname(self._cache_filename), delete=False)
self._lock_file(new_cache)
new_cache.write(state)
def portable_data_hash(self):
pdh = self._my_collection().portable_data_hash()
- m = self._my_collection().stripped_manifest()
- local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
+ m = self._my_collection().stripped_manifest().encode()
+ local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
if pdh != local_pdh:
logger.warning("\n".join([
"arv-put: API server provided PDH differs from local manifest.",
locators.append(loc)
return locators
elif isinstance(item, arvados.collection.Collection):
- l = [self._datablocks_on_item(x) for x in item.values()]
+ l = [self._datablocks_on_item(x) for x in listvalues(item)]
# Fast list flattener method taken from:
# http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
return [loc for sublist in l for loc in sublist]
return datablocks
- def expected_bytes_for(pathlist):
+ def expected_bytes_for(pathlist, follow_links=True):
# Walk the given directory trees and stat files, adding up file sizes,
# so we can display progress as percent
bytesum = 0
for path in pathlist:
if os.path.isdir(path):
- for filename in arvados.util.listdir_recursive(path):
- bytesum += os.path.getsize(os.path.join(path, filename))
+ for root, dirs, files in os.walk(path, followlinks=follow_links):
+ # Sum file sizes
+ for f in files:
+ filepath = os.path.join(root, f)
+ # Ignore symlinked files when requested
+ if (not follow_links) and os.path.islink(filepath):
+ continue
+ bytesum += os.path.getsize(filepath)
elif not os.path.isfile(path):
return None
else:
# uploaded, the expected bytes calculation can take a moment.
if args.progress and any([os.path.isdir(f) for f in args.paths]):
logger.info("Calculating upload size, this could take some time...")
- bytes_expected = expected_bytes_for(args.paths)
+ bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)
try:
writer = ArvPutUploadJob(paths = args.paths,
ensure_unique_name = True,
update_collection = args.update_collection,
logger=logger,
- dry_run=args.dry_run)
+ dry_run=args.dry_run,
+ follow_links=args.follow_links)
except ResumeCacheConflict:
logger.error("\n".join([
"arv-put: Another process is already uploading this data.",
except ArvPutUploadNotPending:
# No files pending for upload
sys.exit(0)
+ except PathDoesNotExistError as error:
+ logger.error("\n".join([
+ "arv-put: %s" % str(error)]))
+ sys.exit(1)
if args.progress: # Print newline to split stderr from stdout for humans.
logger.info("\n")
if not output.endswith('\n'):
stdout.write('\n')
- for sigcode, orig_handler in orig_signal_handlers.items():
+ for sigcode, orig_handler in listitems(orig_signal_handlers):
signal.signal(sigcode, orig_handler)
if status != 0:
-#!/usr/bin/env python
-
from __future__ import print_function
+from __future__ import division
+from builtins import str
+from builtins import range
import argparse
import atexit
import errno
# Use up to half of the +wait+ period waiting for "passenger
# stop" to work. If the process hasn't exited by then, start
# sending TERM signals.
- startTERM += wait/2
+ startTERM += wait//2
server_pid = None
while now <= deadline and server_pid is None:
except OSError as error:
if error.errno != errno.ENOENT:
raise
- os.mkfifo(fifo, 0700)
+ os.mkfifo(fifo, 0o700)
subprocess.Popen(
['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
stdout=sys.stderr)
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
- if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
- env.pop('ARVADOS_WEBSOCKETS', None)
- else:
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env.pop('ARVADOS_WEBSOCKETS', None)
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
"-listen=:{}".format(port),
"-pid="+_pidfile('keep{}'.format(n))]
- for arg, val in keep_args.iteritems():
+ for arg, val in keep_args.items():
keep_cmd.append("{}={}".format(arg, val))
logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
@staticmethod
def _restore_dict(src, dest):
- for key in dest.keys():
+ for key in list(dest.keys()):
if key not in src:
del dest[key]
dest.update(src)
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import range
import apiclient
-import io
import mock
import os
import pwd
import threading
import hashlib
import random
+import uuid
-from cStringIO import StringIO
-
import arvados
import arvados.commands.put as arv_put
-import arvados_testutil as tutil
+from . import arvados_testutil as tutil
-from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
-import run_test_server
+from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
+from . import run_test_server
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
CACHE_ARGSET = [
_, self.large_file_name = tempfile.mkstemp()
fileobj = open(self.large_file_name, 'w')
# Make sure to write just a little more than one block
- for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
- data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
fileobj.write(data)
fileobj.close()
# Temp dir containing small files to be repacked
with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
f.write(data + str(i))
self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+ # Temp dir to hold a symlink to other temp dir
+ self.tempdir_with_symlink = tempfile.mkdtemp()
+ os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
+ os.symlink(os.path.join(self.tempdir, '1'),
+ os.path.join(self.tempdir_with_symlink, 'linkedfile'))
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
shutil.rmtree(self.tempdir)
os.unlink(self.large_file_name)
shutil.rmtree(self.small_files_dir)
+ shutil.rmtree(self.tempdir_with_symlink)
+
+ def test_symlinks_are_followed_by_default(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ self.assertIn('linkeddir', cwriter.manifest_text())
+ self.assertIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_symlinks_are_not_followed_when_requested(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+ follow_links=False)
+ cwriter.start(save_collection=False)
+ self.assertNotIn('linkeddir', cwriter.manifest_text())
+ self.assertNotIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_passing_nonexistent_path_raises_exception(self):
+ uuid_str = str(uuid.uuid4())
+ cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+ with self.assertRaises(arv_put.PathDoesNotExistError):
+ cwriter.start(save_collection=False)
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
def test_writer_works_with_cache(self):
with tempfile.NamedTemporaryFile() as f:
- f.write('foo')
+ f.write(b'foo')
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
cwriter.start(save_collection=False)
- self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
+ self.assertEqual(0, cwriter.bytes_skipped)
+ self.assertEqual(3, cwriter.bytes_written)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
+ self.assertEqual(3, cwriter_new.bytes_skipped)
+ self.assertEqual(3, cwriter_new.bytes_written)
def make_progress_tester(self):
progression = []
def test_progress_reporting(self):
with tempfile.NamedTemporaryFile() as f:
- f.write('foo')
+ f.write(b'foo')
f.flush()
for expect_count in (None, 8):
progression, reporter = self.make_progress_tester()
def test_known_human_progress(self):
for count, total in [(0, 1), (2, 4), (45, 60)]:
- expect = '{:.1%}'.format(float(count) / total)
+ expect = '{:.1%}'.format(1.0*count/total)
actual = arv_put.human_progress(count, total)
self.assertTrue(actual.startswith('\r'))
self.assertIn(expect, actual)
arv_put.human_progress(count, None)))
-class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
+class ArvadosPutTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase,
+ tutil.VersionChecker):
MAIN_SERVER = {}
Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
def call_main_with_args(self, args):
- self.main_stdout = StringIO()
- self.main_stderr = StringIO()
+ self.main_stdout = tutil.StringIO()
+ self.main_stderr = tutil.StringIO()
return arv_put.main(args, self.main_stdout, self.main_stderr)
def call_main_on_test_file(self, args=[]):
super(ArvadosPutTest, self).tearDown()
def test_version_argument(self):
- err = io.BytesIO()
- out = io.BytesIO()
- with tutil.redirected_streams(stdout=out, stderr=err):
+ with tutil.redirected_streams(
+ stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
with self.assertRaises(SystemExit):
self.call_main_with_args(['--version'])
- self.assertEqual(out.getvalue(), '')
- self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+ self.assertVersionOutput(out, err)
def test_simple_file_put(self):
self.call_main_on_test_file()
def test_api_error_handling(self):
coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
coll_save_mock.side_effect = arvados.errors.ApiError(
- fake_httplib2_response(403), '{}')
+ fake_httplib2_response(403), b'{}')
with mock.patch('arvados.collection.Collection.save_new',
new=coll_save_mock):
with self.assertRaises(SystemExit) as exc_test:
result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
0)
except ValueError as error:
- self.assertIn(BAD_UUID, error.message)
+ self.assertIn(BAD_UUID, str(error))
else:
self.assertFalse(result, "incorrectly found nonexistent project")
[sys.executable, arv_put.__file__, '--stream'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=self.ENVIRON)
- pipe.stdin.write('stdin test\n')
+ pipe.stdin.write(b'stdin test\n')
pipe.stdin.close()
deadline = time.time() + 5
while (pipe.poll() is None) and (time.time() < deadline):
elif returncode != 0:
sys.stdout.write(pipe.stdout.read())
self.fail("arv-put returned exit code {}".format(returncode))
- self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
+ self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
+ pipe.stdout.read().decode())
def test_ArvPutSignedManifest(self):
# ArvPutSignedManifest runs "arv-put foo" and then attempts to get
with open(os.path.join(datadir, "foo"), "w") as f:
f.write("The quick brown fox jumped over the lazy dog")
p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
- stdout=subprocess.PIPE, env=self.ENVIRON)
- (arvout, arverr) = p.communicate()
- self.assertEqual(arverr, None)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Collection saved as ')
self.assertEqual(p.returncode, 0)
# The manifest text stored in the API server under the same
# manifest UUID must use signed locators.
c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
- self.assertRegexpMatches(
+ self.assertRegex(
c['manifest_text'],
r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
[sys.executable, arv_put.__file__] + extra_args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=self.ENVIRON)
- stdout, stderr = pipe.communicate(text)
+ stdout, stderr = pipe.communicate(text.encode())
+ self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
search_key = ('portable_data_hash'
if '--portable-data-hash' in extra_args else 'uuid')
collection_list = arvados.api('v1').collections().list(
- filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
+ filters=[[search_key, '=', stdout.decode().strip()]]
+ ).execute().get('items', [])
self.assertEqual(1, len(collection_list))
return collection_list[0]
self.assertEqual(col['uuid'], updated_col['uuid'])
# Get the manifest and check that the new file is being included
c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
- self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+ self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
"Test unnamed collection",
['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
username = pwd.getpwuid(os.getuid()).pw_name
- self.assertRegexpMatches(
+ self.assertRegex(
link['name'],
r'^Saved at .* by {}@'.format(re.escape(username)))
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
+from __future__ import absolute_import
+from builtins import object
import arvados
import copy
import mock
import os
import pprint
import re
+import sys
import tempfile
import unittest
-import run_test_server
+from . import run_test_server
from arvados._ranges import Range, LocatorAndRange
from arvados.collection import Collection, CollectionReader
-import arvados_testutil as tutil
+from . import arvados_testutil as tutil
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
self.assertEqual(cw.current_stream_name(), '.',
'current_stream_name() should be "." now')
cw.set_current_file_name('foo.txt')
- cw.write('foo')
+ cw.write(b'foo')
self.assertEqual(cw.current_file_name(), 'foo.txt',
'current_file_name() should be foo.txt now')
cw.start_new_file('bar.txt')
- cw.write('bar')
+ cw.write(b'bar')
cw.start_new_stream('baz')
- cw.write('baz')
+ cw.write(b'baz')
cw.set_current_file_name('baz.txt')
self.assertEqual(cw.manifest_text(),
". 3858f62230ac3c915f300c664312c63f+6 0:3:foo.txt 3:3:bar.txt\n" +
return cw.portable_data_hash()
def test_keep_local_store(self):
- self.assertEqual(self.keep_client.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
- self.assertEqual(self.keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
+ self.assertEqual(self.keep_client.put(b'foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
+ self.assertEqual(self.keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), b'foo', 'wrong data from Keep.get')
def test_local_collection_writer(self):
self.assertEqual(self.write_foo_bar_baz(),
for s in cr.all_streams():
for f in s.all_files():
got += [[f.size(), f.stream_name(), f.name(), f.read(2**26)]]
- expected = [[3, '.', 'foo.txt', 'foo'],
- [3, '.', 'bar.txt', 'bar'],
- [3, './baz', 'baz.txt', 'baz']]
+ expected = [[3, '.', 'foo.txt', b'foo'],
+ [3, '.', 'bar.txt', b'bar'],
+ [3, './baz', 'baz.txt', b'baz']]
self.assertEqual(got,
expected)
stream0 = cr.all_streams()[0]
self.assertEqual(stream0.readfrom(0, 0),
- '',
+ b'',
'reading zero bytes should have returned empty string')
self.assertEqual(stream0.readfrom(0, 2**26),
- 'foobar',
+ b'foobar',
'reading entire stream failed')
self.assertEqual(stream0.readfrom(2**26, 0),
- '',
+ b'',
'reading zero bytes should have returned empty string')
def _test_subset(self, collection, expected):
def test_collection_manifest_subset(self):
foobarbaz = self.write_foo_bar_baz()
self._test_subset(foobarbaz,
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo'],
- [3, './baz', 'baz.txt', 'baz']])
+ [[3, '.', 'bar.txt', b'bar'],
+ [3, '.', 'foo.txt', b'foo'],
+ [3, './baz', 'baz.txt', b'baz']])
self._test_subset((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
- (self.keep_client.put("foo"),
- self.keep_client.put("bar"))),
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo']])
+ (self.keep_client.put(b"foo"),
+ self.keep_client.put(b"bar"))),
+ [[3, '.', 'bar.txt', b'bar'],
+ [3, '.', 'foo.txt', b'foo']])
self._test_subset((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
- (self.keep_client.put("foo"),
- self.keep_client.put("bar"))),
- [[2, '.', 'fo.txt', 'fo'],
- [4, '.', 'obar.txt', 'obar']])
+ (self.keep_client.put(b"foo"),
+ self.keep_client.put(b"bar"))),
+ [[2, '.', 'fo.txt', b'fo'],
+ [4, '.', 'obar.txt', b'obar']])
self._test_subset((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
- (self.keep_client.put("foo"),
- self.keep_client.put("bar"))),
- [[2, '.', 'ar.txt', 'ar'],
- [2, '.', 'fo.txt', 'fo'],
- [2, '.', 'ob.txt', 'ob'],
- [0, '.', 'zero.txt', '']])
+ (self.keep_client.put(b"foo"),
+ self.keep_client.put(b"bar"))),
+ [[2, '.', 'ar.txt', b'ar'],
+ [2, '.', 'fo.txt', b'fo'],
+ [2, '.', 'ob.txt', b'ob'],
+ [0, '.', 'zero.txt', b'']])
def test_collection_empty_file(self):
cw = arvados.CollectionWriter(self.api_client)
cw.start_new_file('zero.txt')
- cw.write('')
+ cw.write(b'')
self.assertEqual(cw.manifest_text(), ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:zero.txt\n")
self.check_manifest_file_sizes(cw.manifest_text(), [0])
cw = arvados.CollectionWriter(self.api_client)
cw.start_new_file('zero.txt')
- cw.write('')
+ cw.write(b'')
cw.start_new_file('one.txt')
- cw.write('1')
+ cw.write(b'1')
cw.start_new_stream('foo')
cw.start_new_file('zero.txt')
- cw.write('')
+ cw.write(b'')
self.check_manifest_file_sizes(cw.manifest_text(), [0,1,0])
def test_no_implicit_normalize(self):
cw = arvados.CollectionWriter(self.api_client)
cw.start_new_file('b')
- cw.write('b')
+ cw.write(b'b')
cw.start_new_file('a')
- cw.write('')
+ cw.write(b'')
self.check_manifest_file_sizes(cw.manifest_text(), [1,0])
self.check_manifest_file_sizes(
arvados.CollectionReader(
return self.content[locator]
def test_stream_reader(self):
- keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
- 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
- 'cccccccccccccccccccccccccccccccc+5': 'z0123'}
+ keepblocks = {
+ 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': b'abcdefghij',
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': b'klmnopqrstuvwxy',
+ 'cccccccccccccccccccccccccccccccc+5': b'z0123',
+ }
mk = self.MockKeep(keepblocks)
sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk)
- content = 'abcdefghijklmnopqrstuvwxyz0123456789'
+ content = b'abcdefghijklmnopqrstuvwxyz0123456789'
self.assertEqual(sr.readfrom(0, 30), content[0:30])
self.assertEqual(sr.readfrom(2, 30), content[2:30])
self.assertEqual(sr.readfrom(15, 5), content[15:20])
self.assertEqual(sr.readfrom(20, 5), content[20:25])
self.assertEqual(sr.readfrom(25, 5), content[25:30])
- self.assertEqual(sr.readfrom(30, 5), '')
+ self.assertEqual(sr.readfrom(30, 5), b'')
def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
def test_write_multiple_files(self):
cwriter = arvados.CollectionWriter(self.api_client)
for letter in 'ABC':
- with self.make_test_file(letter) as testfile:
+ with self.make_test_file(letter.encode()) as testfile:
cwriter.write_file(testfile.name, letter)
self.assertEqual(
cwriter.manifest_text(),
with self.make_test_file() as testfile:
cwriter.write_file(testfile.name, 'test')
orig_mtime = os.fstat(testfile.fileno()).st_mtime
- testfile.write('extra')
+ testfile.write(b'extra')
testfile.flush()
os.utime(testfile.name, (orig_mtime, orig_mtime))
self.assertRaises(arvados.errors.StaleWriterStateError,
@tutil.skip_sleep
class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
- def mock_get_collection(self, api_mock, code, body):
- body = self.API_COLLECTIONS.get(body)
+ def mock_get_collection(self, api_mock, code, fixturename):
+ body = self.API_COLLECTIONS.get(fixturename)
self._mock_api_call(api_mock.collections().get, code, body)
def api_client_mock(self, status=200):
reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
num_retries=3)
with tutil.mock_keep_responses('foo', 500, 500, 200):
- self.assertEqual('foo',
- ''.join(f.read(9) for f in reader.all_files()))
+ self.assertEqual(b'foo',
+ b''.join(f.read(9) for f in reader.all_files()))
def test_read_nonnormalized_manifest_with_collection_reader(self):
# client should be able to use CollectionReader on a manifest without normalizing it
def test_open_collection_file_one_argument(self):
client = self.api_client_mock(200)
reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
- cfile = reader.open('./foo')
+ cfile = reader.open('./foo', 'rb')
self.check_open_file(cfile, '.', 'foo', 3)
def test_open_deep_file(self):
self.mock_get_collection(client, 200, coll_name)
reader = arvados.CollectionReader(
self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
- cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
+ cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt', 'rb')
self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
32)
kwargs.setdefault('api_client', self.api_client_mock())
writer = arvados.CollectionWriter(**kwargs)
writer.start_new_file('foo')
- writer.write('foo')
+ writer.write(b'foo')
return writer
def test_write_whole_collection(self):
with writer.open('out') as out_file:
self.assertEqual('.', writer.current_stream_name())
self.assertEqual('out', writer.current_file_name())
- out_file.write('test data')
+ out_file.write(b'test data')
data_loc = tutil.str_keep_locator('test data')
self.assertTrue(out_file.closed, "writer file not closed after context")
self.assertRaises(ValueError, out_file.write, 'extra text')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('flush_test') as out_file:
- out_file.write('flush1')
+ out_file.write(b'flush1')
out_file.flush()
- out_file.write('flush2')
+ out_file.write(b'flush2')
self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
data_loc2),
writer.manifest_text())
client = self.api_client_mock()
writer = arvados.CollectionWriter(client)
with writer.open('.', '1') as out_file:
- out_file.write('1st')
+ out_file.write(b'1st')
with writer.open('.', '2') as out_file:
- out_file.write('2nd')
+ out_file.write(b'2nd')
data_loc = tutil.str_keep_locator('1st2nd')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('file') as out_file:
- out_file.write('file')
+ out_file.write(b'file')
with writer.open('./dir', 'indir') as out_file:
- out_file.write('indir')
+ out_file.write(b'indir')
expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
data_loc1, data_loc2)
self.assertEqual(expected, writer.manifest_text())
self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+class CollectionOpenModes(run_test_server.TestCaseWithServers):
+
+ def test_open_binary_modes(self):
+ c = Collection()
+ for mode in ['wb', 'wb+', 'ab', 'ab+']:
+ with c.open('foo', mode) as f:
+ f.write(b'foo')
+
+ def test_open_invalid_modes(self):
+ c = Collection()
+ for mode in ['+r', 'aa', '++', 'r+b', 'beer', '', None]:
+ with self.assertRaises(Exception):
+ c.open('foo', mode)
+
+ def test_open_text_modes(self):
+ c = Collection()
+ with c.open('foo', 'wb') as f:
+ f.write('foo')
+ for mode in ['r', 'rt', 'r+', 'rt+', 'w', 'wt', 'a', 'at']:
+ if sys.version_info >= (3, 0):
+ with self.assertRaises(NotImplementedError):
+ c.open('foo', mode)
+ else:
+ with c.open('foo', mode) as f:
+ if mode[0] == 'r' and '+' not in mode:
+ self.assertEqual('foo', f.read(3))
+ else:
+ f.write('bar')
+ f.seek(-3, os.SEEK_CUR)
+ self.assertEqual('bar', f.read(3))
+
+
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
def test_replication_desired_kept_on_load(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
d = c2.diff(c1)
- self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"]),
- ('add', './count1.txt', c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('add', './count1.txt', c1["count1.txt"]),
+ ('del', './count2.txt', c2["count2.txt"]),
+ ])
d = c1.diff(c2)
- self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
- ('add', './count2.txt', c2["count2.txt"])])
+ self.assertEqual(sorted(d), [
+ ('add', './count2.txt', c2["count2.txt"]),
+ ('del', './count1.txt', c1["count1.txt"]),
+ ])
self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt 10:20:count2.txt\n')
d = c2.diff(c1)
- self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('del', './count2.txt', c2["count2.txt"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
d = c1.diff(c2)
- self.assertEqual(d, [('add', './count2.txt', c2["count2.txt"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('add', './count2.txt', c2["count2.txt"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
d = c2.diff(c1)
- self.assertEqual(d, [('del', './foo', c2["foo"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('del', './foo', c2["foo"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
d = c1.diff(c2)
- self.assertEqual(d, [('add', './foo', c2["foo"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
-
+ self.assertEqual(sorted(d), [
+ ('add', './foo', c2["foo"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_del_add_in_subcollection(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:3:count3.txt\n')
-
d = c2.diff(c1)
- self.assertEqual(d, [('del', './foo/count3.txt', c2.find("foo/count3.txt")),
- ('add', './foo/count2.txt', c1.find("foo/count2.txt")),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('add', './foo/count2.txt', c1.find("foo/count2.txt")),
+ ('del', './foo/count3.txt', c2.find("foo/count3.txt")),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
d = c1.diff(c2)
- self.assertEqual(d, [('del', './foo/count2.txt', c1.find("foo/count2.txt")),
- ('add', './foo/count3.txt', c2.find("foo/count3.txt")),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('add', './foo/count3.txt', c2.find("foo/count3.txt")),
+ ('del', './foo/count2.txt', c1.find("foo/count2.txt")),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:3:foo\n')
d = c2.diff(c1)
- self.assertEqual(d, [('mod', './foo', c2["foo"], c1["foo"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('mod', './foo', c2["foo"], c1["foo"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
d = c1.diff(c2)
- self.assertEqual(d, [('mod', './foo', c1["foo"], c2["foo"]),
- ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
+ self.assertEqual(sorted(d), [
+ ('mod', './foo', c1["foo"], c2["foo"]),
+ ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
+ ])
self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
d = c1.diff(c2)
- self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
- ('add', './count2.txt', c2["count2.txt"])])
- f = c1.open("count1.txt", "w")
- f.write("zzzzz")
+ self.assertEqual(sorted(d), [
+ ('add', './count2.txt', c2["count2.txt"]),
+ ('del', './count1.txt', c1["count1.txt"]),
+ ])
+ f = c1.open("count1.txt", "wb")
+ f.write(b"zzzzz")
# c1 changed, so it should not be deleted.
c1.apply(d)
c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
d = c1.diff(c2)
self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
- f = c1.open("count1.txt", "w")
- f.write("zzzzz")
+ f = c1.open("count1.txt", "wb")
+ f.write(b"zzzzz")
# c1 changed, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_conflict_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt\n')
d = c1.diff(c2)
- self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
- ('add', './count1.txt', c2["count1.txt"])])
- f = c1.open("count1.txt", "w")
- f.write("zzzzz")
+ self.assertEqual(sorted(d), [
+ ('add', './count1.txt', c2["count1.txt"]),
+ ('del', './count2.txt', c1["count2.txt"]),
+ ])
+ f = c1.open("count1.txt", "wb")
+ f.write(b"zzzzz")
# c1 added count1.txt, so c2 add will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_conflict_del(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
# c1 deleted, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_notify(self):
c1 = Collection()
events = []
c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
- f = c1.open("foo.txt", "w")
+ f = c1.open("foo.txt", "wb")
self.assertEqual(events[0], (arvados.collection.ADD, c1, "foo.txt", f.arvadosfile))
def test_open_w(self):
c1 = Collection(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n")
self.assertEqual(c1["count1.txt"].size(), 10)
- c1.open("count1.txt", "w").close()
+ c1.open("count1.txt", "wb").close()
self.assertEqual(c1["count1.txt"].size(), 0)
class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
def test_get_manifest_text_only_committed(self):
c = Collection()
- with c.open("count.txt", "w") as f:
+ with c.open("count.txt", "wb") as f:
# One file committed
- with c.open("foo.txt", "w") as foo:
- foo.write("foo")
+ with c.open("foo.txt", "wb") as foo:
+ foo.write(b"foo")
foo.flush() # Force block commit
- f.write("0123456789")
+ f.write(b"0123456789")
# Other file not committed. Block not written to keep yet.
self.assertEqual(
c._get_manifest_text(".",
def test_only_small_blocks_are_packed_together(self):
c = Collection()
- # Write a couple of small files,
+ # Write a couple of small files,
- f = c.open("count.txt", "w")
- f.write("0123456789")
+ f = c.open("count.txt", "wb")
+ f.write(b"0123456789")
f.close(flush=False)
- foo = c.open("foo.txt", "w")
- foo.write("foo")
+ foo = c.open("foo.txt", "wb")
+ foo.write(b"foo")
foo.close(flush=False)
# Then, write a big file, it shouldn't be packed with the ones above
- big = c.open("bigfile.txt", "w")
- big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+ big = c.open("bigfile.txt", "wb")
+ big.write(b"x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
big.close(flush=False)
self.assertEqual(
c.manifest_text("."),
'. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
+ def test_flush_after_small_block_packing(self):
+ c = Collection()
+ # Write a couple of small files,
+ f = c.open("count.txt", "w")
+ f.write("0123456789")
+ f.close(flush=False)
+ foo = c.open("foo.txt", "w")
+ foo.write("foo")
+ foo.close(flush=False)
+
+ self.assertEqual(
+ c.manifest_text(),
+ '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
+
+ f = c.open("count.txt", "r+")
+ f.close(flush=True)
+
+ self.assertEqual(
+ c.manifest_text(),
+ '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
+
+ def test_write_after_small_block_packing2(self):
+ c = Collection()
+ # Write a couple of small files,
+ f = c.open("count.txt", "w")
+ f.write("0123456789")
+ f.close(flush=False)
+ foo = c.open("foo.txt", "w")
+ foo.write("foo")
+ foo.close(flush=False)
+
+ self.assertEqual(
+ c.manifest_text(),
+ '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
+
+ f = c.open("count.txt", "r+")
+ f.write("abc")
+ f.close(flush=False)
+
+ self.assertEqual(
+ c.manifest_text(),
+ '. 900150983cd24fb0d6963f7d28e17f72+3 a8430a058b8fbf408e1931b794dbd6fb+13 0:3:count.txt 6:7:count.txt 13:3:foo.txt\n')
+
+
+ def test_small_block_packing_with_overwrite(self):
+ c = Collection()
+ c.open("b1", "w").close()
+ c["b1"].writeto(0, "b1", 0)
+
+ c.open("b2", "w").close()
+ c["b2"].writeto(0, "b2", 0)
+
+ c["b1"].writeto(0, "1b", 0)
+
+ self.assertEqual(c.manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1 2:2:b2\n")
+ self.assertEqual(c["b1"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1\n")
+ self.assertEqual(c["b2"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 2:2:b2\n")
+
class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
self.assertEqual(c.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
self.assertEqual(c.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
- with c.open("count.txt", "w") as f:
- f.write("0123456789")
+ with c.open("count.txt", "wb") as f:
+ f.write(b"0123456789")
self.assertEqual(c.portable_manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
def test_create_and_save(self):
c = self.create_count_txt()
c.save()
- self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertRegex(
+ c.manifest_text(),
+ r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
def test_create_and_save_new(self):
c = self.create_count_txt()
c.save_new()
- self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertRegex(
+ c.manifest_text(),
+ r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
def test_create_diff_apply(self):
c1 = self.create_count_txt()
c1.save()
c2 = Collection(c1.manifest_locator())
- with c2.open("count.txt", "w") as f:
- f.write("abcdefg")
+ with c2.open("count.txt", "wb") as f:
+ f.write(b"abcdefg")
diff = c1.diff(c2)
c1.save()
c2 = arvados.collection.Collection(c1.manifest_locator())
- with c2.open("count.txt", "w") as f:
- f.write("abcdefg")
+ with c2.open("count.txt", "wb") as f:
+ f.write(b"abcdefg")
c2.save()
c1 = self.create_count_txt()
c1.save()
- with c1.open("count.txt", "w") as f:
- f.write("XYZ")
+ with c1.open("count.txt", "wb") as f:
+ f.write(b"XYZ")
c2 = arvados.collection.Collection(c1.manifest_locator())
- with c2.open("count.txt", "w") as f:
- f.write("abcdefg")
+ with c2.open("count.txt", "wb") as f:
+ f.write(b"abcdefg")
c2.save()
c1.update()
- self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.manifest_text(),
+ r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
if __name__ == '__main__':