h2. arv-copy
-@arv-copy@ allows users to copy collections, workflow definitions and projects from one cluster to another.
+@arv-copy@ allows users to copy collections, workflow definitions and projects from one cluster to another. You can also use @arv-copy@ to import resources from HTTP URLs into Keep.
For projects, @arv-copy@ will copy all the collections and workflow definitions owned by the project, and recursively copy subprojects.
We will use the uuid @jutro-j7d0g-xj19djofle3aryq@ as an example project.
<notextile>
-<pre><code>~$ <span class="userinput">peteramstutz@shell:~$ arv-copy --project-uuid pirca-j7d0g-lr8sq3tx3ovn68k jutro-j7d0g-xj19djofle3aryq
+<pre><code>~$ <span class="userinput">arv-copy --project-uuid pirca-j7d0g-lr8sq3tx3ovn68k jutro-j7d0g-xj19djofle3aryq
2021-09-08 21:29:32 arvados.arv-copy[6377] INFO:
2021-09-08 21:29:32 arvados.arv-copy[6377] INFO: Success: created copy with uuid pirca-j7d0g-ig9gvu5piznducp
</code></pre>
The name and description of the original project will be used for the destination copy. If a project already exists with the same name, collections and workflow definitions will be copied into the project with the same name.
If you would like to copy the project but not its subprojects, you can use the @--no-recursive@ flag.
+
+h3. Importing HTTP resources to Keep
+
+You can also use @arv-copy@ to copy the contents of a HTTP URL into Keep. When you do this, Arvados keeps track of the original URL the resource came from. This allows you to refer to the resource by its original URL in Workflow inputs, but actually read from the local copy in Keep.
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-copy --project-uuid tordo-j7d0g-lr8sq3tx3ovn68k https://example.com/index.html
+tordo-4zz18-dhpb6y9km2byb94
+2023-10-06 10:15:36 arvados.arv-copy[374147] INFO: Success: created copy with uuid tordo-4zz18-dhpb6y9km2byb94
+</code></pre>
+</notextile>
+
+In addition, when importing from HTTP URLs, you may provide a different cluster than the destination in @--src@. This tells @arv-copy@ to search the other cluster for a collection associated with that URL, and if found, copy the collection from that cluster instead of downloading from the original URL.
+
+The following @arv-copy@ command line options affect the behavior of HTTP import.
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--varying-url-params== VARYING_URL_PARAMS|A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.|
+|==--prefer-cached-downloads==|If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).|
# passthrough, we'll download it later.
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
else:
- keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ keepref = "keep:%s/%s" % (results[0], results[1])
logger.info("%s is %s", src, keepref)
self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
import tempfile
import urllib.parse
import io
+import json
+import queue
+import threading
import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
+import arvados.http_to_keep
import ruamel.yaml as yaml
from arvados.api import OrderedJsonModel
copy_opts.add_argument(
'--storage-classes', dest='storage_classes',
help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+ copy_opts.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
copy_opts.add_argument(
'object_uuid',
else:
logger.setLevel(logging.INFO)
- if not args.source_arvados:
+ if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
args.source_arvados = args.object_uuid[:5]
# Create API clients for the source and destination instances
# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
- if t == 'Collection':
- set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
- result = copy_collection(args.object_uuid,
- src_arv, dst_arv,
- args)
- elif t == 'Workflow':
- set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
- result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
- elif t == 'Group':
- set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
- result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
- else:
- abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+ try:
+ if t == 'Collection':
+ set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+ result = copy_collection(args.object_uuid,
+ src_arv, dst_arv,
+ args)
+ elif t == 'Workflow':
+ set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+ result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+ elif t == 'Group':
+ set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+ result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+ elif t == 'httpURL':
+ result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+ else:
+ abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+ except Exception as e:
+ logger.error("%s", e, exc_info=args.verbose)
+ exit(1)
# Clean up any outstanding temp git repositories.
for d in listvalues(local_repo_dir):
# If no exception was thrown and the response does not have an
# error_token field, presume success
- if 'error_token' in result or 'uuid' not in result:
- logger.error("API server returned an error result: {}".format(result))
+ if result is None or 'error_token' in result or 'uuid' not in result:
+ if result:
+ logger.error("API server returned an error result: {}".format(result))
exit(1)
print(result['uuid'])
else:
progress_writer = None
+ # go through the words
+ # put each block loc into 'get' queue
+ # 'get' threads get block and put it into 'put' queue
+ # 'put' threads put block and then update dst_locators
+ #
+ # after going through the whole manifest we go back through it
+ # again and build dst_manifest
+
+ lock = threading.Lock()
+
+ # the get queue should be unbounded because we'll add all the
+ # block hashes we want to get, but these are small
+ get_queue = queue.Queue()
+
+ threadcount = 4
+
+ # the put queue contains full data blocks
+ # and if 'get' is faster than 'put' we could end up consuming
+ # a great deal of RAM if it isn't bounded.
+ put_queue = queue.Queue(threadcount)
+ transfer_error = []
+
+ def get_thread():
+ while True:
+ word = get_queue.get()
+ if word is None:
+ put_queue.put(None)
+ get_queue.task_done()
+ return
+
+ blockhash = arvados.KeepLocator(word).md5sum
+ with lock:
+ if blockhash in dst_locators:
+ # Already uploaded
+ get_queue.task_done()
+ continue
+
+ try:
+ logger.debug("Getting block %s", word)
+ data = src_keep.get(word)
+ put_queue.put((word, data))
+ except e:
+ logger.error("Error getting block %s: %s", word, e)
+ transfer_error.append(e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ get_queue.task_done()
+ except queue.Empty:
+ pass
+ finally:
+ get_queue.task_done()
+
+ def put_thread():
+ nonlocal bytes_written
+ while True:
+ item = put_queue.get()
+ if item is None:
+ put_queue.task_done()
+ return
+
+ word, data = item
+ loc = arvados.KeepLocator(word)
+ blockhash = loc.md5sum
+ with lock:
+ if blockhash in dst_locators:
+ # Already uploaded
+ put_queue.task_done()
+ continue
+
+ try:
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+ except e:
+ logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ get_queue.task_done()
+ except queue.Empty:
+ pass
+ transfer_error.append(e)
+ finally:
+ put_queue.task_done()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ continue
+
+ get_queue.put(word)
+
+ for i in range(0, threadcount):
+ get_queue.put(None)
+
+ for i in range(0, threadcount):
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ for i in range(0, threadcount):
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+
for line in manifest.splitlines():
words = line.split()
dst_manifest.write(words[0])
dst_manifest.write(word)
continue
blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
dst_manifest.write(' ')
dst_manifest.write(dst_locators[blockhash])
dst_manifest.write("\n")
def uuid_type(api, object_uuid):
if re.match(arvados.util.keep_locator_pattern, object_uuid):
return 'Collection'
+
+ if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+ return 'httpURL'
+
p = object_uuid.split('-')
if len(p) == 3:
type_prefix = p[1]
return k
return None
+
+def copy_from_http(url, src, dst, args):
+    # Import an HTTP URL into Keep on the destination cluster.
+    # Returns the copy_collection() result when the --src cluster
+    # already has a collection cached for this URL, a {"uuid": ...}
+    # dict after a fresh download, or None (implicitly) when
+    # http_to_keep returns None -- main() treats a None result as an
+    # error.
+
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
+    # check_cached_url returns a 5-tuple; index 2 is the UUID of a
+    # cached collection on the source cluster, or None if there is no
+    # usable cached copy.
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
+    # No cached copy on the source cluster: download the URL directly
+    # into Keep on the destination cluster.
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
def abort(msg, code=1):
logger.info("arv-copy: %s", msg)
exit(code)
mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
code = int(mt.group(3))
+ if not self.name:
+ logger.error("Cannot determine filename from URL or headers")
+ return
+
if code == 200:
self.target = self.collection.open(self.name, "wb")
self._first_chunk = False
self.count += len(chunk)
+
+ if self.target is None:
+ # "If this number is not equal to the size of the byte
+ # string, this signifies an error and libcurl will abort
+ # the request."
+ return 0
+
self.target.write(chunk)
loopnow = time.time()
if (loopnow - self.checkpoint) < 20:
return '"' + etag + '"'
-def http_to_keep(api, project_uuid, url,
- utcnow=datetime.datetime.utcnow, varying_url_params="",
- prefer_cached_downloads=False):
- """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
- Before downloading the URL, checks to see if the URL already
- exists in Keep and applies HTTP caching policy, the
- varying_url_params and prefer_cached_downloads flags in order to
- decide whether to use the version in Keep or re-download it.
- """
+def check_cached_url(api, project_uuid, url, etags,
+ utcnow=datetime.datetime.utcnow,
+ varying_url_params="",
+ prefer_cached_downloads=False):
logger.info("Checking Keep for %s", url)
now = utcnow()
- etags = {}
-
curldownloader = _Downloader(api)
for item in items:
if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
# HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())) )
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
if not _changed(cache_url, clean_url, properties, now, curldownloader):
# Etag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())))
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
for etagstr in ("Etag", "ETag"):
if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
logger.debug("Found ETag values %s", etags)
+ return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+ utcnow=datetime.datetime.utcnow, varying_url_params="",
+ prefer_cached_downloads=False):
+ """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+ Before downloading the URL, checks to see if the URL already
+ exists in Keep and applies HTTP caching policy, the
+ varying_url_params and prefer_cached_downloads flags in order to
+ decide whether to use the version in Keep or re-download it.
+ """
+
+ etags = {}
+ cache_result = check_cached_url(api, project_uuid, url, etags,
+ utcnow, varying_url_params,
+ prefer_cached_downloads)
+
+ if cache_result[0] is not None:
+ return cache_result
+
+ clean_url = cache_result[3]
+ now = cache_result[4]
+
properties = {}
headers = {}
if etags:
logger.info("Beginning download of %s", url)
+ curldownloader = _Downloader(api)
+
req = curldownloader.download(url, headers)
c = curldownloader.collection
item["properties"].update(properties)
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], list(cr.keys())[0])
+ return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)
logger.info("Download complete")
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
- return (c.portable_data_hash(), curldownloader.name)
+ return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
utcnow.return_value = datetime.datetime(2018, 5, 15)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+ datetime.datetime(2018, 5, 15, 0, 0)))
assert mockobj.url == b"http://example.com/file1.txt"
assert mockobj.perform_was_called is True
utcnow.return_value = datetime.datetime(2018, 5, 16)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+ datetime.datetime(2018, 5, 16, 0, 0)))
assert mockobj.perform_was_called is False
utcnow.return_value = datetime.datetime(2018, 5, 16)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+ 'http://example.com/file1.txt', datetime.datetime(2018, 5, 16, 0, 0)))
assert mockobj.perform_was_called is False
utcnow.return_value = datetime.datetime(2018, 5, 17)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz4',
+ 'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
+
assert mockobj.url == b"http://example.com/file1.txt"
assert mockobj.perform_was_called is True
utcnow.return_value = datetime.datetime(2018, 5, 17)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+ datetime.datetime(2018, 5, 17, 0, 0)))
cm.open.assert_not_called()
utcnow.return_value = datetime.datetime(2018, 5, 15)
r = http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+ 'http://example.com/download?fn=/file1.txt',
+ datetime.datetime(2018, 5, 15, 0, 0)))
assert mockobj.url == b"http://example.com/download?fn=/file1.txt"
utcnow.return_value = datetime.datetime(2018, 5, 17)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+ 'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+ datetime.datetime(2018, 5, 17, 0, 0)))
print(mockobj.req_headers)
assert mockobj.req_headers == ["Accept: application/octet-stream", "If-None-Match: \"123456\""]
utcnow.return_value = datetime.datetime(2018, 5, 17)
r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+ 'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
assert mockobj.perform_was_called is False
cm.open.assert_not_called()
r = http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
- self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+ 'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
assert mockobj.perform_was_called is True
cm.open.assert_not_called()