# Copy strategy:
#   1. Walk the words of the manifest and queue every block locator on
#      'get_queue'.
#   2. 'get' worker threads fetch each block and hand the data to the
#      'put' worker threads through 'put_queue'.
#   3. 'put' worker threads store each block and then update
#      dst_locators.
# After the whole manifest has been processed this way, it is walked a
# second time to build dst_manifest.

# Number of worker threads started for each of the two stages.
threadcount = 4

# Guards dst_locators and the progress counters shared by the workers.
lock = threading.Lock()

# Unbounded on purpose: it receives every block locator up front, but
# locators are small strings.
get_queue = queue.Queue()

# Bounded, because each entry holds a full data block; if fetching is
# faster than storing, an unbounded queue could consume a great deal of
# RAM.
put_queue = queue.Queue(maxsize=threadcount)

# Exceptions reported by the workers; a non-empty list means the copy
# failed.
transfer_error = []
+
def get_thread():
    """Worker: fetch blocks from the source Keep and hand them to 'put' workers.

    Consumes block locators from ``get_queue`` until it reads a ``None``
    sentinel. It forwards that sentinel to ``put_queue`` (so one 'put'
    worker also shuts down) and returns. Blocks whose md5 is already a key
    of ``dst_locators`` are skipped. If fetching a block fails, the
    exception is recorded in ``transfer_error``, and the remaining locators
    are discarded so the transfer ends early.
    """
    while True:
        word = get_queue.get()
        if word is None:
            put_queue.put(None)
            get_queue.task_done()
            return

        blockhash = arvados.KeepLocator(word).md5sum
        with lock:
            if blockhash in dst_locators:
                # Already uploaded
                get_queue.task_done()
                continue

        try:
            logger.debug("Getting block %s", word)
            data = src_keep.get(word)
            put_queue.put((word, data))
        except Exception as e:
            # 'except e:' would look up an undefined name, raise NameError
            # and kill this worker, leaving its sentinel unconsumed so
            # get_queue.join() never returns.
            logger.error("Error getting block %s: %s", word, e)
            transfer_error.append(e)
            # Drain the 'get' queue so we end early. The None sentinels
            # are put back: every worker must still receive one (and pass
            # it on to a 'put' worker), otherwise the remaining threads
            # block forever on empty queues.
            sentinels = 0
            try:
                while True:
                    if get_queue.get(False) is None:
                        sentinels += 1
                    get_queue.task_done()
            except queue.Empty:
                pass
            for _ in range(sentinels):
                get_queue.put(None)
        finally:
            get_queue.task_done()
+
+ def put_thread():
+ nonlocal bytes_written
+ while True:
+ item = put_queue.get()
+ if item is None:
+ put_queue.task_done()
+ return
+
+ word, data = item
+ loc = arvados.KeepLocator(word)
+ blockhash = loc.md5sum
+ with lock:
+ if blockhash in dst_locators:
+ # Already uploaded
+ put_queue.task_done()
+ continue
+
+ try:
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+ except e:
+ logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ get_queue.task_done()
+ except queue.Empty:
+ pass
+ transfer_error.append(e)
+ finally:
+ put_queue.task_done()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ continue
+
+ get_queue.put(word)
+
+ for i in range(0, threadcount):
+ get_queue.put(None)
+
+ for i in range(0, threadcount):
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ for i in range(0, threadcount):
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+