3198: Apply StreamFileReader tests to ArvadosFileReader
[arvados.git] / sdk / python / arvados / arvfile.py
index c46019a0d412cd3cc7ec426fd2271634add55d9d..1c21d832c0408c2233f5bf8e5ad4f8126271535c 100644 (file)
@@ -304,21 +304,23 @@ class BlockManager(object):
         if self._put_threads is None:
             self._put_queue = Queue.Queue(maxsize=2)
             self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+            self._put_threads = [threading.Thread(target=worker, args=(self,)),
+                                 threading.Thread(target=worker, args=(self,))]
             for t in self._put_threads:
+                t.daemon = True
                 t.start()
 
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)
 
-    def get_block(self, locator, num_retries):
+    def get_block(self, locator, num_retries, cache_only=False):
         if locator in self._bufferblocks:
             bb = self._bufferblocks[locator]
             if bb.state != BufferBlock.COMMITTED:
                 return bb.buffer_view[0:bb.write_pointer].tobytes()
             else:
                 locator = bb._locator
-        return self._keep.get(locator, num_retries=num_retries)
+        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
 
     def commit_all(self):
         for k,v in self._bufferblocks.items():
@@ -352,8 +354,9 @@ class BlockManager(object):
             self._prefetch_queue = Queue.Queue()
             self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
                                       threading.Thread(target=worker, args=(self,))]
-            self._prefetch_threads[0].start()
-            self._prefetch_threads[1].start()
+            for t in self._prefetch_threads:
+                t.daemon = True
+                t.start()
         self._prefetch_queue.put(locator)
 
 class ArvadosFile(object):
@@ -366,7 +369,7 @@ class ArvadosFile(object):
         self._modified = True
         self.segments = []
         for s in segments:
-            self.add_segment(stream, s.range_start, s.range_size)
+            self.add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
 
     def set_unmodified(self):
@@ -402,9 +405,11 @@ class ArvadosFile(object):
             self.parent._my_block_manager().block_prefetch(lr.locator)
 
         for lr in locators_and_ranges(self.segments, offset, size):
-            # TODO: if data is empty, wait on block get, otherwise only
-            # get more data if the block is already in the cache.
-            data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            if d:
+                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            else:
+                break
         return ''.join(data)
 
     def _repack_writes(self):