11684: When packing small blocks into one, save references of the files
author Lucas Di Pentima <lucas@curoverse.com>
Fri, 26 May 2017 02:34:59 +0000 (23:34 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 26 May 2017 02:34:59 +0000 (23:34 -0300)
included on the block when committing it asynchronously, so that the
segment's locators can be updated at the put thread after the block is
committed and the permission token is returned from the API Server.

sdk/python/arvados/arvfile.py
sdk/python/tests/test_collections.py

index 2fc9c73afe031543ab7afb67edcd2e7a5c4c7d6f..f00936d74336c6f17aac194fac71df020d8c116d 100644 (file)
@@ -499,6 +499,8 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self._repacked_bb = {}
+        self._repacked_bb_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -558,6 +560,16 @@ class _BlockManager(object):
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
+                with self._repacked_bb_lock:
+                    # Check if this block was created by repacking smaller blocks
+                    if bufferblock.blockid in self._repacked_bb:
+                        # Update segment locators (with its tokens) of files within
+                        # this block
+                        old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
+                        for f in self._repacked_bb[bufferblock.blockid]['files']:
+                            for s in [x for x in f._segments if x.locator == old_loc]:
+                                s.locator = loc
+                        del(self._repacked_bb[bufferblock.blockid])
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -680,16 +692,27 @@ class _BlockManager(object):
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
 
+        # If this repacked block will be committed asynchronously, take note
+        # of its files so their segments' locators will be updated with
+        # the correct permission token returned by the API server.
+        if not sync:
+            with self._repacked_bb_lock:
+                self._repacked_bb[new_bb.blockid] = {
+                    'unsigned_loc': new_bb.locator(),
+                    'files': [bb.owner for bb, _ in files],
+                }
+
         self.commit_bufferblock(new_bb, sync=sync)
 
-        for bb, new_bb_segment_offset in files:
-            newsegs = bb.owner.segments()
-            for s in newsegs:
-                if s.locator == bb.blockid:
-                    s.locator = new_bb.locator()
-                    s.segment_offset = new_bb_segment_offset+s.segment_offset
-            bb.owner.set_segments(newsegs)
-            self._delete_bufferblock(bb.blockid)
+        with self._repacked_bb_lock:
+            for bb, new_bb_segment_offset in files:
+                newsegs = bb.owner.segments()
+                for s in newsegs:
+                    if s.locator == bb.blockid:
+                        s.locator = new_bb.locator()
+                        s.segment_offset = new_bb_segment_offset+s.segment_offset
+                bb.owner.set_segments(newsegs)
+                self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
index cfc3665f424e32455137af73055966c4238365c2..a992328d70778099f7693e6fdf76158db0cfb6a4 100644 (file)
@@ -1182,7 +1182,7 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
     def setUp(self):
         self.keep_put = getattr(arvados.keep.KeepClient, 'put')
 
-    def test_repacked_block_sumbmission_get_permission_token(self):
+    def test_repacked_block_submission_get_permission_token(self):
         '''
         Make sure that those blocks that are committed after repacking small ones,
         get their permission tokens assigned on the collection manifest.