From: Tom Clegg Date: Tue, 2 May 2017 17:02:32 +0000 (-0400) Subject: 11308: Merge branch 'master' into 11308-python3 X-Git-Tag: 1.1.0~266^2~4 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/607fe087f6167061714a524dd53cbbc21b974973 11308: Merge branch 'master' into 11308-python3 Conflicts: sdk/python/arvados/arvfile.py sdk/python/arvados/commands/put.py sdk/python/tests/test_collections.py --- 607fe087f6167061714a524dd53cbbc21b974973 diff --cc sdk/python/arvados/arvfile.py index 931a3a198d,a2ec76a076..2fc9c73afe --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@@ -597,40 -630,52 +643,53 @@@ class _BlockManager(object) self._pending_write_size += closed_file_size # Check if there are enough small blocks for filling up one in full - if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE): + if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): + return - # Search blocks ready for getting packed together before being committed to Keep. - # A WRITABLE block always has an owner. - # A WRITABLE block with its owner.closed() implies that it's - # size is <= KEEP_BLOCK_SIZE/2. - try: - small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] - except AttributeError: - # Writable blocks without owner shouldn't exist. - raise UnownedBlockError() - # Search blocks ready for getting packed together before being committed to Keep. ++ # Search blocks ready for getting packed together before being ++ # committed to Keep. + # A WRITABLE block always has an owner. - # A WRITABLE block with its owner.closed() implies that it's ++ # A WRITABLE block with its owner.closed() implies that its + # size is <= KEEP_BLOCK_SIZE/2. + try: - small_blocks = [b for b in self._bufferblocks.values() ++ small_blocks = [b for b in listvalues(self._bufferblocks) + if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return - if len(small_blocks) <= 1: - # Not enough small blocks for repacking - return + for bb in small_blocks: + bb.repack_writes() - # Update the pending write size count with its true value, just in case - # some small file was opened, written and closed several times. - self._pending_write_size = sum([b.size() for b in small_blocks]) - if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: - return + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) - new_bb = self._alloc_bufferblock() - while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: - bb = small_blocks.pop(0) - arvfile = bb.owner - self._pending_write_size -= bb.size() - new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) - arvfile.set_segments([Range(new_bb.blockid, - 0, - bb.size(), - new_bb.write_pointer - bb.size())]) - self._delete_bufferblock(bb.blockid) - self.commit_bufferblock(new_bb, sync=sync) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + files = [] + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + files.append((bb, new_bb.write_pointer - bb.size())) + + self.commit_bufferblock(new_bb, sync=sync) + + for bb, new_bb_segment_offset in files: + newsegs = bb.owner.segments() + for s in newsegs: + if s.locator == bb.blockid: + s.locator = new_bb.locator() + s.segment_offset = new_bb_segment_offset+s.segment_offset + bb.owner.set_segments(newsegs) + self._delete_bufferblock(bb.blockid) def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@@ -1010,39 -1055,8 +1069,8 @@@ class ArvadosFile(object) self.parent._my_block_manager().block_prefetch(lr.locator) locs.add(lr.locator) - return ''.join(data) + return b''.join(data) - def _repack_writes(self, num_retries): - """Optimize buffer block by repacking segments in file sequence. - - When the client makes random writes, they appear in the buffer block in - the sequence they were written rather than the sequence they appear in - the file. This makes for inefficient, fragmented manifests. Attempt - to optimize by repacking writes in file sequence. - - """ - segs = self._segments - - # Collect the segments that reference the buffer block. - bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] - - # Collect total data referenced by segments (could be smaller than - # bufferblock size if a portion of the file was written and - # then overwritten). - write_total = sum([s.range_size for s in bufferblock_segs]) - - if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1: - # If there's more than one segment referencing this block, it is - # due to out-of-order writes and will produce a fragmented - # manifest, so try to optimize by re-packing into a new buffer. - contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) - new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) - for t in bufferblock_segs: - new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) - t.segment_offset = new_bb.size() - t.range_size - - self._current_bblock = new_bb - @must_be_writable @synchronized def writeto(self, offset, data, num_retries): diff --cc sdk/python/arvados/commands/put.py index ed9d55cfc2,6836d80388..12f93298bb --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@@ -452,12 -471,14 +470,16 @@@ class ArvPutUploadJob(object) except (SystemExit, Exception) as e: self._checkpoint_before_quit = False # Log stack trace only when Ctrl-C isn't pressed (SIGINT) -- # Note: We're expecting SystemExit instead of KeyboardInterrupt because -- # we have a custom signal handler in place that raises SystemExit with -- # the catched signal's code. - if not isinstance(e, SystemExit) or e.code != -2: ++ # Note: We're expecting SystemExit instead of ++ # KeyboardInterrupt because we have a custom signal ++ # handler in place that raises SystemExit with the catched ++ # signal's code. + if isinstance(e, PathDoesNotExistError): + # We aren't interested in the traceback for this case + pass + elif not isinstance(e, SystemExit) or e.code != -2: - self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e))) + self.logger.warning("Abnormal termination:\n{}".format( + traceback.format_exc())) raise finally: if not self.dry_run: diff --cc sdk/python/tests/test_arv_put.py index 083b8fc1ad,320189104a..6d103526f4 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@@ -19,13 -17,16 +19,14 @@@ import yam import threading import hashlib import random + import uuid -from cStringIO import StringIO - import arvados import arvados.commands.put as arv_put -import arvados_testutil as tutil +from . import arvados_testutil as tutil -from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response -import run_test_server +from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response +from . import run_test_server class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): CACHE_ARGSET = [ diff --cc sdk/python/tests/test_collections.py index 86215f535a,fd31664a52..24f305ac73 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@@ -1173,16 -1111,16 +1173,16 @@@ class NewCollectionTestCaseWithServers( def test_only_small_blocks_are_packed_together(self): c = Collection() - # Write a couple of small files, + # Write a couple of small files, - f = c.open("count.txt", "w") - f.write("0123456789") + f = c.open("count.txt", "wb") + f.write(b"0123456789") f.close(flush=False) - foo = c.open("foo.txt", "w") - foo.write("foo") + foo = c.open("foo.txt", "wb") + foo.write(b"foo") foo.close(flush=False) # Then, write a big file, it shouldn't be packed with the ones above - big = c.open("bigfile.txt", "w") - big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2 + big = c.open("bigfile.txt", "wb") + big.write(b"x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2 big.close(flush=False) self.assertEqual( c.manifest_text("."),