ef3936e267ce803f48f1ba4bf537d06f446d90b1
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39 import json
40 import queue
41 import threading
42
43 import arvados
44 import arvados.config
45 import arvados.keep
46 import arvados.util
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
51
52 from arvados.api import OrderedJsonModel
53 from arvados._version import __version__
54
55 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
56
57 logger = logging.getLogger('arvados.arv-copy')
58
59 # local_repo_dir records which git repositories from the Arvados source
60 # instance have been checked out locally during this run, and to which
61 # directories.
62 # e.g. if repository 'twp' from src_arv has been cloned into
63 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
64 #
65 local_repo_dir = {}
66
67 # List of collections that have been copied in this session, and their
68 # destination collection UUIDs.
69 collections_copied = {}
70
71 # Set of (repository, script_version) two-tuples of commits copied in git.
72 scripts_copied = set()
73
74 # The owner_uuid of the object being copied
75 src_owner_uuid = None
76
77 def main():
78     copy_opts = argparse.ArgumentParser(add_help=False)
79
80     copy_opts.add_argument(
81         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
82         help='Print version and exit.')
83     copy_opts.add_argument(
84         '-v', '--verbose', dest='verbose', action='store_true',
85         help='Verbose output.')
86     copy_opts.add_argument(
87         '--progress', dest='progress', action='store_true',
88         help='Report progress on copying collections. (default)')
89     copy_opts.add_argument(
90         '--no-progress', dest='progress', action='store_false',
91         help='Do not report progress on copying collections.')
92     copy_opts.add_argument(
93         '-f', '--force', dest='force', action='store_true',
94         help='Perform copy even if the object appears to exist at the remote destination.')
95     copy_opts.add_argument(
96         '--src', dest='source_arvados',
97         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
98     copy_opts.add_argument(
99         '--dst', dest='destination_arvados',
100         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
101     copy_opts.add_argument(
102         '--recursive', dest='recursive', action='store_true',
103         help='Recursively copy any dependencies for this object, and subprojects. (default)')
104     copy_opts.add_argument(
105         '--no-recursive', dest='recursive', action='store_false',
106         help='Do not copy any dependencies or subprojects.')
107     copy_opts.add_argument(
108         '--project-uuid', dest='project_uuid',
109         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
110     copy_opts.add_argument(
111         '--storage-classes', dest='storage_classes',
112         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
113     copy_opts.add_argument("--varying-url-params", type=str, default="",
114                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
115
116     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
117                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
118
119     copy_opts.add_argument(
120         'object_uuid',
121         help='The UUID of the object to be copied.')
122     copy_opts.set_defaults(progress=True)
123     copy_opts.set_defaults(recursive=True)
124
125     parser = argparse.ArgumentParser(
126         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
127         parents=[copy_opts, arv_cmd.retry_opt])
128     args = parser.parse_args()
129
130     if args.storage_classes:
131         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
132
133     if args.verbose:
134         logger.setLevel(logging.DEBUG)
135     else:
136         logger.setLevel(logging.INFO)
137
138     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
139         args.source_arvados = args.object_uuid[:5]
140
141     # Create API clients for the source and destination instances
142     src_arv = api_for_instance(args.source_arvados, args.retries)
143     dst_arv = api_for_instance(args.destination_arvados, args.retries)
144
145     if not args.project_uuid:
146         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
147
148     # Identify the kind of object we have been given, and begin copying.
149     t = uuid_type(src_arv, args.object_uuid)
150
151     try:
152         if t == 'Collection':
153             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
154             result = copy_collection(args.object_uuid,
155                                      src_arv, dst_arv,
156                                      args)
157         elif t == 'Workflow':
158             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
159             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
160         elif t == 'Group':
161             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
162             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
163         elif t == 'httpURL':
164             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
165         else:
166             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
167     except Exception as e:
168         logger.error("%s", e, exc_info=args.verbose)
169         exit(1)
170
171     # Clean up any outstanding temp git repositories.
172     for d in listvalues(local_repo_dir):
173         shutil.rmtree(d, ignore_errors=True)
174
175     # If no exception was thrown and the response does not have an
176     # error_token field, presume success
177     if result is None or 'error_token' in result or 'uuid' not in result:
178         if result:
179             logger.error("API server returned an error result: {}".format(result))
180         exit(1)
181
182     print(result['uuid'])
183
184     if result.get('partial_error'):
185         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
186         exit(1)
187
188     logger.info("Success: created copy with uuid {}".format(result['uuid']))
189     exit(0)
190
191 def set_src_owner_uuid(resource, uuid, args):
192     global src_owner_uuid
193     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
194     src_owner_uuid = c.get("owner_uuid")
195
196 # api_for_instance(instance_name)
197 #
198 #     Creates an API client for the Arvados instance identified by
199 #     instance_name.
200 #
201 #     If instance_name contains a slash, it is presumed to be a path
202 #     (either local or absolute) to a file with Arvados configuration
203 #     settings.
204 #
205 #     Otherwise, it is presumed to be the name of a file in
206 #     $HOME/.config/arvados/instance_name.conf
207 #
208 def api_for_instance(instance_name, num_retries):
209     if not instance_name:
210         # Use environment
211         return arvados.api('v1', model=OrderedJsonModel())
212
213     if '/' in instance_name:
214         config_file = instance_name
215     else:
216         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
217
218     try:
219         cfg = arvados.config.load(config_file)
220     except (IOError, OSError) as e:
221         abort(("Could not open config file {}: {}\n" +
222                "You must make sure that your configuration tokens\n" +
223                "for Arvados instance {} are in {} and that this\n" +
224                "file is readable.").format(
225                    config_file, e, instance_name, config_file))
226
227     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
228         api_is_insecure = (
229             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
230                 ['1', 't', 'true', 'y', 'yes']))
231         client = arvados.api('v1',
232                              host=cfg['ARVADOS_API_HOST'],
233                              token=cfg['ARVADOS_API_TOKEN'],
234                              insecure=api_is_insecure,
235                              model=OrderedJsonModel(),
236                              num_retries=num_retries,
237                              )
238     else:
239         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
240     return client
241
242 # Check if git is available
243 def check_git_availability():
244     try:
245         subprocess.run(
246             ['git', '--version'],
247             check=True,
248             stdout=subprocess.DEVNULL,
249         )
250     except FileNotFoundError:
251         abort('git command is not available. Please ensure git is installed.')
252
253
254 def filter_iter(arg):
255     """Iterate a filter string-or-list.
256
257     Pass in a filter field that can either be a string or list.
258     This will iterate elements as if the field had been written as a list.
259     """
260     if isinstance(arg, basestring):
261         return iter((arg,))
262     else:
263         return iter(arg)
264
265 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
266     """Update a single repository filter in-place for the destination.
267
268     If the filter checks that the repository is src_repository, it is
269     updated to check that the repository is dst_repository.  If it does
270     anything else, this function raises ValueError.
271     """
272     if src_repository is None:
273         raise ValueError("component does not specify a source repository")
274     elif dst_repository is None:
275         raise ValueError("no destination repository specified to update repository filter")
276     elif repo_filter[1:] == ['=', src_repository]:
277         repo_filter[2] = dst_repository
278     elif repo_filter[1:] == ['in', [src_repository]]:
279         repo_filter[2] = [dst_repository]
280     else:
281         raise ValueError("repository filter is not a simple source match")
282
283 def migrate_script_version_filter(version_filter):
284     """Update a single script_version filter in-place for the destination.
285
286     Currently this function checks that all the filter operands are Git
287     commit hashes.  If they're not, it raises ValueError to indicate that
288     the filter is not portable.  It could be extended to make other
289     transformations in the future.
290     """
291     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
292         raise ValueError("script_version filter is not limited to commit hashes")
293
294 def attr_filtered(filter_, *attr_names):
295     """Return True if filter_ applies to any of attr_names, else False."""
296     return any((name == 'any') or (name in attr_names)
297                for name in filter_iter(filter_[0]))
298
299 @contextlib.contextmanager
300 def exception_handler(handler, *exc_types):
301     """If any exc_types are raised in the block, call handler on the exception."""
302     try:
303         yield
304     except exc_types as error:
305         handler(error)
306
307
308 # copy_workflow(wf_uuid, src, dst, args)
309 #
310 #    Copies a workflow identified by wf_uuid from src to dst.
311 #
312 #    If args.recursive is True, also copy any collections
313 #      referenced in the workflow definition yaml.
314 #
315 #    The owner_uuid of the new workflow is set to any given
316 #      project_uuid or the user who copied the template.
317 #
318 #    Returns the copied workflow object.
319 #
320 def copy_workflow(wf_uuid, src, dst, args):
321     # fetch the workflow from the source instance
322     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
323
324     if not wf["definition"]:
325         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
326
327     # copy collections and docker images
328     if args.recursive and wf["definition"]:
329         wf_def = yaml.safe_load(wf["definition"])
330         if wf_def is not None:
331             locations = []
332             docker_images = {}
333             graph = wf_def.get('$graph', None)
334             if graph is not None:
335                 workflow_collections(graph, locations, docker_images)
336             else:
337                 workflow_collections(wf_def, locations, docker_images)
338
339             if locations:
340                 copy_collections(locations, src, dst, args)
341
342             for image in docker_images:
343                 copy_docker_image(image, docker_images[image], src, dst, args)
344
345     # copy the workflow itself
346     del wf['uuid']
347     wf['owner_uuid'] = args.project_uuid
348
349     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
350                                              ["name", "=", wf["name"]]]).execute()
351     if len(existing["items"]) == 0:
352         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
353     else:
354         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
355
356
357 def workflow_collections(obj, locations, docker_images):
358     if isinstance(obj, dict):
359         loc = obj.get('location', None)
360         if loc is not None:
361             if loc.startswith("keep:"):
362                 locations.append(loc[5:])
363
364         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
365         if docker_image is not None:
366             ds = docker_image.split(":", 1)
367             tag = ds[1] if len(ds)==2 else 'latest'
368             docker_images[ds[0]] = tag
369
370         for x in obj:
371             workflow_collections(obj[x], locations, docker_images)
372     elif isinstance(obj, list):
373         for x in obj:
374             workflow_collections(x, locations, docker_images)
375
376 # copy_collections(obj, src, dst, args)
377 #
378 #    Recursively copies all collections referenced by 'obj' from src
379 #    to dst.  obj may be a dict or a list, in which case we run
380 #    copy_collections on every value it contains. If it is a string,
381 #    search it for any substring that matches a collection hash or uuid
382 #    (this will find hidden references to collections like
383 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
384 #
385 #    Returns a copy of obj with any old collection uuids replaced by
386 #    the new ones.
387 #
388 def copy_collections(obj, src, dst, args):
389
390     def copy_collection_fn(collection_match):
391         """Helper function for regex substitution: copies a single collection,
392         identified by the collection_match MatchObject, to the
393         destination.  Returns the destination collection uuid (or the
394         portable data hash if that's what src_id is).
395
396         """
397         src_id = collection_match.group(0)
398         if src_id not in collections_copied:
399             dst_col = copy_collection(src_id, src, dst, args)
400             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
401                 collections_copied[src_id] = src_id
402             else:
403                 collections_copied[src_id] = dst_col['uuid']
404         return collections_copied[src_id]
405
406     if isinstance(obj, basestring):
407         # Copy any collections identified in this string to dst, replacing
408         # them with the dst uuids as necessary.
409         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
410         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
411         return obj
412     elif isinstance(obj, dict):
413         return type(obj)((v, copy_collections(obj[v], src, dst, args))
414                          for v in obj)
415     elif isinstance(obj, list):
416         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
417     return obj
418
419
420 def total_collection_size(manifest_text):
421     """Return the total number of bytes in this collection (excluding
422     duplicate blocks)."""
423
424     total_bytes = 0
425     locators_seen = {}
426     for line in manifest_text.splitlines():
427         words = line.split()
428         for word in words[1:]:
429             try:
430                 loc = arvados.KeepLocator(word)
431             except ValueError:
432                 continue  # this word isn't a locator, skip it
433             if loc.md5sum not in locators_seen:
434                 locators_seen[loc.md5sum] = True
435                 total_bytes += loc.size
436
437     return total_bytes
438
439 def create_collection_from(c, src, dst, args):
440     """Create a new collection record on dst, and copy Docker metadata if
441     available."""
442
443     collection_uuid = c['uuid']
444     body = {}
445     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
446         body[d] = c[d]
447
448     if not body["name"]:
449         body['name'] = "copied from " + collection_uuid
450
451     if args.storage_classes:
452         body['storage_classes_desired'] = args.storage_classes
453
454     body['owner_uuid'] = args.project_uuid
455
456     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
457
458     # Create docker_image_repo+tag and docker_image_hash links
459     # at the destination.
460     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
461         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
462
463         for src_link in docker_links:
464             body = {key: src_link[key]
465                     for key in ['link_class', 'name', 'properties']}
466             body['head_uuid'] = dst_collection['uuid']
467             body['owner_uuid'] = args.project_uuid
468
469             lk = dst.links().create(body=body).execute(num_retries=args.retries)
470             logger.debug('created dst link {}'.format(lk))
471
472     return dst_collection
473
474 # copy_collection(obj_uuid, src, dst, args)
475 #
476 #    Copies the collection identified by obj_uuid from src to dst.
477 #    Returns the collection object created at dst.
478 #
479 #    If args.progress is True, produce a human-friendly progress
480 #    report.
481 #
482 #    If a collection with the desired portable_data_hash already
483 #    exists at dst, and args.force is False, copy_collection returns
484 #    the existing collection without copying any blocks.  Otherwise
485 #    (if no collection exists or if args.force is True)
486 #    copy_collection copies all of the collection data blocks from src
487 #    to dst.
488 #
489 #    For this application, it is critical to preserve the
490 #    collection's manifest hash, which is not guaranteed with the
491 #    arvados.CollectionReader and arvados.CollectionWriter classes.
492 #    Copying each block in the collection manually, followed by
493 #    the manifest block, ensures that the collection's manifest
494 #    hash will not change.
495 #
496 def copy_collection(obj_uuid, src, dst, args):
497     if arvados.util.keep_locator_pattern.match(obj_uuid):
498         # If the obj_uuid is a portable data hash, it might not be
499         # uniquely identified with a particular collection.  As a
500         # result, it is ambiguous as to what name to use for the copy.
501         # Apply some heuristics to pick which collection to get the
502         # name from.
503         srccol = src.collections().list(
504             filters=[['portable_data_hash', '=', obj_uuid]],
505             order="created_at asc"
506             ).execute(num_retries=args.retries)
507
508         items = srccol.get("items")
509
510         if not items:
511             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
512             return
513
514         c = None
515
516         if len(items) == 1:
517             # There's only one collection with the PDH, so use that.
518             c = items[0]
519         if not c:
520             # See if there is a collection that's in the same project
521             # as the root item (usually a workflow) being copied.
522             for i in items:
523                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
524                     c = i
525                     break
526         if not c:
527             # Didn't find any collections located in the same project, so
528             # pick the oldest collection that has a name assigned to it.
529             for i in items:
530                 if i.get("name"):
531                     c = i
532                     break
533         if not c:
534             # None of the collections have names (?!), so just pick the
535             # first one.
536             c = items[0]
537
538         # list() doesn't return manifest text (and we don't want it to,
539         # because we don't need the same maninfest text sent to us 50
540         # times) so go and retrieve the collection object directly
541         # which will include the manifest text.
542         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
543     else:
544         # Assume this is an actual collection uuid, so fetch it directly.
545         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
546
547     # If a collection with this hash already exists at the
548     # destination, and 'force' is not true, just return that
549     # collection.
550     if not args.force:
551         if 'portable_data_hash' in c:
552             colhash = c['portable_data_hash']
553         else:
554             colhash = c['uuid']
555         dstcol = dst.collections().list(
556             filters=[['portable_data_hash', '=', colhash]]
557         ).execute(num_retries=args.retries)
558         if dstcol['items_available'] > 0:
559             for d in dstcol['items']:
560                 if ((args.project_uuid == d['owner_uuid']) and
561                     (c.get('name') == d['name']) and
562                     (c['portable_data_hash'] == d['portable_data_hash'])):
563                     return d
564             c['manifest_text'] = dst.collections().get(
565                 uuid=dstcol['items'][0]['uuid']
566             ).execute(num_retries=args.retries)['manifest_text']
567             return create_collection_from(c, src, dst, args)
568
569     # Fetch the collection's manifest.
570     manifest = c['manifest_text']
571     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
572
573     # Copy each block from src_keep to dst_keep.
574     # Use the newly signed locators returned from dst_keep to build
575     # a new manifest as we go.
576     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
577     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
578     dst_manifest = io.StringIO()
579     dst_locators = {}
580     bytes_written = 0
581     bytes_expected = total_collection_size(manifest)
582     if args.progress:
583         progress_writer = ProgressWriter(human_progress)
584     else:
585         progress_writer = None
586
587     # go through the words
588     # put each block loc into 'get' queue
589     # 'get' threads get block and put it into 'put' queue
590     # 'put' threads put block and then update dst_locators
591     #
592     # after going through the whole manifest we go back through it
593     # again and build dst_manifest
594
595     lock = threading.Lock()
596
597     # the get queue should be unbounded because we'll add all the
598     # block hashes we want to get, but these are small
599     get_queue = queue.Queue()
600
601     threadcount = 4
602
603     # the put queue contains full data blocks
604     # and if 'get' is faster than 'put' we could end up consuming
605     # a great deal of RAM if it isn't bounded.
606     put_queue = queue.Queue(threadcount)
607     transfer_error = []
608
609     def get_thread():
610         while True:
611             word = get_queue.get()
612             if word is None:
613                 put_queue.put(None)
614                 get_queue.task_done()
615                 return
616
617             blockhash = arvados.KeepLocator(word).md5sum
618             with lock:
619                 if blockhash in dst_locators:
620                     # Already uploaded
621                     get_queue.task_done()
622                     continue
623
624             try:
625                 logger.debug("Getting block %s", word)
626                 data = src_keep.get(word)
627                 put_queue.put((word, data))
628             except e:
629                 logger.error("Error getting block %s: %s", word, e)
630                 transfer_error.append(e)
631                 try:
632                     # Drain the 'get' queue so we end early
633                     while True:
634                         get_queue.get(False)
635                         get_queue.task_done()
636                 except queue.Empty:
637                     pass
638             finally:
639                 get_queue.task_done()
640
641     def put_thread():
642         nonlocal bytes_written
643         while True:
644             item = put_queue.get()
645             if item is None:
646                 put_queue.task_done()
647                 return
648
649             word, data = item
650             loc = arvados.KeepLocator(word)
651             blockhash = loc.md5sum
652             with lock:
653                 if blockhash in dst_locators:
654                     # Already uploaded
655                     put_queue.task_done()
656                     continue
657
658             try:
659                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
660                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
661                 with lock:
662                     dst_locators[blockhash] = dst_locator
663                     bytes_written += loc.size
664                     if progress_writer:
665                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
666             except e:
667                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
668                 try:
669                     # Drain the 'get' queue so we end early
670                     while True:
671                         get_queue.get(False)
672                         get_queue.task_done()
673                 except queue.Empty:
674                     pass
675                 transfer_error.append(e)
676             finally:
677                 put_queue.task_done()
678
679     for line in manifest.splitlines():
680         words = line.split()
681         for word in words[1:]:
682             try:
683                 loc = arvados.KeepLocator(word)
684             except ValueError:
685                 # If 'word' can't be parsed as a locator,
686                 # presume it's a filename.
687                 continue
688
689             get_queue.put(word)
690
691     for i in range(0, threadcount):
692         get_queue.put(None)
693
694     for i in range(0, threadcount):
695         threading.Thread(target=get_thread, daemon=True).start()
696
697     for i in range(0, threadcount):
698         threading.Thread(target=put_thread, daemon=True).start()
699
700     get_queue.join()
701     put_queue.join()
702
703     if len(transfer_error) > 0:
704         return {"error_token": "Failed to transfer blocks"}
705
706     for line in manifest.splitlines():
707         words = line.split()
708         dst_manifest.write(words[0])
709         for word in words[1:]:
710             try:
711                 loc = arvados.KeepLocator(word)
712             except ValueError:
713                 # If 'word' can't be parsed as a locator,
714                 # presume it's a filename.
715                 dst_manifest.write(' ')
716                 dst_manifest.write(word)
717                 continue
718             blockhash = loc.md5sum
719             dst_manifest.write(' ')
720             dst_manifest.write(dst_locators[blockhash])
721         dst_manifest.write("\n")
722
723     if progress_writer:
724         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
725         progress_writer.finish()
726
727     # Copy the manifest and save the collection.
728     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
729
730     c['manifest_text'] = dst_manifest.getvalue()
731     return create_collection_from(c, src, dst, args)
732
733 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
734     r = api.repositories().list(
735         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
736     if r['items_available'] != 1:
737         raise Exception('cannot identify repo {}; {} repos found'
738                         .format(repo_name, r['items_available']))
739
740     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
741     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
742     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
743
744     priority = https_url + other_url + http_url
745
746     for url in priority:
747         if url.startswith("http"):
748             u = urllib.parse.urlsplit(url)
749             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
750             git_config = ["-c", "credential.%s/.username=none" % baseurl,
751                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
752         else:
753             git_config = []
754
755         try:
756             logger.debug("trying %s", url)
757             subprocess.run(
758                 ['git', *git_config, 'ls-remote', url],
759                 check=True,
760                 env={
761                     'ARVADOS_API_TOKEN': api.api_token,
762                     'GIT_ASKPASS': '/bin/false',
763                     'HOME': os.environ['HOME'],
764                 },
765                 stdout=subprocess.DEVNULL,
766             )
767         except subprocess.CalledProcessError:
768             pass
769         else:
770             git_url = url
771             break
772     else:
773         raise Exception('Cannot access git repository, tried {}'
774                         .format(priority))
775
776     if git_url.startswith("http:"):
777         if allow_insecure_http:
778             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
779         else:
780             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
781
782     return (git_url, git_config)
783
784
785 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
786     """Copy the docker image identified by docker_image and
787     docker_image_tag from src to dst. Create appropriate
788     docker_image_repo+tag and docker_image_hash links at dst.
789
790     """
791
792     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
793
794     # Find the link identifying this docker image.
795     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
796         src, args.retries, docker_image, docker_image_tag)
797     if docker_image_list:
798         image_uuid, image_info = docker_image_list[0]
799         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
800
801         # Copy the collection it refers to.
802         dst_image_col = copy_collection(image_uuid, src, dst, args)
803     elif arvados.util.keep_locator_pattern.match(docker_image):
804         dst_image_col = copy_collection(docker_image, src, dst, args)
805     else:
806         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
807
808 def copy_project(obj_uuid, src, dst, owner_uuid, args):
809
810     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
811
812     # Create/update the destination project
813     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
814                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
815     if len(existing["items"]) == 0:
816         project_record = dst.groups().create(body={"group": {"group_class": "project",
817                                                              "owner_uuid": owner_uuid,
818                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
819     else:
820         project_record = existing["items"][0]
821
822     dst.groups().update(uuid=project_record["uuid"],
823                         body={"group": {
824                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
825
826     args.project_uuid = project_record["uuid"]
827
828     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
829
830
831     partial_error = ""
832
833     # Copy collections
834     try:
835         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
836                          src, dst, args)
837     except Exception as e:
838         partial_error += "\n" + str(e)
839
840     # Copy workflows
841     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
842         try:
843             copy_workflow(w["uuid"], src, dst, args)
844         except Exception as e:
845             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
846
847     if args.recursive:
848         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
849             try:
850                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
851             except Exception as e:
852                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
853
854     project_record["partial_error"] = partial_error
855
856     return project_record
857
858 # git_rev_parse(rev, repo)
859 #
860 #    Returns the 40-character commit hash corresponding to 'rev' in
861 #    git repository 'repo' (which must be the path of a local git
862 #    repository)
863 #
864 def git_rev_parse(rev, repo):
865     proc = subprocess.run(
866         ['git', 'rev-parse', rev],
867         check=True,
868         cwd=repo,
869         stdout=subprocess.PIPE,
870         text=True,
871     )
872     return proc.stdout.read().strip()
873
874 # uuid_type(api, object_uuid)
875 #
876 #    Returns the name of the class that object_uuid belongs to, based on
877 #    the second field of the uuid.  This function consults the api's
878 #    schema to identify the object class.
879 #
880 #    It returns a string such as 'Collection', 'Workflow', etc.
881 #
882 #    Special case: if handed a Keep locator hash, return 'Collection'.
883 #
884 def uuid_type(api, object_uuid):
885     if re.match(arvados.util.keep_locator_pattern, object_uuid):
886         return 'Collection'
887
888     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
889         return 'httpURL'
890
891     p = object_uuid.split('-')
892     if len(p) == 3:
893         type_prefix = p[1]
894         for k in api._schema.schemas:
895             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
896             if type_prefix == obj_class:
897                 return k
898     return None
899
900
901 def copy_from_http(url, src, dst, args):
902
903     project_uuid = args.project_uuid
904     varying_url_params = args.varying_url_params
905     prefer_cached_downloads = args.prefer_cached_downloads
906
907     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
908                                                    varying_url_params=varying_url_params,
909                                                    prefer_cached_downloads=prefer_cached_downloads)
910     if cached[2] is not None:
911         return copy_collection(cached[2], src, dst, args)
912
913     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
914                                                varying_url_params=varying_url_params,
915                                                prefer_cached_downloads=prefer_cached_downloads)
916
917     if cached is not None:
918         return {"uuid": cached[2]}
919
920
921 def abort(msg, code=1):
922     logger.info("arv-copy: %s", msg)
923     exit(code)
924
925
926 # Code for reporting on the progress of a collection upload.
927 # Stolen from arvados.commands.put.ArvPutCollectionWriter
928 # TODO(twp): figure out how to refactor into a shared library
929 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
930 # code)
931
932 def machine_progress(obj_uuid, bytes_written, bytes_expected):
933     return "{} {}: {} {} written {} total\n".format(
934         sys.argv[0],
935         os.getpid(),
936         obj_uuid,
937         bytes_written,
938         -1 if (bytes_expected is None) else bytes_expected)
939
940 def human_progress(obj_uuid, bytes_written, bytes_expected):
941     if bytes_expected:
942         return "\r{}: {}M / {}M {:.1%} ".format(
943             obj_uuid,
944             bytes_written >> 20, bytes_expected >> 20,
945             float(bytes_written) / bytes_expected)
946     else:
947         return "\r{}: {} ".format(obj_uuid, bytes_written)
948
949 class ProgressWriter(object):
950     _progress_func = None
951     outfile = sys.stderr
952
953     def __init__(self, progress_func):
954         self._progress_func = progress_func
955
956     def report(self, obj_uuid, bytes_written, bytes_expected):
957         if self._progress_func is not None:
958             self.outfile.write(
959                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
960
961     def finish(self):
962         self.outfile.write("\n")
963
964 if __name__ == '__main__':
965     main()