11308: Merge branch 'master' into 11308-python3
author Tom Clegg <tom@curoverse.com>
Tue, 2 May 2017 17:02:32 +0000 (13:02 -0400)
committer Tom Clegg <tom@curoverse.com>
Tue, 2 May 2017 17:02:32 +0000 (13:02 -0400)
Conflicts:
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_collections.py

sdk/python/arvados/_ranges.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/run_test_server.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_collections.py

index 1d0a793e2c79cf72eccb13c93bb687397672ca08,b4368ad3cd72227ec3a42f9556f67add5978bde9..669a61c8dfdf5cb7bdc8739f6a736e64fb968bae
@@@ -1,5 -1,3 +1,5 @@@
 +from __future__ import division
 +from builtins import object
  import logging
  
  _logger = logging.getLogger('arvados.ranges')
@@@ -26,14 -24,14 +26,14 @@@ class Range(object)
                  self.segment_offset == other.segment_offset)
  
  def first_block(data_locators, range_start):
 -    block_start = 0L
 +    block_start = 0
  
      # range_start/block_start is the inclusive lower bound
      # range_end/block_end is the exclusive upper bound
  
      hi = len(data_locators)
      lo = 0
 -    i = int((hi + lo) / 2)
 +    i = (hi + lo) // 2
      block_size = data_locators[i].range_size
      block_start = data_locators[i].range_start
      block_end = block_start + block_size
@@@ -49,7 -47,7 +49,7 @@@
              lo = i
          else:
              hi = i
 -        i = int((hi + lo) / 2)
 +        i = (hi + lo) // 2
          block_size = data_locators[i].range_size
          block_start = data_locators[i].range_start
          block_end = block_start + block_size
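A side note on the arithmetic these two hunks fix: with from __future__ import division in effect, the old int((hi + lo) / 2) round-trips the midpoint through a float, which cannot represent every integer above 2**53, while floor division keeps the binary search in exact integer arithmetic. A standalone sketch (not part of the commit):

    from __future__ import division

    hi, lo = 10**18 + 7, 0
    # The float round-trip silently drops the low-order digits...
    assert int((hi + lo) / 2) == 500000000000000000
    # ...while // stays exact, which is what an index computation needs.
    assert (hi + lo) // 2 == 500000000000000003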
@@@ -192,7 -190,7 +192,7 @@@ def replace_range(data_locators, new_ra
              # range ends before this segment starts, so don't look at any more locators
              break
  
-         if  old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+         if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
              # new range starts and ends in old segment
              # split segment into up to 3 pieces
              if (new_range_start-old_segment_start) > 0:
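To make the "up to 3 pieces" comment above concrete, here is a hedged sketch of the split arithmetic, with plain tuples standing in for Range objects (names invented for illustration):

    def split_segment(old_start, old_size, new_start, new_size):
        # Split one old segment around a new range that lands inside it.
        pieces = []
        if new_start - old_start > 0:
            pieces.append(('head', old_start, new_start - old_start))
        pieces.append(('new', new_start, new_size))
        tail = (old_start + old_size) - (new_start + new_size)
        if tail > 0:
            pieces.append(('tail', new_start + new_size, tail))
        return pieces

    # A 100-byte segment overwritten by 10 bytes at offset 40:
    assert split_segment(0, 100, 40, 10) == [
        ('head', 0, 40), ('new', 40, 10), ('tail', 50, 50)]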
index 931a3a198db9407fd72b274225821cc3f12ca6d7,a2ec76a0761cb43a43731e5e31e9638f0dbf3019..2fc9c73afe031543ab7afb67edcd2e7a5c4c7d6f
@@@ -1,30 -1,22 +1,30 @@@
 -import functools
 -import os
 -import zlib
 +from __future__ import absolute_import
 +from __future__ import division
 +from future import standard_library
 +from future.utils import listitems, listvalues
 +standard_library.install_aliases()
 +from builtins import range
 +from builtins import object
  import bz2
 -import config
 -import hashlib
 -import threading
 -import Queue
 +import collections
  import copy
  import errno
 -import re
 +import functools
 +import hashlib
  import logging
 -import collections
 +import os
 +import queue
 +import re
 +import sys
 +import threading
  import uuid
 +import zlib
  
 +from . import config
  from .errors import KeepWriteError, AssertionError, ArgumentError
  from .keep import KeepLocator
  from ._normalize_stream import normalize_stream
- from ._ranges import locators_and_ranges, replace_range, Range
+ from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
  from .retry import retry_method
  
  MOD = "mod"
@@@ -84,10 -76,7 +84,10 @@@ class _FileLikeObjectBase(object)
  class ArvadosFileReaderBase(_FileLikeObjectBase):
      def __init__(self, name, mode, num_retries=None):
          super(ArvadosFileReaderBase, self).__init__(name, mode)
 -        self._filepos = 0L
 +        self._binary = 'b' in mode
 +        if sys.version_info >= (3, 0) and not self._binary:
 +            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
 +        self._filepos = 0
          self.num_retries = num_retries
          self._readline_cache = (None, None)
  
              pos += self._filepos
          elif whence == os.SEEK_END:
              pos += self.size()
 -        if pos < 0L:
 +        if pos < 0:
              raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
          self._filepos = pos
          return self._filepos
      def readall(self, size=2**20, num_retries=None):
          while True:
              data = self.read(size, num_retries=num_retries)
 -            if data == '':
 +            if len(data) == 0:
                  break
              yield data
  
              data = [cache_data]
              self._filepos += len(cache_data)
          else:
 -            data = ['']
 +            data = [b'']
          data_size = len(data[-1])
 -        while (data_size < size) and ('\n' not in data[-1]):
 +        while (data_size < size) and (b'\n' not in data[-1]):
              next_read = self.read(2 ** 20, num_retries=num_retries)
              if not next_read:
                  break
              data.append(next_read)
              data_size += len(next_read)
 -        data = ''.join(data)
 +        data = b''.join(data)
          try:
 -            nextline_index = data.index('\n') + 1
 +            nextline_index = data.index(b'\n') + 1
          except ValueError:
              nextline_index = len(data)
          nextline_index = min(nextline_index, size)
          self._filepos -= len(data) - nextline_index
          self._readline_cache = (self.tell(), data[nextline_index:])
 -        return data[:nextline_index]
 +        return data[:nextline_index].decode()
  
      @_FileLikeObjectBase._before_close
      @retry_method
              data_size += len(s)
              if data_size >= sizehint:
                  break
 -        return ''.join(data).splitlines(True)
 +        return b''.join(data).decode().splitlines(True)
  
      def size(self):
          raise IOError(errno.ENOSYS, "Not implemented")
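The readline changes above all follow one Python 3 rule: Keep reads now return bytes, and mixing str and bytes (for example searching for '\n' in a bytes buffer) raises TypeError, so buffering and searching stay in the bytes domain and only the finished line is decoded. A minimal sketch:

    chunk = b'alpha\nbeta'
    # Search and slice in bytes...
    assert b'\n' in chunk
    line = chunk[:chunk.index(b'\n') + 1]
    # ...and decode only the value handed back to the text-oriented caller.
    assert line.decode() == 'alpha\n'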
@@@ -229,15 -218,15 +229,15 @@@ class StreamFileReader(ArvadosFileReade
      def read(self, size, num_retries=None):
          """Read up to 'size' bytes from the stream, starting at the current file position"""
          if size == 0:
 -            return ''
 +            return b''
  
 -        data = ''
 +        data = b''
          available_chunks = locators_and_ranges(self.segments, self._filepos, size)
          if available_chunks:
              lr = available_chunks[0]
              data = self._stream.readfrom(lr.locator+lr.segment_offset,
 -                                          lr.segment_size,
 -                                          num_retries=num_retries)
 +                                         lr.segment_size,
 +                                         num_retries=num_retries)
  
          self._filepos += len(data)
          return data
      def readfrom(self, start, size, num_retries=None):
          """Read up to 'size' bytes from the stream, starting at 'start'"""
          if size == 0:
 -            return ''
 +            return b''
  
          data = []
          for lr in locators_and_ranges(self.segments, start, size):
              data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                                num_retries=num_retries))
 -        return ''.join(data)
 +        return b''.join(data)
  
      def as_manifest(self):
          segs = []
@@@ -300,6 -289,7 +300,7 @@@ class _BufferBlock(object)
      PENDING = 1
      COMMITTED = 2
      ERROR = 3
+     DELETED = 4
  
      def __init__(self, blockid, starting_capacity, owner):
          """
  
          """
          if self._state == _BufferBlock.WRITABLE:
 +            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 +                data = data.encode()
              while (self.write_pointer+len(data)) > len(self.buffer_block):
                  new_buffer_block = bytearray(len(self.buffer_block) * 2)
                  new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
  
      @synchronized
      def clear(self):
+         self._state = _BufferBlock.DELETED
          self.owner = None
          self.buffer_block = None
          self.buffer_view = None
  
+     @synchronized
+     def repack_writes(self):
+         """Optimize buffer block by repacking segments in file sequence.
+
+         When the client makes random writes, they appear in the buffer block in
+         the sequence they were written rather than the sequence they appear in
+         the file.  This makes for inefficient, fragmented manifests.  Attempt
+         to optimize by repacking writes in file sequence.
+         """
+         if self._state != _BufferBlock.WRITABLE:
+             raise AssertionError("Cannot repack non-writable block")
+         segs = self.owner.segments()
+         # Collect the segments that reference the buffer block.
+         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+         # Collect total data referenced by segments (could be smaller than
+         # bufferblock size if a portion of the file was written and
+         # then overwritten).
+         write_total = sum([s.range_size for s in bufferblock_segs])
+         if write_total < self.size() or len(bufferblock_segs) > 1:
+             # If there's more than one segment referencing this block, it is
+             # due to out-of-order writes and will produce a fragmented
+             # manifest, so try to optimize by re-packing into a new buffer.
+             contents = self.buffer_view[0:self.write_pointer].tobytes()
+             new_bb = _BufferBlock(None, write_total, None)
+             for t in bufferblock_segs:
+                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+                 t.segment_offset = new_bb.size() - t.range_size
+             self.buffer_block = new_bb.buffer_block
+             self.buffer_view = new_bb.buffer_view
+             self.write_pointer = new_bb.write_pointer
+             self._locator = None
+             new_bb.clear()
+             self.owner.set_segments(segs)
+
+     def __repr__(self):
+         return "<BufferBlock %s>" % (self.blockid)
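For intuition on the fragmentation that repack_writes() removes, here is a toy model (invented for illustration, not SDK code) of a buffer block written out of file order:

    buf = bytearray()
    segments = []                     # (file_offset, buf_offset, size)
    for file_off, data in [(5, b'world'), (0, b'hello')]:
        segments.append((file_off, len(buf), len(data)))
        buf += data
    # The buffer reflects write order, so the manifest would need two
    # fragments to describe one contiguous stretch of the file:
    assert bytes(buf) == b'worldhello'
    # Repacking copies the data back out in file order, collapsing the
    # segments into a single run:
    repacked = b''.join(bytes(buf[bo:bo + sz])
                        for _, bo, sz in sorted(segments))
    assert repacked == b'helloworld'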
  
  class NoopLock(object):
      def __enter__(self):
@@@ -473,7 -505,7 +518,7 @@@ class _BlockManager(object)
  
      def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
          if blockid is None:
-             blockid = "%s" % uuid.uuid4()
+             blockid = str(uuid.uuid4())
          bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
          self._bufferblocks[bufferblock.blockid] = bufferblock
          return bufferblock
            ArvadosFile that owns the new block
  
          """
-         new_blockid = "bufferblock%i" % len(self._bufferblocks)
+         new_blockid = str(uuid.uuid4())
          bufferblock = block.clone(new_blockid, owner)
          self._bufferblocks[bufferblock.blockid] = bufferblock
          return bufferblock
                  # blocks pending.  If they are full 64 MiB blocks, that means up to
                  # 256 MiB of internal buffering, which is the same size as the
                  # default download block cache in KeepClient.
 -                self._put_queue = Queue.Queue(maxsize=2)
 +                self._put_queue = queue.Queue(maxsize=2)
  
                  self._put_threads = []
 -                for i in xrange(0, self.num_put_threads):
 +                for i in range(0, self.num_put_threads):
                      thread = threading.Thread(target=self._commit_bufferblock_worker)
                      self._put_threads.append(thread)
                      thread.daemon = True
      @synchronized
      def start_get_threads(self):
          if self._prefetch_threads is None:
 -            self._prefetch_queue = Queue.Queue()
 +            self._prefetch_queue = queue.Queue()
              self._prefetch_threads = []
 -            for i in xrange(0, self.num_get_threads):
 +            for i in range(0, self.num_get_threads):
                  thread = threading.Thread(target=self._block_prefetch_worker)
                  self._prefetch_threads.append(thread)
                  thread.daemon = True
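As the 256 MiB comment above notes, the bounded put queue is what caps the writer's memory: queue.Queue(maxsize=2) makes the producer block once two blocks are in flight. A generic sketch of the pattern (not Arvados code):

    import queue
    import threading

    q = queue.Queue(maxsize=2)        # producer blocks when 2 items are pending

    def uploader():
        while True:
            item = q.get()
            if item is None:          # sentinel: shut the worker down
                break
            # ... upload item here ...
            q.task_done()

    t = threading.Thread(target=uploader)
    t.daemon = True
    t.start()
    for block in (b'a', b'b', b'c'):
        q.put(block)                  # the third put waits for a free slot
    q.put(None)
    t.join()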
      @synchronized
      def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
          """Packs small blocks together before uploading"""
          self._pending_write_size += closed_file_size
  
          # Check if there are enough small blocks for filling up one in full
-         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+             return
  
-             # Search blocks ready for getting packed together before being committed to Keep.
-             # A WRITABLE block always has an owner.
-             # A WRITABLE block with its owner.closed() implies that it's
-             # size is <= KEEP_BLOCK_SIZE/2.
-             try:
-                 small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-             except AttributeError:
-                 # Writable blocks without owner shouldn't exist.
-                 raise UnownedBlockError()
 -        # Search blocks ready for getting packed together before being committed to Keep.
++        # Search blocks ready for getting packed together before being
++        # committed to Keep.
+         # A WRITABLE block always has an owner.
 -        # A WRITABLE block with its owner.closed() implies that it's
++        # A WRITABLE block with its owner.closed() implies that its
+         # size is <= KEEP_BLOCK_SIZE/2.
+         try:
 -            small_blocks = [b for b in self._bufferblocks.values()
++            small_blocks = [b for b in listvalues(self._bufferblocks)
+                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+         except AttributeError:
+             # Writable blocks without owner shouldn't exist.
+             raise UnownedBlockError()
+         if len(small_blocks) <= 1:
+             # Not enough small blocks for repacking
+             return
  
-             if len(small_blocks) <= 1:
-                 # Not enough small blocks for repacking
-                 return
+         for bb in small_blocks:
+             bb.repack_writes()
  
-             # Update the pending write size count with its true value, just in case
-             # some small file was opened, written and closed several times.
-             self._pending_write_size = sum([b.size() for b in small_blocks])
-             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                 return
+         # Update the pending write size count with its true value, just in case
+         # some small file was opened, written and closed several times.
+         self._pending_write_size = sum([b.size() for b in small_blocks])
  
-             new_bb = self._alloc_bufferblock()
-             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                 bb = small_blocks.pop(0)
-                 arvfile = bb.owner
-                 self._pending_write_size -= bb.size()
-                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                 arvfile.set_segments([Range(new_bb.blockid,
-                                             0,
-                                             bb.size(),
-                                             new_bb.write_pointer - bb.size())])
-                 self._delete_bufferblock(bb.blockid)
-             self.commit_bufferblock(new_bb, sync=sync)
+         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+             return
+         new_bb = self._alloc_bufferblock()
+         files = []
+         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+             bb = small_blocks.pop(0)
+             self._pending_write_size -= bb.size()
+             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+             files.append((bb, new_bb.write_pointer - bb.size()))
+         self.commit_bufferblock(new_bb, sync=sync)
+         for bb, new_bb_segment_offset in files:
+             newsegs = bb.owner.segments()
+             for s in newsegs:
+                 if s.locator == bb.blockid:
+                     s.locator = new_bb.locator()
+                     s.segment_offset = new_bb_segment_offset+s.segment_offset
+             bb.owner.set_segments(newsegs)
+             self._delete_bufferblock(bb.blockid)
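The loop above re-points each small file at its slot inside the packed block. A toy version of that relocation, with dicts standing in for Range objects (field names and the committed locator are invented):

    def relocate(segments, old_blockid, new_locator, base_offset):
        # After packing, segments that referenced the small block must
        # point into the big block at a shifted offset.
        for s in segments:
            if s['locator'] == old_blockid:
                s['locator'] = new_locator
                s['segment_offset'] += base_offset

    segs = [{'locator': 'bufferblock0', 'segment_offset': 0}]
    relocate(segs, 'bufferblock0', 'acbd18db4cc2f85cedef654fccc4a4d8+7',
             base_offset=1024)
    assert segs[0] == {'locator': 'acbd18db4cc2f85cedef654fccc4a4d8+7',
                       'segment_offset': 1024}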
  
      def commit_bufferblock(self, block, sync):
          """Initiate a background upload of a bufferblock.
          self.repack_small_blocks(force=True, sync=True)
  
          with self.lock:
 -            items = self._bufferblocks.items()
 +            items = listitems(self._bufferblocks)
  
          for k,v in items:
              if v.state() != _BufferBlock.COMMITTED and v.owner:
@@@ -865,7 -910,7 +924,7 @@@ class ArvadosFile(object)
          with self.lock:
              if len(self._segments) != len(othersegs):
                  return False
 -            for i in xrange(0, len(othersegs)):
 +            for i in range(0, len(othersegs)):
                  seg1 = self._segments[i]
                  seg2 = othersegs[i]
                  loc1 = seg1.locator
          """
          self._writers.remove(writer)
  
 -        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
 +        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
              # File writer closed, not small enough for repacking
              self.flush()
          elif self.closed():
  
          with self.lock:
              if size == 0 or offset >= self.size():
 -                return ''
 +                return b''
              readsegs = locators_and_ranges(self._segments, offset, size)
              prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
  
                  self.parent._my_block_manager().block_prefetch(lr.locator)
                  locs.add(lr.locator)
  
 -        return ''.join(data)
 +        return b''.join(data)
  
-     def _repack_writes(self, num_retries):
-         """Optimize buffer block by repacking segments in file sequence.
-         When the client makes random writes, they appear in the buffer block in
-         the sequence they were written rather than the sequence they appear in
-         the file.  This makes for inefficient, fragmented manifests.  Attempt
-         to optimize by repacking writes in file sequence.
-         """
-         segs = self._segments
-         # Collect the segments that reference the buffer block.
-         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-         # Collect total data referenced by segments (could be smaller than
-         # bufferblock size if a portion of the file was written and
-         # then overwritten).
-         write_total = sum([s.range_size for s in bufferblock_segs])
-         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
-             # If there's more than one segment referencing this block, it is
-             # due to out-of-order writes and will produce a fragmented
-             # manifest, so try to optimize by re-packing into a new buffer.
-             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-             for t in bufferblock_segs:
-                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                 t.segment_offset = new_bb.size() - t.range_size
-             self._current_bblock = new_bb
      @must_be_writable
      @synchronized
      def writeto(self, offset, data, num_retries):
          necessary.
  
          """
 +        if not isinstance(data, bytes) and not isinstance(data, memoryview):
 +            data = data.encode()
          if len(data) == 0:
              return
  
              self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
  
          if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-             self._repack_writes(num_retries)
+             self._current_bblock.repack_writes()
              if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                  self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                  self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
  
          if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
              if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                 self._repack_writes(num_retries)
-             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+                 self._current_bblock.repack_writes()
+             if self._current_bblock.state() != _BufferBlock.DELETED:
+                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
  
          if sync:
              to_delete = set()
                        normalize=False, only_committed=False):
          buf = ""
          filestream = []
-         for segment in self.segments:
+         for segment in self._segments:
              loc = segment.locator
              if self.parent._my_block_manager().is_bufferblock(loc):
                  if only_committed:
                      continue
-                 loc = self._bufferblocks[loc].calculate_locator()
+                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
              if portable_locators:
                  loc = KeepLocator(loc).stripped()
-             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                   segment.segment_offset, segment.range_size))
-         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
          buf += "\n"
          return buf
  
@@@ -1185,8 -1198,8 +1214,8 @@@ class ArvadosFileReader(ArvadosFileRead
  
      """
  
 -    def __init__(self, arvadosfile, num_retries=None):
 -        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
 +    def __init__(self, arvadosfile, mode="r", num_retries=None):
 +        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
          self.arvadosfile = arvadosfile
  
      def size(self):
                  data.append(rd)
                  self._filepos += len(rd)
                  rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
 -            return ''.join(data)
 +            return b''.join(data)
          else:
              data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
              self._filepos += len(data)
@@@ -1238,7 -1251,8 +1267,7 @@@ class ArvadosFileWriter(ArvadosFileRead
      """
  
      def __init__(self, arvadosfile, mode, num_retries=None):
 -        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
 -        self.mode = mode
 +        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
          self.arvadosfile.add_writer(self)
  
      def writable(self):
      @retry_method
      def write(self, data, num_retries=None):
          if self.mode[0] == "a":
 -            self.arvadosfile.writeto(self.size(), data, num_retries)
 -        else:
 -            self.arvadosfile.writeto(self._filepos, data, num_retries)
 -            self._filepos += len(data)
 +            self._filepos = self.size()
 +        self.arvadosfile.writeto(self._filepos, data, num_retries)
 +        self._filepos += len(data)
          return len(data)
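The net effect of this write() change is that append mode now behaves like a seek-to-EOF followed by a positional write, so tell() is meaningful afterwards. A hedged usage sketch (the Collection calls mirror ones used elsewhere in this diff; treat the exact API details as illustrative):

    import arvados.collection

    c = arvados.collection.Collection()
    with c.open('log.txt', 'wb') as f:
        f.write(b'first\n')
    with c.open('log.txt', 'ab') as f:
        f.write(b'second\n')          # jumps to EOF, then advances _filepos
        assert f.tell() == len(b'first\nsecond\n')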
  
      @_FileLikeObjectBase._before_close
index ed9d55cfc2df31a2b175254a441b4aa9a046cb02,6836d803886291a57dcc39b34c3dcb2c96a8c515..12f93298bba6b3ef2cf461576b2816e88790cae2
@@@ -1,7 -1,8 +1,7 @@@
 -#!/usr/bin/env python
 -
 -# TODO:
 -# --md5sum - display md5 of each file as read from disk
 -
 +from __future__ import division
 +from future.utils import listitems, listvalues
 +from builtins import str
 +from builtins import object
  import argparse
  import arvados
  import arvados.collection
@@@ -184,6 -185,16 +184,16 @@@ _group.add_argument('--no-resume', acti
  Do not continue interrupted uploads from cached state.
  """)
  
+ _group = run_opts.add_mutually_exclusive_group()
+ _group.add_argument('--follow-links', action='store_true', default=True,
+                     dest='follow_links', help="""
+ Follow file and directory symlinks (default).
+ """)
+ _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                     help="""
+ Do not follow file and directory symlinks.
+ """)
+
  _group = run_opts.add_mutually_exclusive_group()
  _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                      help="""
@@@ -204,7 -215,7 +214,7 @@@ def parse_arguments(arguments)
      if len(args.paths) == 0:
          args.paths = ['-']
  
 -    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
 +    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
  
      if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
          if args.filename:
      return args
  
  
+ class PathDoesNotExistError(Exception):
+     pass
+
+
  class CollectionUpdateError(Exception):
      pass
  
@@@ -277,13 -292,13 +291,13 @@@ class ResumeCache(object)
      @classmethod
      def make_path(cls, args):
          md5 = hashlib.md5()
 -        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
 +        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
          realpaths = sorted(os.path.realpath(path) for path in args.paths)
 -        md5.update('\0'.join(realpaths))
 +        md5.update(b'\0'.join([p.encode() for p in realpaths]))
          if any(os.path.isdir(path) for path in realpaths):
 -            md5.update("-1")
 +            md5.update(b'-1')
          elif args.filename:
 -            md5.update(args.filename)
 +            md5.update(args.filename.encode())
          return os.path.join(
              arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
              md5.hexdigest())
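All the hashing hunks in this file apply the same Python 3 rule: hashlib digests consume bytes, so every str is encoded before update(). Distilled (hostname and paths invented):

    import hashlib

    md5 = hashlib.md5()
    md5.update('api.example.org'.encode())        # str -> bytes first
    md5.update(b'\0'.join(p.encode() for p in ['/a', '/b']))
    assert len(md5.hexdigest()) == 32             # hex digest is still str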
@@@ -360,7 -375,8 +374,8 @@@ class ArvPutUploadJob(object)
                   ensure_unique_name=False, num_retries=None,
                   put_threads=None, replication_desired=None,
                   filename=None, update_time=60.0, update_collection=None,
-                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
+                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+                  follow_links=True):
          self.paths = paths
          self.resume = resume
          self.use_cache = use_cache
          self.logger = logger
          self.dry_run = dry_run
          self._checkpoint_before_quit = True
+         self.follow_links = follow_links
  
          if not self.use_cache and self.resume:
              raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
                      if self.dry_run:
                          raise ArvPutUploadIsPending()
                      self._write_stdin(self.filename or 'stdin')
+                 elif not os.path.exists(path):
+                     raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                  elif os.path.isdir(path):
                      # Use absolute paths on cache index so CWD doesn't interfere
                      # with the caching logic.
                      prefixdir = path = os.path.abspath(path)
                      if prefixdir != '/':
                          prefixdir += '/'
-                     for root, dirs, files in os.walk(path):
+                     for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                          # Make os.walk()'s dir traversing order deterministic
                          dirs.sort()
                          files.sort()
          except (SystemExit, Exception) as e:
              self._checkpoint_before_quit = False
              # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
--            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
--            #   we have a custom signal handler in place that raises SystemExit with
--            #   the catched signal's code.
-             if not isinstance(e, SystemExit) or e.code != -2:
++            # Note: We're expecting SystemExit instead of
++            # KeyboardInterrupt because we have a custom signal
++            # handler in place that raises SystemExit with the caught
++            # signal's code.
+             if isinstance(e, PathDoesNotExistError):
+                 # We aren't interested in the traceback for this case
+                 pass
+             elif not isinstance(e, SystemExit) or e.code != -2:
 -                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
 +                self.logger.warning("Abnormal termination:\n{}".format(
 +                    traceback.format_exc()))
              raise
          finally:
              if not self.dry_run:
          Recursively get the total size of the collection
          """
          size = 0
 -        for item in collection.values():
 +        for item in listvalues(collection):
              if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                  size += self._collection_size(item)
              else:
              self.reporter(self.bytes_written, self.bytes_expected)
  
      def _write_stdin(self, filename):
 -        output = self._local_collection.open(filename, 'w')
 +        output = self._local_collection.open(filename, 'wb')
          self._write(sys.stdin, output)
          output.close()
  
      def _check_file(self, source, filename):
-         """Check if this file needs to be uploaded"""
+         """
+         Check if this file needs to be uploaded
+         """
+         # Ignore symlinks when requested
+         if (not self.follow_links) and os.path.islink(source):
+             return
          resume_offset = 0
          should_upload = False
          new_file_in_cache = False
  
      def _upload_files(self):
          for source, resume_offset, filename in self._files_to_upload:
 -            with open(source, 'r') as source_fd:
 +            with open(source, 'rb') as source_fd:
                  with self._state_lock:
                      self._state['files'][source]['mtime'] = os.path.getmtime(source)
                      self._state['files'][source]['size'] = os.path.getsize(source)
                  if resume_offset > 0:
                      # Start upload where we left off
 -                    output = self._local_collection.open(filename, 'a')
 +                    output = self._local_collection.open(filename, 'ab')
                      source_fd.seek(resume_offset)
                  else:
                      # Start from scratch
 -                    output = self._local_collection.open(filename, 'w')
 +                    output = self._local_collection.open(filename, 'wb')
                  self._write(source_fd, output)
                  output.close(flush=False)
  
          if self.use_cache:
              # Set up cache file name from input paths.
              md5 = hashlib.md5()
 -            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
 +            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
              realpaths = sorted(os.path.realpath(path) for path in self.paths)
 -            md5.update('\0'.join(realpaths))
 +            md5.update(b'\0'.join([p.encode() for p in realpaths]))
              if self.filename:
 -                md5.update(self.filename)
 +                md5.update(self.filename.encode())
              cache_filename = md5.hexdigest()
              cache_filepath = os.path.join(
                  arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
      def collection_file_paths(self, col, path_prefix='.'):
          """Return a list of file paths by recursively go through the entire collection `col`"""
          file_paths = []
 -        for name, item in col.items():
 +        for name, item in listitems(col):
              if isinstance(item, arvados.arvfile.ArvadosFile):
                  file_paths.append(os.path.join(path_prefix, name))
              elif isinstance(item, arvados.collection.Subcollection):
              state = json.dumps(self._state)
          try:
              new_cache = tempfile.NamedTemporaryFile(
 +                mode='w+',
                  dir=os.path.dirname(self._cache_filename), delete=False)
              self._lock_file(new_cache)
              new_cache.write(state)
  
      def portable_data_hash(self):
          pdh = self._my_collection().portable_data_hash()
 -        m = self._my_collection().stripped_manifest()
 -        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
 +        m = self._my_collection().stripped_manifest().encode()
 +        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
          if pdh != local_pdh:
              logger.warning("\n".join([
                  "arv-put: API server provided PDH differs from local manifest.",
                      locators.append(loc)
                  return locators
          elif isinstance(item, arvados.collection.Collection):
 -            l = [self._datablocks_on_item(x) for x in item.values()]
 +            l = [self._datablocks_on_item(x) for x in listvalues(item)]
              # Fast list flattener method taken from:
              # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
              return [loc for sublist in l for loc in sublist]
          return datablocks
  
  
- def expected_bytes_for(pathlist):
+ def expected_bytes_for(pathlist, follow_links=True):
      # Walk the given directory trees and stat files, adding up file sizes,
      # so we can display progress as percent
      bytesum = 0
      for path in pathlist:
          if os.path.isdir(path):
-             for filename in arvados.util.listdir_recursive(path):
-                 bytesum += os.path.getsize(os.path.join(path, filename))
+             for root, dirs, files in os.walk(path, followlinks=follow_links):
+                 # Sum file sizes
+                 for f in files:
+                     filepath = os.path.join(root, f)
+                     # Ignore symlinked files when requested
+                     if (not follow_links) and os.path.islink(filepath):
+                         continue
+                     bytesum += os.path.getsize(filepath)
          elif not os.path.isfile(path):
              return None
          else:
@@@ -894,7 -925,7 +927,7 @@@ def main(arguments=None, stdout=sys.std
      # uploaded, the expected bytes calculation can take a moment.
      if args.progress and any([os.path.isdir(f) for f in args.paths]):
          logger.info("Calculating upload size, this could take some time...")
-     bytes_expected = expected_bytes_for(args.paths)
+     bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)
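A standalone distillation of the new size walk (directory layout is left to the caller; note the documented os.walk caveat that followlinks=True can recurse forever over a symlink cycle):

    import os

    def tree_bytes(path, follow_links=True):
        total = 0
        for root, dirs, files in os.walk(path, followlinks=follow_links):
            for name in files:
                filepath = os.path.join(root, name)
                if (not follow_links) and os.path.islink(filepath):
                    continue          # mirror _check_file: skip linked files
                total += os.path.getsize(filepath)
        return total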
  
      try:
          writer = ArvPutUploadJob(paths = args.paths,
                                   ensure_unique_name = True,
                                   update_collection = args.update_collection,
                                   logger=logger,
-                                  dry_run=args.dry_run)
+                                  dry_run=args.dry_run,
+                                  follow_links=args.follow_links)
      except ResumeCacheConflict:
          logger.error("\n".join([
              "arv-put: Another process is already uploading this data.",
      except ArvPutUploadNotPending:
          # No files pending for upload
          sys.exit(0)
+     except PathDoesNotExistError as error:
+         logger.error("\n".join([
+             "arv-put: %s" % str(error)]))
+         sys.exit(1)
  
      if args.progress:  # Print newline to split stderr from stdout for humans.
          logger.info("\n")
          if not output.endswith('\n'):
              stdout.write('\n')
  
 -    for sigcode, orig_handler in orig_signal_handlers.items():
 +    for sigcode, orig_handler in listitems(orig_signal_handlers):
          signal.signal(sigcode, orig_handler)
  
      if status != 0:
index 5aff52b3c3c206a68ea0a7e1e156d795860301d9,4c6c2d3b9d97fe587b6d897f5f896d5be59e6c32..cca51b8a437e8d8d76c9101e0d2478fb7adc2840
@@@ -1,7 -1,6 +1,7 @@@
 -#!/usr/bin/env python
 -
  from __future__ import print_function
 +from __future__ import division
 +from builtins import str
 +from builtins import range
  import argparse
  import atexit
  import errno
@@@ -97,7 -96,7 +97,7 @@@ def kill_server_pid(pidfile, wait=10, p
          # Use up to half of the +wait+ period waiting for "passenger
          # stop" to work. If the process hasn't exited by then, start
          # sending TERM signals.
 -        startTERM += wait/2
 +        startTERM += wait//2
  
      server_pid = None
      while now <= deadline and server_pid is None:
@@@ -212,7 -211,7 +212,7 @@@ def _fifo2stderr(label)
      except OSError as error:
          if error.errno != errno.ENOENT:
              raise
 -    os.mkfifo(fifo, 0700)
 +    os.mkfifo(fifo, 0o700)
      subprocess.Popen(
          ['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
          stdout=sys.stderr)
@@@ -315,10 -314,7 +315,7 @@@ def run(leave_running_atexit=False)
      env = os.environ.copy()
      env['RAILS_ENV'] = 'test'
      env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
-     if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
-         env.pop('ARVADOS_WEBSOCKETS', None)
-     else:
-         env['ARVADOS_WEBSOCKETS'] = 'yes'
+     env.pop('ARVADOS_WEBSOCKETS', None)
      env.pop('ARVADOS_TEST_API_HOST', None)
      env.pop('ARVADOS_API_HOST', None)
      env.pop('ARVADOS_API_HOST_INSECURE', None)
@@@ -440,7 -436,7 +437,7 @@@ def _start_keep(n, keep_args)
                  "-listen=:{}".format(port),
                  "-pid="+_pidfile('keep{}'.format(n))]
  
 -    for arg, val in keep_args.iteritems():
 +    for arg, val in keep_args.items():
          keep_cmd.append("{}={}".format(arg, val))
  
      logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
@@@ -737,7 -733,7 +734,7 @@@ class TestCaseWithServers(unittest.Test
  
      @staticmethod
      def _restore_dict(src, dest):
 -        for key in dest.keys():
 +        for key in list(dest.keys()):
              if key not in src:
                  del dest[key]
          dest.update(src)
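One more Python 3 syntax point from this file's hunks: old-style octal literals such as 0700 are a SyntaxError under Python 3, hence the os.mkfifo(fifo, 0o700) change above. For reference:

    import stat

    mode = 0o700                      # 0700 no longer parses on Python 3
    assert mode == stat.S_IRWXU == 448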
index 083b8fc1ade6b7ef6fb0994193aef0b081835b07,320189104ab555dc97be4c618e83adc8d6acdd7f..6d103526f40ebc1b8de64cdf2956f1867cdcf538
@@@ -1,10 -1,8 +1,10 @@@
 -#!/usr/bin/env python
 -# -*- coding: utf-8 -*-
 -
 +from __future__ import absolute_import
 +from __future__ import division
 +from future import standard_library
 +standard_library.install_aliases()
 +from builtins import str
 +from builtins import range
  import apiclient
 -import io
  import mock
  import os
  import pwd
@@@ -19,13 -17,16 +19,14 @@@ import yam
  import threading
  import hashlib
  import random
+ import uuid
  
 -from cStringIO import StringIO
 -
  import arvados
  import arvados.commands.put as arv_put
 -import arvados_testutil as tutil
 +from . import arvados_testutil as tutil
  
 -from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 -import run_test_server
 +from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 +from . import run_test_server
  
  class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
      CACHE_ARGSET = [
@@@ -257,8 -258,8 +258,8 @@@ class ArvPutUploadJobTest(run_test_serv
          _, self.large_file_name = tempfile.mkstemp()
          fileobj = open(self.large_file_name, 'w')
          # Make sure to write just a little more than one block
 -        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
 -            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
 +        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
 +            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
              fileobj.write(data)
          fileobj.close()
          # Temp dir containing small files to be repacked
              with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                  f.write(data + str(i))
          self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+         # Temp dir holding a symlink to the other temp dir
+         self.tempdir_with_symlink = tempfile.mkdtemp()
+         os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
+         os.symlink(os.path.join(self.tempdir, '1'),
+                    os.path.join(self.tempdir_with_symlink, 'linkedfile'))
  
      def tearDown(self):
          super(ArvPutUploadJobTest, self).tearDown()
          shutil.rmtree(self.tempdir)
          os.unlink(self.large_file_name)
          shutil.rmtree(self.small_files_dir)
+         shutil.rmtree(self.tempdir_with_symlink)
+
+     def test_symlinks_are_followed_by_default(self):
+         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+         cwriter.start(save_collection=False)
+         self.assertIn('linkeddir', cwriter.manifest_text())
+         self.assertIn('linkedfile', cwriter.manifest_text())
+         cwriter.destroy_cache()
+
+     def test_symlinks_are_not_followed_when_requested(self):
+         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+                                           follow_links=False)
+         cwriter.start(save_collection=False)
+         self.assertNotIn('linkeddir', cwriter.manifest_text())
+         self.assertNotIn('linkedfile', cwriter.manifest_text())
+         cwriter.destroy_cache()
+
+     def test_passing_nonexistant_path_raise_exception(self):
+         uuid_str = str(uuid.uuid4())
+         cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+         with self.assertRaises(arv_put.PathDoesNotExistError):
+             cwriter.start(save_collection=False)
  
      def test_writer_works_without_cache(self):
          cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
  
      def test_writer_works_with_cache(self):
          with tempfile.NamedTemporaryFile() as f:
 -            f.write('foo')
 +            f.write(b'foo')
              f.flush()
              cwriter = arv_put.ArvPutUploadJob([f.name])
              cwriter.start(save_collection=False)
 -            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
 +            self.assertEqual(0, cwriter.bytes_skipped)
 +            self.assertEqual(3, cwriter.bytes_written)
              # Don't destroy the cache, and start another upload
              cwriter_new = arv_put.ArvPutUploadJob([f.name])
              cwriter_new.start(save_collection=False)
              cwriter_new.destroy_cache()
 -            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 +            self.assertEqual(3, cwriter_new.bytes_skipped)
 +            self.assertEqual(3, cwriter_new.bytes_written)
  
      def make_progress_tester(self):
          progression = []
  
      def test_progress_reporting(self):
          with tempfile.NamedTemporaryFile() as f:
 -            f.write('foo')
 +            f.write(b'foo')
              f.flush()
              for expect_count in (None, 8):
                  progression, reporter = self.make_progress_tester()
@@@ -526,7 -552,7 +554,7 @@@ class ArvadosPutReportTest(ArvadosBaseT
  
      def test_known_human_progress(self):
          for count, total in [(0, 1), (2, 4), (45, 60)]:
 -            expect = '{:.1%}'.format(float(count) / total)
 +            expect = '{:.1%}'.format(1.0*count/total)
              actual = arv_put.human_progress(count, total)
              self.assertTrue(actual.startswith('\r'))
              self.assertIn(expect, actual)
                                        arv_put.human_progress(count, None)))
  
  
 -class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
 +class ArvadosPutTest(run_test_server.TestCaseWithServers,
 +                     ArvadosBaseTestCase,
 +                     tutil.VersionChecker):
      MAIN_SERVER = {}
      Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
  
      def call_main_with_args(self, args):
 -        self.main_stdout = StringIO()
 -        self.main_stderr = StringIO()
 +        self.main_stdout = tutil.StringIO()
 +        self.main_stderr = tutil.StringIO()
          return arv_put.main(args, self.main_stdout, self.main_stderr)
  
      def call_main_on_test_file(self, args=[]):
          super(ArvadosPutTest, self).tearDown()
  
      def test_version_argument(self):
 -        err = io.BytesIO()
 -        out = io.BytesIO()
 -        with tutil.redirected_streams(stdout=out, stderr=err):
 +        with tutil.redirected_streams(
 +                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
              with self.assertRaises(SystemExit):
                  self.call_main_with_args(['--version'])
 -        self.assertEqual(out.getvalue(), '')
 -        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
 +        self.assertVersionOutput(out, err)
  
      def test_simple_file_put(self):
          self.call_main_on_test_file()
      def test_api_error_handling(self):
          coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
          coll_save_mock.side_effect = arvados.errors.ApiError(
 -            fake_httplib2_response(403), '{}')
 +            fake_httplib2_response(403), b'{}')
          with mock.patch('arvados.collection.Collection.save_new',
                          new=coll_save_mock):
              with self.assertRaises(SystemExit) as exc_test:
@@@ -712,7 -738,7 +740,7 @@@ class ArvPutIntegrationTest(run_test_se
              result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                    0)
          except ValueError as error:
 -            self.assertIn(BAD_UUID, error.message)
 +            self.assertIn(BAD_UUID, str(error))
          else:
              self.assertFalse(result, "incorrectly found nonexistent project")
  
              [sys.executable, arv_put.__file__, '--stream'],
              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
              stderr=subprocess.STDOUT, env=self.ENVIRON)
 -        pipe.stdin.write('stdin test\n')
 +        pipe.stdin.write(b'stdin test\n')
          pipe.stdin.close()
          deadline = time.time() + 5
          while (pipe.poll() is None) and (time.time() < deadline):
          elif returncode != 0:
              sys.stdout.write(pipe.stdout.read())
              self.fail("arv-put returned exit code {}".format(returncode))
 -        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
 +        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
 +                      pipe.stdout.read().decode())
  
      def test_ArvPutSignedManifest(self):
          # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
          with open(os.path.join(datadir, "foo"), "w") as f:
              f.write("The quick brown fox jumped over the lazy dog")
          p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
 -                             stdout=subprocess.PIPE, env=self.ENVIRON)
 -        (arvout, arverr) = p.communicate()
 -        self.assertEqual(arverr, None)
 +                             stdout=subprocess.PIPE,
 +                             stderr=subprocess.PIPE,
 +                             env=self.ENVIRON)
 +        (out, err) = p.communicate()
 +        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
          self.assertEqual(p.returncode, 0)
  
          # The manifest text stored in the API server under the same
          # manifest UUID must use signed locators.
          c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
 -        self.assertRegexpMatches(
 +        self.assertRegex(
              c['manifest_text'],
              r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
  
              [sys.executable, arv_put.__file__] + extra_args,
              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
              stderr=subprocess.PIPE, env=self.ENVIRON)
 -        stdout, stderr = pipe.communicate(text)
 +        stdout, stderr = pipe.communicate(text.encode())
 +        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
          search_key = ('portable_data_hash'
                        if '--portable-data-hash' in extra_args else 'uuid')
          collection_list = arvados.api('v1').collections().list(
 -            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
 +            filters=[[search_key, '=', stdout.decode().strip()]]
 +        ).execute().get('items', [])
          self.assertEqual(1, len(collection_list))
          return collection_list[0]
  
          self.assertEqual(col['uuid'], updated_col['uuid'])
          # Get the manifest and check that the new file is being included
          c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
 -        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
 +        self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')
  
      def test_put_collection_with_high_redundancy(self):
          # Write empty data: we're not testing CollectionWriter, just
              "Test unnamed collection",
              ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
          username = pwd.getpwuid(os.getuid()).pw_name
 -        self.assertRegexpMatches(
 +        self.assertRegex(
              link['name'],
              r'^Saved at .* by {}@'.format(re.escape(username)))
  
index 86215f535a21634b18a0167636c79431657abd1f,fd31664a528d1ea3cfe1d03b1852b304bb4c88e7..24f305ac732d562596b12dff2179abe990637601
@@@ -1,20 -1,20 +1,20 @@@
 -# usage example:
 -#
 -# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
 +from __future__ import absolute_import
  
 +from builtins import object
  import arvados
  import copy
  import mock
  import os
  import pprint
  import re
 +import sys
  import tempfile
  import unittest
  
 -import run_test_server
 +from . import run_test_server
  from arvados._ranges import Range, LocatorAndRange
  from arvados.collection import Collection, CollectionReader
 -import arvados_testutil as tutil
 +from . import arvados_testutil as tutil
  
  class TestResumableWriter(arvados.ResumableCollectionWriter):
      KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@@ -40,13 -40,13 +40,13 @@@ class ArvadosCollectionsTest(run_test_s
          self.assertEqual(cw.current_stream_name(), '.',
                           'current_stream_name() should be "." now')
          cw.set_current_file_name('foo.txt')
 -        cw.write('foo')
 +        cw.write(b'foo')
          self.assertEqual(cw.current_file_name(), 'foo.txt',
                           'current_file_name() should be foo.txt now')
          cw.start_new_file('bar.txt')
 -        cw.write('bar')
 +        cw.write(b'bar')
          cw.start_new_stream('baz')
 -        cw.write('baz')
 +        cw.write(b'baz')
          cw.set_current_file_name('baz.txt')
          self.assertEqual(cw.manifest_text(),
                           ". 3858f62230ac3c915f300c664312c63f+6 0:3:foo.txt 3:3:bar.txt\n" +
@@@ -56,8 -56,8 +56,8 @@@
          return cw.portable_data_hash()
  
      def test_keep_local_store(self):
 -        self.assertEqual(self.keep_client.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
 -        self.assertEqual(self.keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
 +        self.assertEqual(self.keep_client.put(b'foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
 +        self.assertEqual(self.keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), b'foo', 'wrong data from Keep.get')
  
      def test_local_collection_writer(self):
          self.assertEqual(self.write_foo_bar_baz(),
          for s in cr.all_streams():
              for f in s.all_files():
                  got += [[f.size(), f.stream_name(), f.name(), f.read(2**26)]]
 -        expected = [[3, '.', 'foo.txt', 'foo'],
 -                    [3, '.', 'bar.txt', 'bar'],
 -                    [3, './baz', 'baz.txt', 'baz']]
 +        expected = [[3, '.', 'foo.txt', b'foo'],
 +                    [3, '.', 'bar.txt', b'bar'],
 +                    [3, './baz', 'baz.txt', b'baz']]
          self.assertEqual(got,
                           expected)
          stream0 = cr.all_streams()[0]
          self.assertEqual(stream0.readfrom(0, 0),
 -                         '',
 +                         b'',
                           'reading zero bytes should have returned empty string')
          self.assertEqual(stream0.readfrom(0, 2**26),
 -                         'foobar',
 +                         b'foobar',
                           'reading entire stream failed')
          self.assertEqual(stream0.readfrom(2**26, 0),
 -                         '',
 +                         b'',
                           'reading zero bytes should have returned empty string')
  
      def _test_subset(self, collection, expected):
      def test_collection_manifest_subset(self):
          foobarbaz = self.write_foo_bar_baz()
          self._test_subset(foobarbaz,
 -                          [[3, '.',     'bar.txt', 'bar'],
 -                           [3, '.',     'foo.txt', 'foo'],
 -                           [3, './baz', 'baz.txt', 'baz']])
 +                          [[3, '.',     'bar.txt', b'bar'],
 +                           [3, '.',     'foo.txt', b'foo'],
 +                           [3, './baz', 'baz.txt', b'baz']])
          self._test_subset((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
 -                           (self.keep_client.put("foo"),
 -                            self.keep_client.put("bar"))),
 -                          [[3, '.', 'bar.txt', 'bar'],
 -                           [3, '.', 'foo.txt', 'foo']])
 +                           (self.keep_client.put(b"foo"),
 +                            self.keep_client.put(b"bar"))),
 +                          [[3, '.', 'bar.txt', b'bar'],
 +                           [3, '.', 'foo.txt', b'foo']])
          self._test_subset((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
 -                           (self.keep_client.put("foo"),
 -                            self.keep_client.put("bar"))),
 -                          [[2, '.', 'fo.txt', 'fo'],
 -                           [4, '.', 'obar.txt', 'obar']])
 +                           (self.keep_client.put(b"foo"),
 +                            self.keep_client.put(b"bar"))),
 +                          [[2, '.', 'fo.txt', b'fo'],
 +                           [4, '.', 'obar.txt', b'obar']])
          self._test_subset((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
 -                           (self.keep_client.put("foo"),
 -                            self.keep_client.put("bar"))),
 -                          [[2, '.', 'ar.txt', 'ar'],
 -                           [2, '.', 'fo.txt', 'fo'],
 -                           [2, '.', 'ob.txt', 'ob'],
 -                           [0, '.', 'zero.txt', '']])
 +                           (self.keep_client.put(b"foo"),
 +                            self.keep_client.put(b"bar"))),
 +                          [[2, '.', 'ar.txt', b'ar'],
 +                           [2, '.', 'fo.txt', b'fo'],
 +                           [2, '.', 'ob.txt', b'ob'],
 +                           [0, '.', 'zero.txt', b'']])
  
      def test_collection_empty_file(self):
          cw = arvados.CollectionWriter(self.api_client)
          cw.start_new_file('zero.txt')
 -        cw.write('')
 +        cw.write(b'')
  
          self.assertEqual(cw.manifest_text(), ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:zero.txt\n")
          self.check_manifest_file_sizes(cw.manifest_text(), [0])
          cw = arvados.CollectionWriter(self.api_client)
          cw.start_new_file('zero.txt')
 -        cw.write('')
 +        cw.write(b'')
          cw.start_new_file('one.txt')
 -        cw.write('1')
 +        cw.write(b'1')
          cw.start_new_stream('foo')
          cw.start_new_file('zero.txt')
 -        cw.write('')
 +        cw.write(b'')
          self.check_manifest_file_sizes(cw.manifest_text(), [0,1,0])
  
      def test_no_implicit_normalize(self):
          cw = arvados.CollectionWriter(self.api_client)
          cw.start_new_file('b')
 -        cw.write('b')
 +        cw.write(b'b')
          cw.start_new_file('a')
 -        cw.write('')
 +        cw.write(b'')
          self.check_manifest_file_sizes(cw.manifest_text(), [1,0])
          self.check_manifest_file_sizes(
              arvados.CollectionReader(
              return self.content[locator]
  
      def test_stream_reader(self):
 -        keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
 -                      'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
 -                      'cccccccccccccccccccccccccccccccc+5': 'z0123'}
 +        keepblocks = {
 +            'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': b'abcdefghij',
 +            'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': b'klmnopqrstuvwxy',
 +            'cccccccccccccccccccccccccccccccc+5': b'z0123',
 +        }
          mk = self.MockKeep(keepblocks)
  
          sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk)
  
 -        content = 'abcdefghijklmnopqrstuvwxyz0123456789'
 +        content = b'abcdefghijklmnopqrstuvwxyz0123456789'
  
          self.assertEqual(sr.readfrom(0, 30), content[0:30])
          self.assertEqual(sr.readfrom(2, 30), content[2:30])
          self.assertEqual(sr.readfrom(15, 5), content[15:20])
          self.assertEqual(sr.readfrom(20, 5), content[20:25])
          self.assertEqual(sr.readfrom(25, 5), content[25:30])
 -        self.assertEqual(sr.readfrom(30, 5), '')
 +        self.assertEqual(sr.readfrom(30, 5), b'')
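 +        # The three blocks total 10 + 15 + 5 = 30 bytes, so a read starting
 +        # at offset 30 is at end-of-stream and returns b''.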
  
      def test_extract_file(self):
          m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
      def test_write_multiple_files(self):
          cwriter = arvados.CollectionWriter(self.api_client)
          for letter in 'ABC':
 -            with self.make_test_file(letter) as testfile:
 +            with self.make_test_file(letter.encode()) as testfile:
                  cwriter.write_file(testfile.name, letter)
          self.assertEqual(
              cwriter.manifest_text(),
          with self.make_test_file() as testfile:
              cwriter.write_file(testfile.name, 'test')
              orig_mtime = os.fstat(testfile.fileno()).st_mtime
 -            testfile.write('extra')
 +            testfile.write(b'extra')
              testfile.flush()
              os.utime(testfile.name, (orig_mtime, orig_mtime))
              self.assertRaises(arvados.errors.StaleWriterStateError,
@@@ -521,8 -519,8 +521,8 @@@ class CollectionTestMixin(tutil.ApiClie
  
  @tutil.skip_sleep
  class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
 -    def mock_get_collection(self, api_mock, code, body):
 -        body = self.API_COLLECTIONS.get(body)
 +    def mock_get_collection(self, api_mock, code, fixturename):
 +        body = self.API_COLLECTIONS.get(fixturename)
          self._mock_api_call(api_mock.collections().get, code, body)
  
      def api_client_mock(self, status=200):
          reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
                                            num_retries=3)
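          # The mocked Keep responses fail twice (500) before succeeding
          # (200); num_retries=3 should give the reader enough attempts.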
          with tutil.mock_keep_responses('foo', 500, 500, 200):
 -            self.assertEqual('foo',
 -                             ''.join(f.read(9) for f in reader.all_files()))
 +            self.assertEqual(b'foo',
 +                             b''.join(f.read(9) for f in reader.all_files()))
  
      def test_read_nonnormalized_manifest_with_collection_reader(self):
          # client should be able to use CollectionReader on a manifest without normalizing it
      def test_open_collection_file_one_argument(self):
          client = self.api_client_mock(200)
          reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
 -        cfile = reader.open('./foo')
 +        cfile = reader.open('./foo', 'rb')
          self.check_open_file(cfile, '.', 'foo', 3)
  
      def test_open_deep_file(self):
          self.mock_get_collection(client, 200, coll_name)
          reader = arvados.CollectionReader(
              self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
 -        cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
 +        cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt', 'rb')
          self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
                               32)
  
@@@ -680,7 -678,7 +680,7 @@@ class CollectionWriterTestCase(unittest
          kwargs.setdefault('api_client', self.api_client_mock())
          writer = arvados.CollectionWriter(**kwargs)
          writer.start_new_file('foo')
 -        writer.write('foo')
 +        writer.write(b'foo')
          return writer
  
      def test_write_whole_collection(self):
          with writer.open('out') as out_file:
              self.assertEqual('.', writer.current_stream_name())
              self.assertEqual('out', writer.current_file_name())
 -            out_file.write('test data')
 +            out_file.write(b'test data')
              data_loc = tutil.str_keep_locator('test data')
          self.assertTrue(out_file.closed, "writer file not closed after context")
          self.assertRaises(ValueError, out_file.write, 'extra text')
          with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
              writer = arvados.CollectionWriter(client)
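              # flush() should commit the buffered data as its own Keep
              # block, so the two writes below end up in separate locators
              # while a single 0:12 file token spans both.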
              with writer.open('flush_test') as out_file:
 -                out_file.write('flush1')
 +                out_file.write(b'flush1')
                  out_file.flush()
 -                out_file.write('flush2')
 +                out_file.write(b'flush2')
              self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
                                                                  data_loc2),
                               writer.manifest_text())
          client = self.api_client_mock()
          writer = arvados.CollectionWriter(client)
          with writer.open('.', '1') as out_file:
 -            out_file.write('1st')
 +            out_file.write(b'1st')
          with writer.open('.', '2') as out_file:
 -            out_file.write('2nd')
 +            out_file.write(b'2nd')
          data_loc = tutil.str_keep_locator('1st2nd')
          with self.mock_keep(data_loc, 200) as keep_mock:
              self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
          with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
              writer = arvados.CollectionWriter(client)
              with writer.open('file') as out_file:
 -                out_file.write('file')
 +                out_file.write(b'file')
              with writer.open('./dir', 'indir') as out_file:
 -                out_file.write('indir')
 +                out_file.write(b'indir')
              expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
                  data_loc1, data_loc2)
              self.assertEqual(expected, writer.manifest_text())
          self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
  
  
 +class CollectionOpenModes(run_test_server.TestCaseWithServers):
 +
 +    def test_open_binary_modes(self):
 +        c = Collection()
 +        for mode in ['wb', 'wb+', 'ab', 'ab+']:
 +            with c.open('foo', mode) as f:
 +                f.write(b'foo')
 +
 +    def test_open_invalid_modes(self):
 +        c = Collection()
 +        for mode in ['+r', 'aa', '++', 'r+b', 'beer', '', None]:
 +            with self.assertRaises(Exception):
 +                c.open('foo', mode)
 +
 +    def test_open_text_modes(self):
 +        c = Collection()
 +        with c.open('foo', 'wb') as f:
 +            f.write(b'foo')
 +        for mode in ['r', 'rt', 'r+', 'rt+', 'w', 'wt', 'a', 'at']:
 +            if sys.version_info >= (3, 0):
 +                with self.assertRaises(NotImplementedError):
 +                    c.open('foo', mode)
 +            else:
 +                with c.open('foo', mode) as f:
 +                    if mode[0] == 'r' and '+' not in mode:
 +                        self.assertEqual('foo', f.read(3))
 +                    else:
 +                        f.write('bar')
 +                        f.seek(-3, os.SEEK_CUR)
 +                        self.assertEqual('bar', f.read(3))
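 +        # On Python 3 the SDK so far implements only binary modes, so text
 +        # modes are expected to raise NotImplementedError; on Python 2 they
 +        # still behave like ordinary file modes.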
 +
 +
  class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
  
      def test_replication_desired_kept_on_load(self):
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
          c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
          d = c2.diff(c1)
 -        self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"]),
 -                             ('add', './count1.txt', c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('add', './count1.txt', c1["count1.txt"]),
 +            ('del', './count2.txt', c2["count2.txt"]),
 +        ])
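 +        # diff() does not guarantee entry order (it depends on dict
 +        # iteration), so the tests sort both sides before comparing.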
          d = c1.diff(c2)
 -        self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
 -                             ('add', './count2.txt', c2["count2.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('add', './count2.txt', c2["count2.txt"]),
 +            ('del', './count1.txt', c1["count1.txt"]),
 +        ])
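 +        # Applying d (the c1 -> c2 diff) to c1 should make the two
 +        # manifests identical.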
          self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1.apply(d)
          self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
          c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt 10:20:count2.txt\n')
          d = c2.diff(c1)
 -        self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('del', './count2.txt', c2["count2.txt"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
          d = c1.diff(c2)
 -        self.assertEqual(d, [('add', './count2.txt', c2["count2.txt"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('add', './count2.txt', c2["count2.txt"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
  
          self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1.apply(d)
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
          c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
          d = c2.diff(c1)
 -        self.assertEqual(d, [('del', './foo', c2["foo"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('del', './foo', c2["foo"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
          d = c1.diff(c2)
 -        self.assertEqual(d, [('add', './foo', c2["foo"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 -
 +        self.assertEqual(sorted(d), [
 +            ('add', './foo', c2["foo"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
          self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1.apply(d)
          self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
      def test_diff_del_add_in_subcollection(self):
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
          c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:3:count3.txt\n')
 -
          d = c2.diff(c1)
 -        self.assertEqual(d, [('del', './foo/count3.txt', c2.find("foo/count3.txt")),
 -                             ('add', './foo/count2.txt', c1.find("foo/count2.txt")),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('add', './foo/count2.txt', c1.find("foo/count2.txt")),
 +            ('del', './foo/count3.txt', c2.find("foo/count3.txt")),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
          d = c1.diff(c2)
 -        self.assertEqual(d, [('del', './foo/count2.txt', c1.find("foo/count2.txt")),
 -                             ('add', './foo/count3.txt', c2.find("foo/count3.txt")),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('add', './foo/count3.txt', c2.find("foo/count3.txt")),
 +            ('del', './foo/count2.txt', c1.find("foo/count2.txt")),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
  
          self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1.apply(d)
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
          c2 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:3:foo\n')
          d = c2.diff(c1)
 -        self.assertEqual(d, [('mod', './foo', c2["foo"], c1["foo"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('mod', './foo', c2["foo"], c1["foo"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
          d = c1.diff(c2)
 -        self.assertEqual(d, [('mod', './foo', c1["foo"], c2["foo"]),
 -                             ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        self.assertEqual(sorted(d), [
 +            ('mod', './foo', c1["foo"], c2["foo"]),
 +            ('tok', './count1.txt', c2["count1.txt"], c1["count1.txt"]),
 +        ])
  
          self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
          c1.apply(d)
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
          c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
          d = c1.diff(c2)
 -        self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
 -                             ('add', './count2.txt', c2["count2.txt"])])
 -        f = c1.open("count1.txt", "w")
 -        f.write("zzzzz")
 +        self.assertEqual(sorted(d), [
 +            ('add', './count2.txt', c2["count2.txt"]),
 +            ('del', './count1.txt', c1["count1.txt"]),
 +        ])
 +        f = c1.open("count1.txt", "wb")
 +        f.write(b"zzzzz")
  
          # c1 changed, so it should not be deleted.
          c1.apply(d)
          c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
          d = c1.diff(c2)
          self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
 -        f = c1.open("count1.txt", "w")
 -        f.write("zzzzz")
 +        f = c1.open("count1.txt", "wb")
 +        f.write(b"zzzzz")
  
          # c1 changed, so c2 mod will go to a conflict file
          c1.apply(d)
 -        self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 +        self.assertRegex(
 +            c1.portable_manifest_text(),
 +            r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
  
      def test_conflict_add(self):
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
          c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt\n')
          d = c1.diff(c2)
 -        self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
 -                             ('add', './count1.txt', c2["count1.txt"])])
 -        f = c1.open("count1.txt", "w")
 -        f.write("zzzzz")
 +        self.assertEqual(sorted(d), [
 +            ('add', './count1.txt', c2["count1.txt"]),
 +            ('del', './count2.txt', c1["count2.txt"]),
 +        ])
 +        f = c1.open("count1.txt", "wb")
 +        f.write(b"zzzzz")
  
          # c1 added count1.txt, so c2 add will go to a conflict file
          c1.apply(d)
 -        self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 +        self.assertRegex(
 +            c1.portable_manifest_text(),
 +            r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
  
      def test_conflict_del(self):
          c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
  
          # c1 deleted, so c2 mod will go to a conflict file
          c1.apply(d)
 -        self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 +        self.assertRegex(
 +            c1.portable_manifest_text(),
 +            r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
  
      def test_notify(self):
          c1 = Collection()
          events = []
          c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
 -        f = c1.open("foo.txt", "w")
 +        f = c1.open("foo.txt", "wb")
          self.assertEqual(events[0], (arvados.collection.ADD, c1, "foo.txt", f.arvadosfile))
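          # subscribe() callbacks receive (event, collection, name, item);
          # creating foo.txt fires an ADD event for the new ArvadosFile.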
  
      def test_open_w(self):
          c1 = Collection(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n")
          self.assertEqual(c1["count1.txt"].size(), 10)
 -        c1.open("count1.txt", "w").close()
 +        c1.open("count1.txt", "wb").close()
          self.assertEqual(c1["count1.txt"].size(), 0)
  
  
  class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
      def test_get_manifest_text_only_committed(self):
          c = Collection()
 -        with c.open("count.txt", "w") as f:
 +        with c.open("count.txt", "wb") as f:
              # One file committed
 -            with c.open("foo.txt", "w") as foo:
 -                foo.write("foo")
 +            with c.open("foo.txt", "wb") as foo:
 +                foo.write(b"foo")
                  foo.flush() # Force block commit
 -            f.write("0123456789")
 +            f.write(b"0123456789")
              # Other file not committed. Block not written to keep yet.
              self.assertEqual(
                  c._get_manifest_text(".",
  
      def test_only_small_blocks_are_packed_together(self):
          c = Collection()
-         # Write a couple of small files, 
+         # Write a couple of small files,
 -        f = c.open("count.txt", "w")
 -        f.write("0123456789")
 +        f = c.open("count.txt", "wb")
 +        f.write(b"0123456789")
          f.close(flush=False)
 -        foo = c.open("foo.txt", "w")
 -        foo.write("foo")
 +        foo = c.open("foo.txt", "wb")
 +        foo.write(b"foo")
          foo.close(flush=False)
          # Then, write a big file; it shouldn't be packed with the ones above
 -        big = c.open("bigfile.txt", "w")
 -        big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
 +        big = c.open("bigfile.txt", "wb")
 +        big.write(b"x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
          big.close(flush=False)
          self.assertEqual(
              c.manifest_text("."),
              '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
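          # 33 * 1024 * 1024 = 34603008 bytes, well over KEEP_BLOCK_SIZE/2,
          # so bigfile.txt gets its own block while the 10- and 3-byte
          # writes share the single 13-byte packed block.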
  
+     def test_flush_after_small_block_packing(self):
+         c = Collection()
+         # Write a couple of small files.
+         f = c.open("count.txt", "wb")
+         f.write(b"0123456789")
+         f.close(flush=False)
+         foo = c.open("foo.txt", "w")
+         foo.write("foo")
+         foo.close(flush=False)
+         self.assertEqual(
+             c.manifest_text(),
+             '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
+         f = c.open("count.txt", "r+")
+         f.close(flush=True)
+         self.assertEqual(
+             c.manifest_text(),
+             '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
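+         # Re-opening the packed file and closing with flush=True should
+         # leave the already-committed block, and the manifest, unchanged.
+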
+     def test_write_after_small_block_packing2(self):
+         c = Collection()
+         # Write a couple of small files.
+         f = c.open("count.txt", "wb")
+         f.write(b"0123456789")
+         f.close(flush=False)
+         foo = c.open("foo.txt", "w")
+         foo.write("foo")
+         foo.close(flush=False)
+         self.assertEqual(
+             c.manifest_text(),
+             '. a8430a058b8fbf408e1931b794dbd6fb+13 0:10:count.txt 10:3:foo.txt\n')
+         f = c.open("count.txt", "r+")
+         f.write("abc")
+         f.close(flush=False)
+         self.assertEqual(
+             c.manifest_text(),
+             '. 900150983cd24fb0d6963f7d28e17f72+3 a8430a058b8fbf408e1931b794dbd6fb+13 0:3:count.txt 6:7:count.txt 13:3:foo.txt\n')
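+         # 900150983cd24fb0d6963f7d28e17f72 is the MD5 of "abc": count.txt
+         # is now "abc" (0:3 of the new block) followed by "3456789"
+         # (stream offsets 6:7, i.e. bytes 3-9 of the original packed block).
+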
+     def test_small_block_packing_with_overwrite(self):
+         c = Collection()
+         c.open("b1", "w").close()
+         c["b1"].writeto(0, "b1", 0)
+         c.open("b2", "w").close()
+         c["b2"].writeto(0, "b2", 0)
+         c["b1"].writeto(0, "1b", 0)
+         self.assertEquals(c.manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1 2:2:b2\n")
+         self.assertEquals(c["b1"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1\n")
+         self.assertEquals(c["b2"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 2:2:b2\n")
  
  class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
      MAIN_SERVER = {}
          self.assertEqual(c.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
          self.assertEqual(c.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
  
 -        with c.open("count.txt", "w") as f:
 -            f.write("0123456789")
 +        with c.open("count.txt", "wb") as f:
 +            f.write(b"0123456789")
  
          self.assertEqual(c.portable_manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
  
      def test_create_and_save(self):
          c = self.create_count_txt()
          c.save()
 -        self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
 +        self.assertRegex(
 +            c.manifest_text(),
 +            r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
  
      def test_create_and_save_new(self):
          c = self.create_count_txt()
          c.save_new()
 -        self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
 +        self.assertRegex(
 +            c.manifest_text(),
 +            r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
  
      def test_create_diff_apply(self):
          c1 = self.create_count_txt()
          c1.save()
  
          c2 = Collection(c1.manifest_locator())
 -        with c2.open("count.txt", "w") as f:
 -            f.write("abcdefg")
 +        with c2.open("count.txt", "wb") as f:
 +            f.write(b"abcdefg")
  
          diff = c1.diff(c2)
  
          c1.save()
  
          c2 = arvados.collection.Collection(c1.manifest_locator())
 -        with c2.open("count.txt", "w") as f:
 -            f.write("abcdefg")
 +        with c2.open("count.txt", "wb") as f:
 +            f.write(b"abcdefg")
  
          c2.save()
  
          c1 = self.create_count_txt()
          c1.save()
  
 -        with c1.open("count.txt", "w") as f:
 -            f.write("XYZ")
 +        with c1.open("count.txt", "wb") as f:
 +            f.write(b"XYZ")
  
          c2 = arvados.collection.Collection(c1.manifest_locator())
 -        with c2.open("count.txt", "w") as f:
 -            f.write("abcdefg")
 +        with c2.open("count.txt", "wb") as f:
 +            f.write(b"abcdefg")
  
          c2.save()
  
          c1.update()
 -        self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 +        self.assertRegex(
 +            c1.manifest_text(),
 +            r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
  
  
  if __name__ == '__main__':