import tempfile
import urllib.parse
import io
+import json
+import queue
+import threading
import arvados
import arvados.config
# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
- if t == 'Collection':
- set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
- result = copy_collection(args.object_uuid,
- src_arv, dst_arv,
- args)
- elif t == 'Workflow':
- set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
- result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
- elif t == 'Group':
- set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
- result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
- elif t == 'httpURL':
- result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
- else:
- abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+ try:
+ if t == 'Collection':
+ set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+ result = copy_collection(args.object_uuid,
+ src_arv, dst_arv,
+ args)
+ elif t == 'Workflow':
+ set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+ result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+ elif t == 'Group':
+ set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+ result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+ elif t == 'httpURL':
+ result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+ else:
+ abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+ except Exception as e:
+ logger.error("%s", e, exc_info=args.verbose)
+ exit(1)
# Clean up any outstanding temp git repositories.
for d in listvalues(local_repo_dir):
shutil.rmtree(d, ignore_errors=True)
+ if not result:
+ exit(1)
+
# If no exception was thrown and the response does not have an
# error_token field, presume success
- if 'error_token' in result or 'uuid' not in result:
- logger.error("API server returned an error result: {}".format(result))
+ if result is None or 'error_token' in result or 'uuid' not in result:
+ if result:
+ logger.error("API server returned an error result: {}".format(result))
exit(1)
print(result['uuid'])
# copy collections and docker images
if args.recursive and wf["definition"]:
- wf_def = yaml.safe_load(wf["definition"])
- if wf_def is not None:
- locations = []
- docker_images = {}
- graph = wf_def.get('$graph', None)
- if graph is not None:
- workflow_collections(graph, locations, docker_images)
- else:
- workflow_collections(wf_def, locations, docker_images)
+ env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+ "ARVADOS_API_TOKEN": src.api_token,
+ "PATH": os.environ["PATH"]}
+ try:
+ result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+ capture_output=True, env=env)
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ logger.error('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
+ return
- if locations:
- copy_collections(locations, src, dst, args)
+ locations = json.loads(result.stdout)
- for image in docker_images:
- copy_docker_image(image, docker_images[image], src, dst, args)
+ if locations:
+ copy_collections(locations, src, dst, args)
# copy the workflow itself
del wf['uuid']
else:
progress_writer = None
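+ # Walk every word of the manifest and put each block locator
+ # on the 'get' queue.
+ # Each 'get' thread fetches a block from the source and puts it on the 'put' queue.
+ # Each 'put' thread stores a block in the destination and then records its new locator in dst_locators.
+ #
+ # Once the whole manifest has been processed and the blocks transferred,
+ # we walk the manifest a second time and build dst_manifest.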
+
+ lock = threading.Lock()
+
+ # the get queue should be unbounded because we'll add all the
+ # block hashes we want to get, but these are small
+ get_queue = queue.Queue()
+
+ threadcount = 4
+
+ # the put queue contains full data blocks
+ # and if 'get' is faster than 'put' we could end up consuming
+ # a great deal of RAM if it isn't bounded.
+ put_queue = queue.Queue(threadcount)
+ transfer_error = []
+
+ def get_thread():
+     """Worker: fetch the blocks named on get_queue and pass them to put_queue.
+
+     Loops until it reads a None sentinel from get_queue; it then
+     forwards one None to put_queue and returns.  Every item taken
+     from get_queue is matched by exactly one get_queue.task_done()
+     call so that get_queue.join() can return.
+
+     A failed fetch is logged and recorded in transfer_error, and the
+     rest of get_queue is discarded so the transfer ends early.
+     """
+     while True:
+         word = get_queue.get()
+         if word is None:
+             # End of input: pass the sentinel on to the 'put' side.
+             put_queue.put(None)
+             get_queue.task_done()
+             return
+
+         blockhash = arvados.KeepLocator(word).md5sum
+         with lock:
+             if blockhash in dst_locators:
+                 # Already uploaded
+                 get_queue.task_done()
+                 continue
+
+         try:
+             logger.debug("Getting block %s", word)
+             data = src_keep.get(word)
+             put_queue.put((word, data))
+         except Exception as e:
+             # This must bind the exception with "as e".  A bare
+             # "except e:" evaluates the unbound name e while handling
+             # the error, raises NameError, and the failure would never
+             # reach transfer_error.
+             logger.error("Error getting block %s: %s", word, e)
+             transfer_error.append(e)
+             try:
+                 # Drain the 'get' queue so we end early.
+                 # NOTE(review): this also discards the None sentinels, so
+                 # workers may stay blocked in get(); that appears to rely on
+                 # the workers being daemon threads -- confirm.
+                 while True:
+                     get_queue.get(False)
+                     get_queue.task_done()
+             except queue.Empty:
+                 pass
+         finally:
+             # Accounts for the item fetched at the top of this iteration.
+             get_queue.task_done()
+
+ def put_thread():
+     """Worker: store the blocks delivered on put_queue in the destination.
+
+     Loops until it reads a None sentinel from put_queue.  For each
+     (word, data) item it writes the data with dst_keep.put() and
+     records the returned locator in dst_locators under the block's
+     md5sum.  Every item taken from put_queue is matched by exactly one
+     put_queue.task_done() call so that put_queue.join() can return.
+
+     A failed write is logged and recorded in transfer_error, and
+     get_queue is drained so that no further blocks are fetched.
+     """
+     nonlocal bytes_written
+     while True:
+         item = put_queue.get()
+         if item is None:
+             put_queue.task_done()
+             return
+
+         word, data = item
+         loc = arvados.KeepLocator(word)
+         blockhash = loc.md5sum
+         with lock:
+             if blockhash in dst_locators:
+                 # Already uploaded
+                 put_queue.task_done()
+                 continue
+
+         try:
+             logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+             with lock:
+                 # dst_locators and the bytes_written counter are shared by
+                 # all 'put' threads, so update and report them under the lock.
+                 dst_locators[blockhash] = dst_locator
+                 bytes_written += loc.size
+                 if progress_writer:
+                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+         except Exception as e:
+             # This must bind the exception with "as e".  A bare
+             # "except e:" evaluates the unbound name e while handling
+             # the error, raises NameError, and the failure would never
+             # reach transfer_error.
+             logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+             try:
+                 # Drain the 'get' queue so we end early
+                 while True:
+                     get_queue.get(False)
+                     get_queue.task_done()
+             except queue.Empty:
+                 pass
+             transfer_error.append(e)
+         finally:
+             # Accounts for the item fetched at the top of this iteration.
+             put_queue.task_done()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ continue
+
+ get_queue.put(word)
+
+ for i in range(0, threadcount):
+ get_queue.put(None)
+
+ for i in range(0, threadcount):
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ for i in range(0, threadcount):
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+
for line in manifest.splitlines():
words = line.split()
dst_manifest.write(words[0])
dst_manifest.write(word)
continue
blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
dst_manifest.write(' ')
dst_manifest.write(dst_locators[blockhash])
dst_manifest.write("\n")