11308: Merge branch 'master' into 11308-python3
author Tom Clegg <tom@curoverse.com>
Tue, 2 May 2017 17:02:32 +0000 (13:02 -0400)
committer Tom Clegg <tom@curoverse.com>
Tue, 2 May 2017 17:02:32 +0000 (13:02 -0400)
Conflicts:
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_collections.py

sdk/python/arvados/_ranges.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/run_test_server.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_collections.py

Simple merge
index 931a3a198db9407fd72b274225821cc3f12ca6d7,a2ec76a0761cb43a43731e5e31e9638f0dbf3019..2fc9c73afe031543ab7afb67edcd2e7a5c4c7d6f
@@@ -597,40 -630,52 +643,53 @@@ class _BlockManager(object)
          self._pending_write_size += closed_file_size
  
          # Check if there are enough small blocks for filling up one in full
-         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+             return
  
-             # Search blocks ready for getting packed together before being committed to Keep.
-             # A WRITABLE block always has an owner.
-             # A WRITABLE block with its owner.closed() implies that it's
-             # size is <= KEEP_BLOCK_SIZE/2.
-             try:
-                 small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-             except AttributeError:
-                 # Writable blocks without owner shouldn't exist.
-                 raise UnownedBlockError()
 -        # Search blocks ready for getting packed together before being committed to Keep.
++        # Search for blocks that are ready to be packed together
++        # before being committed to Keep.
+         # A WRITABLE block always has an owner.
 -        # A WRITABLE block with its owner.closed() implies that it's
++        # A WRITABLE block with its owner.closed() implies that its
+         # size is <= KEEP_BLOCK_SIZE/2.
+         try:
 -            small_blocks = [b for b in self._bufferblocks.values()
++            small_blocks = [b for b in listvalues(self._bufferblocks)
+                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+         except AttributeError:
+             # Writable blocks without an owner shouldn't exist.
+             raise UnownedBlockError()
+         if len(small_blocks) <= 1:
+             # Not enough small blocks for repacking
+             return
  
-             if len(small_blocks) <= 1:
-                 # Not enough small blocks for repacking
-                 return
+         for bb in small_blocks:
+             bb.repack_writes()
  
-             # Update the pending write size count with its true value, just in case
-             # some small file was opened, written and closed several times.
-             self._pending_write_size = sum([b.size() for b in small_blocks])
-             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                 return
+         # Update the pending write size count with its true value, just in case
+         # some small file was opened, written and closed several times.
+         self._pending_write_size = sum([b.size() for b in small_blocks])
  
-             new_bb = self._alloc_bufferblock()
-             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                 bb = small_blocks.pop(0)
-                 arvfile = bb.owner
-                 self._pending_write_size -= bb.size()
-                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                 arvfile.set_segments([Range(new_bb.blockid,
-                                             0,
-                                             bb.size(),
-                                             new_bb.write_pointer - bb.size())])
-                 self._delete_bufferblock(bb.blockid)
-             self.commit_bufferblock(new_bb, sync=sync)
+         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+             return
+         new_bb = self._alloc_bufferblock()
+         files = []
+         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+             bb = small_blocks.pop(0)
+             self._pending_write_size -= bb.size()
+             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+             files.append((bb, new_bb.write_pointer - bb.size()))
+         self.commit_bufferblock(new_bb, sync=sync)
+         for bb, new_bb_segment_offset in files:
+             newsegs = bb.owner.segments()
+             for s in newsegs:
+                 if s.locator == bb.blockid:
+                     s.locator = new_bb.locator()
+                     s.segment_offset = new_bb_segment_offset+s.segment_offset
+             bb.owner.set_segments(newsegs)
+             self._delete_bufferblock(bb.blockid)
  
      def commit_bufferblock(self, block, sync):
          """Initiate a background upload of a bufferblock.
@@@ -1010,39 -1055,8 +1069,8 @@@ class ArvadosFile(object)
                  self.parent._my_block_manager().block_prefetch(lr.locator)
                  locs.add(lr.locator)
  
 -        return ''.join(data)
 +        return b''.join(data)
  
-     def _repack_writes(self, num_retries):
-         """Optimize buffer block by repacking segments in file sequence.
-         When the client makes random writes, they appear in the buffer block in
-         the sequence they were written rather than the sequence they appear in
-         the file.  This makes for inefficient, fragmented manifests.  Attempt
-         to optimize by repacking writes in file sequence.
-         """
-         segs = self._segments
-         # Collect the segments that reference the buffer block.
-         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-         # Collect total data referenced by segments (could be smaller than
-         # bufferblock size if a portion of the file was written and
-         # then overwritten).
-         write_total = sum([s.range_size for s in bufferblock_segs])
-         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
-             # If there's more than one segment referencing this block, it is
-             # due to out-of-order writes and will produce a fragmented
-             # manifest, so try to optimize by re-packing into a new buffer.
-             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-             for t in bufferblock_segs:
-                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                 t.segment_offset = new_bb.size() - t.range_size
-             self._current_bblock = new_bb
      @must_be_writable
      @synchronized
      def writeto(self, offset, data, num_retries):
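The removed ArvadosFile._repack_writes() hasn't disappeared: the merge moves
it onto the buffer block itself (note the bb.repack_writes() calls in the
_BlockManager hunk above). As the removed docstring explains, random writes
land in the buffer block in write order rather than file order, which
fragments the manifest; repacking re-appends the live data in file sequence.
A minimal sketch of the idea, using hypothetical (file_offset,
segment_offset, size) tuples instead of the SDK's Range objects:

    # Rebuild the buffer so segments appear in file order, dropping any
    # overwritten bytes that no segment references anymore.
    def repack(buffer, segments):
        new_buf = bytearray()
        new_segs = []
        for file_off, seg_off, size in sorted(segments):
            new_segs.append((file_off, len(new_buf), size))
            new_buf.extend(buffer[seg_off:seg_off + size])
        return bytes(new_buf), new_segs

    # A file written back to front: the bytes for offset 3 were buffered first.
    buf, segs = repack(b"defabc", [(3, 0, 3), (0, 3, 3)])
    assert buf == b"abcdef" and segs == [(0, 0, 3), (3, 3, 3)]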
index ed9d55cfc2df31a2b175254a441b4aa9a046cb02,6836d803886291a57dcc39b34c3dcb2c96a8c515..12f93298bba6b3ef2cf461576b2816e88790cae2
@@@ -452,12 -471,14 +470,16 @@@ class ArvPutUploadJob(object)
          except (SystemExit, Exception) as e:
              self._checkpoint_before_quit = False
              # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
--            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
--            #   we have a custom signal handler in place that raises SystemExit with
--            #   the catched signal's code.
-             if not isinstance(e, SystemExit) or e.code != -2:
++            # Note: We're expecting SystemExit instead of
++            # KeyboardInterrupt because we have a custom signal
++            # handler in place that raises SystemExit with the caught
++            # signal's code.
+             if isinstance(e, PathDoesNotExistError):
+                 # We aren't interested in the traceback for this case
+                 pass
+             elif not isinstance(e, SystemExit) or e.code != -2:
 -                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
 +                self.logger.warning("Abnormal termination:\n{}".format(
 +                    traceback.format_exc()))
              raise
          finally:
              if not self.dry_run:
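The rewrapped comment refers to arv-put's signal handling: the command
installs a handler that converts a termination signal into SystemExit
carrying the negated signal number, so Ctrl-C reaches this except clause as
SystemExit with code -2 instead of KeyboardInterrupt. A sketch of that
pattern (an assumption about the handler's shape, not the exact put.py code):

    import signal
    import sys

    def exit_signal_handler(sigcode, frame):
        # SIGINT is signal 2, so Ctrl-C becomes SystemExit(-2), which the
        # "e.code != -2" check above treats as a quiet exit.
        sys.exit(-sigcode)

    signal.signal(signal.SIGINT, exit_signal_handler)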
Simple merge
index 083b8fc1ade6b7ef6fb0994193aef0b081835b07,320189104ab555dc97be4c618e83adc8d6acdd7f..6d103526f40ebc1b8de64cdf2956f1867cdcf538
@@@ -19,13 -17,16 +19,14 @@@ import yaml
  import threading
  import hashlib
  import random
+ import uuid
  
 -from cStringIO import StringIO
 -
  import arvados
  import arvados.commands.put as arv_put
 -import arvados_testutil as tutil
 +from . import arvados_testutil as tutil
  
 -from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 -import run_test_server
 +from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 +from . import run_test_server
  
  class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
      CACHE_ARGSET = [
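The import hunk above is the usual Python 2/3 porting pattern: cStringIO no
longer exists on Python 3, and implicit relative imports ("import
run_test_server") are rejected, hence the explicit "from . import ..." form.
For the dropped StringIO, the portable byte-oriented replacement (an
assumption about what these tests need, since the substitute import isn't
shown in this hunk) is io.BytesIO:

    # io.BytesIO behaves the same on Python 2 and 3 for byte data,
    # unlike the removed Python-2-only cStringIO.StringIO.
    from io import BytesIO

    buf = BytesIO()
    buf.write(b"0123456789")
    assert buf.getvalue() == b"0123456789"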
index 86215f535a21634b18a0167636c79431657abd1f,fd31664a528d1ea3cfe1d03b1852b304bb4c88e7..24f305ac732d562596b12dff2179abe990637601
@@@ -1173,16 -1111,16 +1173,16 @@@ class NewCollectionTestCaseWithServers(
  
      def test_only_small_blocks_are_packed_together(self):
          c = Collection()
-         # Write a couple of small files, 
+         # Write a couple of small files.
 -        f = c.open("count.txt", "w")
 -        f.write("0123456789")
 +        f = c.open("count.txt", "wb")
 +        f.write(b"0123456789")
          f.close(flush=False)
 -        foo = c.open("foo.txt", "w")
 -        foo.write("foo")
 +        foo = c.open("foo.txt", "wb")
 +        foo.write(b"foo")
          foo.close(flush=False)
          # Then, write a big file, it shouldn't be packed with the ones above
 -        big = c.open("bigfile.txt", "w")
 -        big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
 +        big = c.open("bigfile.txt", "wb")
 +        big.write(b"x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
          big.close(flush=False)
          self.assertEqual(
              c.manifest_text("."),