20937: Simplify use of logger.error
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39 import json
40 import queue
41 import threading
42
43 import arvados
44 import arvados.config
45 import arvados.keep
46 import arvados.util
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
51
52 from arvados._version import __version__
53
54 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
55
56 logger = logging.getLogger('arvados.arv-copy')
57
58 # local_repo_dir records which git repositories from the Arvados source
59 # instance have been checked out locally during this run, and to which
60 # directories.
61 # e.g. if repository 'twp' from src_arv has been cloned into
62 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
63 #
64 local_repo_dir = {}
65
66 # List of collections that have been copied in this session, and their
67 # destination collection UUIDs.
68 collections_copied = {}
69
70 # Set of (repository, script_version) two-tuples of commits copied in git.
71 scripts_copied = set()
72
73 # The owner_uuid of the object being copied
74 src_owner_uuid = None
75
76 def main():
77     copy_opts = argparse.ArgumentParser(add_help=False)
78
79     copy_opts.add_argument(
80         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
81         help='Print version and exit.')
82     copy_opts.add_argument(
83         '-v', '--verbose', dest='verbose', action='store_true',
84         help='Verbose output.')
85     copy_opts.add_argument(
86         '--progress', dest='progress', action='store_true',
87         help='Report progress on copying collections. (default)')
88     copy_opts.add_argument(
89         '--no-progress', dest='progress', action='store_false',
90         help='Do not report progress on copying collections.')
91     copy_opts.add_argument(
92         '-f', '--force', dest='force', action='store_true',
93         help='Perform copy even if the object appears to exist at the remote destination.')
94     copy_opts.add_argument(
95         '--src', dest='source_arvados',
96         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
97     copy_opts.add_argument(
98         '--dst', dest='destination_arvados',
99         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
100     copy_opts.add_argument(
101         '--recursive', dest='recursive', action='store_true',
102         help='Recursively copy any dependencies for this object, and subprojects. (default)')
103     copy_opts.add_argument(
104         '--no-recursive', dest='recursive', action='store_false',
105         help='Do not copy any dependencies or subprojects.')
106     copy_opts.add_argument(
107         '--project-uuid', dest='project_uuid',
108         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
109     copy_opts.add_argument(
110         '--storage-classes', dest='storage_classes',
111         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
112     copy_opts.add_argument("--varying-url-params", type=str, default="",
113                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
114
115     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
116                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
117
118     copy_opts.add_argument(
119         'object_uuid',
120         help='The UUID of the object to be copied.')
121     copy_opts.set_defaults(progress=True)
122     copy_opts.set_defaults(recursive=True)
123
124     parser = argparse.ArgumentParser(
125         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
126         parents=[copy_opts, arv_cmd.retry_opt])
127     args = parser.parse_args()
128
129     if args.storage_classes:
130         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
131
132     if args.verbose:
133         logger.setLevel(logging.DEBUG)
134     else:
135         logger.setLevel(logging.INFO)
136
137     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
138         args.source_arvados = args.object_uuid[:5]
139
140     # Create API clients for the source and destination instances
141     src_arv = api_for_instance(args.source_arvados, args.retries)
142     dst_arv = api_for_instance(args.destination_arvados, args.retries)
143
144     if not args.project_uuid:
145         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
146
147     # Identify the kind of object we have been given, and begin copying.
148     t = uuid_type(src_arv, args.object_uuid)
149
150     try:
151         if t == 'Collection':
152             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
153             result = copy_collection(args.object_uuid,
154                                      src_arv, dst_arv,
155                                      args)
156         elif t == 'Workflow':
157             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
158             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
159         elif t == 'Group':
160             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
161             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
162         elif t == 'httpURL':
163             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
164         else:
165             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
166     except Exception as e:
167         logger.error("%s", e, exc_info=args.verbose)
168         exit(1)
169
170     # Clean up any outstanding temp git repositories.
171     for d in listvalues(local_repo_dir):
172         shutil.rmtree(d, ignore_errors=True)
173
174     # If no exception was thrown and the response does not have an
175     # error_token field, presume success
176     if result is None or 'error_token' in result or 'uuid' not in result:
177         if result:
178             logger.error("API server returned an error result: {}".format(result))
179         exit(1)
180
181     print(result['uuid'])
182
183     if result.get('partial_error'):
184         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
185         exit(1)
186
187     logger.info("Success: created copy with uuid {}".format(result['uuid']))
188     exit(0)
189
190 def set_src_owner_uuid(resource, uuid, args):
191     global src_owner_uuid
192     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
193     src_owner_uuid = c.get("owner_uuid")
194
195 # api_for_instance(instance_name)
196 #
197 #     Creates an API client for the Arvados instance identified by
198 #     instance_name.
199 #
200 #     If instance_name contains a slash, it is presumed to be a path
201 #     (either local or absolute) to a file with Arvados configuration
202 #     settings.
203 #
204 #     Otherwise, it is presumed to be the name of a file in
205 #     $HOME/.config/arvados/instance_name.conf
206 #
207 def api_for_instance(instance_name, num_retries):
208     if not instance_name:
209         # Use environment
210         return arvados.api('v1')
211
212     if '/' in instance_name:
213         config_file = instance_name
214     else:
215         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
216
217     try:
218         cfg = arvados.config.load(config_file)
219     except (IOError, OSError) as e:
220         abort(("Could not open config file {}: {}\n" +
221                "You must make sure that your configuration tokens\n" +
222                "for Arvados instance {} are in {} and that this\n" +
223                "file is readable.").format(
224                    config_file, e, instance_name, config_file))
225
226     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
227         api_is_insecure = (
228             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
229                 ['1', 't', 'true', 'y', 'yes']))
230         client = arvados.api('v1',
231                              host=cfg['ARVADOS_API_HOST'],
232                              token=cfg['ARVADOS_API_TOKEN'],
233                              insecure=api_is_insecure,
234                              num_retries=num_retries,
235                              )
236     else:
237         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
238     return client
239
240 # Check if git is available
241 def check_git_availability():
242     try:
243         subprocess.run(
244             ['git', '--version'],
245             check=True,
246             stdout=subprocess.DEVNULL,
247         )
248     except FileNotFoundError:
249         abort('git command is not available. Please ensure git is installed.')
250
251
252 def filter_iter(arg):
253     """Iterate a filter string-or-list.
254
255     Pass in a filter field that can either be a string or list.
256     This will iterate elements as if the field had been written as a list.
257     """
258     if isinstance(arg, basestring):
259         return iter((arg,))
260     else:
261         return iter(arg)
262
263 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
264     """Update a single repository filter in-place for the destination.
265
266     If the filter checks that the repository is src_repository, it is
267     updated to check that the repository is dst_repository.  If it does
268     anything else, this function raises ValueError.
269     """
270     if src_repository is None:
271         raise ValueError("component does not specify a source repository")
272     elif dst_repository is None:
273         raise ValueError("no destination repository specified to update repository filter")
274     elif repo_filter[1:] == ['=', src_repository]:
275         repo_filter[2] = dst_repository
276     elif repo_filter[1:] == ['in', [src_repository]]:
277         repo_filter[2] = [dst_repository]
278     else:
279         raise ValueError("repository filter is not a simple source match")
280
281 def migrate_script_version_filter(version_filter):
282     """Update a single script_version filter in-place for the destination.
283
284     Currently this function checks that all the filter operands are Git
285     commit hashes.  If they're not, it raises ValueError to indicate that
286     the filter is not portable.  It could be extended to make other
287     transformations in the future.
288     """
289     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
290         raise ValueError("script_version filter is not limited to commit hashes")
291
292 def attr_filtered(filter_, *attr_names):
293     """Return True if filter_ applies to any of attr_names, else False."""
294     return any((name == 'any') or (name in attr_names)
295                for name in filter_iter(filter_[0]))
296
297 @contextlib.contextmanager
298 def exception_handler(handler, *exc_types):
299     """If any exc_types are raised in the block, call handler on the exception."""
300     try:
301         yield
302     except exc_types as error:
303         handler(error)
304
305
306 # copy_workflow(wf_uuid, src, dst, args)
307 #
308 #    Copies a workflow identified by wf_uuid from src to dst.
309 #
310 #    If args.recursive is True, also copy any collections
311 #      referenced in the workflow definition yaml.
312 #
313 #    The owner_uuid of the new workflow is set to any given
314 #      project_uuid or the user who copied the template.
315 #
316 #    Returns the copied workflow object.
317 #
318 def copy_workflow(wf_uuid, src, dst, args):
319     # fetch the workflow from the source instance
320     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
321
322     if not wf["definition"]:
323         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
324
325     # copy collections and docker images
326     if args.recursive and wf["definition"]:
327         wf_def = yaml.safe_load(wf["definition"])
328         if wf_def is not None:
329             locations = []
330             docker_images = {}
331             graph = wf_def.get('$graph', None)
332             if graph is not None:
333                 workflow_collections(graph, locations, docker_images)
334             else:
335                 workflow_collections(wf_def, locations, docker_images)
336
337             if locations:
338                 copy_collections(locations, src, dst, args)
339
340             for image in docker_images:
341                 copy_docker_image(image, docker_images[image], src, dst, args)
342
343     # copy the workflow itself
344     del wf['uuid']
345     wf['owner_uuid'] = args.project_uuid
346
347     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
348                                              ["name", "=", wf["name"]]]).execute()
349     if len(existing["items"]) == 0:
350         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
351     else:
352         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
353
354
355 def workflow_collections(obj, locations, docker_images):
356     if isinstance(obj, dict):
357         loc = obj.get('location', None)
358         if loc is not None:
359             if loc.startswith("keep:"):
360                 locations.append(loc[5:])
361
362         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
363         if docker_image is not None:
364             ds = docker_image.split(":", 1)
365             tag = ds[1] if len(ds)==2 else 'latest'
366             docker_images[ds[0]] = tag
367
368         for x in obj:
369             workflow_collections(obj[x], locations, docker_images)
370     elif isinstance(obj, list):
371         for x in obj:
372             workflow_collections(x, locations, docker_images)
373
374 # copy_collections(obj, src, dst, args)
375 #
376 #    Recursively copies all collections referenced by 'obj' from src
377 #    to dst.  obj may be a dict or a list, in which case we run
378 #    copy_collections on every value it contains. If it is a string,
379 #    search it for any substring that matches a collection hash or uuid
380 #    (this will find hidden references to collections like
381 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
382 #
383 #    Returns a copy of obj with any old collection uuids replaced by
384 #    the new ones.
385 #
386 def copy_collections(obj, src, dst, args):
387
388     def copy_collection_fn(collection_match):
389         """Helper function for regex substitution: copies a single collection,
390         identified by the collection_match MatchObject, to the
391         destination.  Returns the destination collection uuid (or the
392         portable data hash if that's what src_id is).
393
394         """
395         src_id = collection_match.group(0)
396         if src_id not in collections_copied:
397             dst_col = copy_collection(src_id, src, dst, args)
398             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
399                 collections_copied[src_id] = src_id
400             else:
401                 collections_copied[src_id] = dst_col['uuid']
402         return collections_copied[src_id]
403
404     if isinstance(obj, basestring):
405         # Copy any collections identified in this string to dst, replacing
406         # them with the dst uuids as necessary.
407         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
408         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
409         return obj
410     elif isinstance(obj, dict):
411         return type(obj)((v, copy_collections(obj[v], src, dst, args))
412                          for v in obj)
413     elif isinstance(obj, list):
414         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
415     return obj
416
417
418 def total_collection_size(manifest_text):
419     """Return the total number of bytes in this collection (excluding
420     duplicate blocks)."""
421
422     total_bytes = 0
423     locators_seen = {}
424     for line in manifest_text.splitlines():
425         words = line.split()
426         for word in words[1:]:
427             try:
428                 loc = arvados.KeepLocator(word)
429             except ValueError:
430                 continue  # this word isn't a locator, skip it
431             if loc.md5sum not in locators_seen:
432                 locators_seen[loc.md5sum] = True
433                 total_bytes += loc.size
434
435     return total_bytes
436
437 def create_collection_from(c, src, dst, args):
438     """Create a new collection record on dst, and copy Docker metadata if
439     available."""
440
441     collection_uuid = c['uuid']
442     body = {}
443     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
444         body[d] = c[d]
445
446     if not body["name"]:
447         body['name'] = "copied from " + collection_uuid
448
449     if args.storage_classes:
450         body['storage_classes_desired'] = args.storage_classes
451
452     body['owner_uuid'] = args.project_uuid
453
454     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
455
456     # Create docker_image_repo+tag and docker_image_hash links
457     # at the destination.
458     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
459         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
460
461         for src_link in docker_links:
462             body = {key: src_link[key]
463                     for key in ['link_class', 'name', 'properties']}
464             body['head_uuid'] = dst_collection['uuid']
465             body['owner_uuid'] = args.project_uuid
466
467             lk = dst.links().create(body=body).execute(num_retries=args.retries)
468             logger.debug('created dst link {}'.format(lk))
469
470     return dst_collection
471
472 # copy_collection(obj_uuid, src, dst, args)
473 #
474 #    Copies the collection identified by obj_uuid from src to dst.
475 #    Returns the collection object created at dst.
476 #
477 #    If args.progress is True, produce a human-friendly progress
478 #    report.
479 #
480 #    If a collection with the desired portable_data_hash already
481 #    exists at dst, and args.force is False, copy_collection returns
482 #    the existing collection without copying any blocks.  Otherwise
483 #    (if no collection exists or if args.force is True)
484 #    copy_collection copies all of the collection data blocks from src
485 #    to dst.
486 #
487 #    For this application, it is critical to preserve the
488 #    collection's manifest hash, which is not guaranteed with the
489 #    arvados.CollectionReader and arvados.CollectionWriter classes.
490 #    Copying each block in the collection manually, followed by
491 #    the manifest block, ensures that the collection's manifest
492 #    hash will not change.
493 #
494 def copy_collection(obj_uuid, src, dst, args):
495     if arvados.util.keep_locator_pattern.match(obj_uuid):
496         # If the obj_uuid is a portable data hash, it might not be
497         # uniquely identified with a particular collection.  As a
498         # result, it is ambiguous as to what name to use for the copy.
499         # Apply some heuristics to pick which collection to get the
500         # name from.
501         srccol = src.collections().list(
502             filters=[['portable_data_hash', '=', obj_uuid]],
503             order="created_at asc"
504             ).execute(num_retries=args.retries)
505
506         items = srccol.get("items")
507
508         if not items:
509             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
510             return
511
512         c = None
513
514         if len(items) == 1:
515             # There's only one collection with the PDH, so use that.
516             c = items[0]
517         if not c:
518             # See if there is a collection that's in the same project
519             # as the root item (usually a workflow) being copied.
520             for i in items:
521                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
522                     c = i
523                     break
524         if not c:
525             # Didn't find any collections located in the same project, so
526             # pick the oldest collection that has a name assigned to it.
527             for i in items:
528                 if i.get("name"):
529                     c = i
530                     break
531         if not c:
532             # None of the collections have names (?!), so just pick the
533             # first one.
534             c = items[0]
535
536         # list() doesn't return manifest text (and we don't want it to,
537         # because we don't need the same maninfest text sent to us 50
538         # times) so go and retrieve the collection object directly
539         # which will include the manifest text.
540         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
541     else:
542         # Assume this is an actual collection uuid, so fetch it directly.
543         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
544
545     # If a collection with this hash already exists at the
546     # destination, and 'force' is not true, just return that
547     # collection.
548     if not args.force:
549         if 'portable_data_hash' in c:
550             colhash = c['portable_data_hash']
551         else:
552             colhash = c['uuid']
553         dstcol = dst.collections().list(
554             filters=[['portable_data_hash', '=', colhash]]
555         ).execute(num_retries=args.retries)
556         if dstcol['items_available'] > 0:
557             for d in dstcol['items']:
558                 if ((args.project_uuid == d['owner_uuid']) and
559                     (c.get('name') == d['name']) and
560                     (c['portable_data_hash'] == d['portable_data_hash'])):
561                     return d
562             c['manifest_text'] = dst.collections().get(
563                 uuid=dstcol['items'][0]['uuid']
564             ).execute(num_retries=args.retries)['manifest_text']
565             return create_collection_from(c, src, dst, args)
566
567     # Fetch the collection's manifest.
568     manifest = c['manifest_text']
569     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
570
571     # Copy each block from src_keep to dst_keep.
572     # Use the newly signed locators returned from dst_keep to build
573     # a new manifest as we go.
574     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
575     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
576     dst_manifest = io.StringIO()
577     dst_locators = {}
578     bytes_written = 0
579     bytes_expected = total_collection_size(manifest)
580     if args.progress:
581         progress_writer = ProgressWriter(human_progress)
582     else:
583         progress_writer = None
584
585     # go through the words
586     # put each block loc into 'get' queue
587     # 'get' threads get block and put it into 'put' queue
588     # 'put' threads put block and then update dst_locators
589     #
590     # after going through the whole manifest we go back through it
591     # again and build dst_manifest
592
593     lock = threading.Lock()
594
595     # the get queue should be unbounded because we'll add all the
596     # block hashes we want to get, but these are small
597     get_queue = queue.Queue()
598
599     threadcount = 4
600
601     # the put queue contains full data blocks
602     # and if 'get' is faster than 'put' we could end up consuming
603     # a great deal of RAM if it isn't bounded.
604     put_queue = queue.Queue(threadcount)
605     transfer_error = []
606
607     def get_thread():
608         while True:
609             word = get_queue.get()
610             if word is None:
611                 put_queue.put(None)
612                 get_queue.task_done()
613                 return
614
615             blockhash = arvados.KeepLocator(word).md5sum
616             with lock:
617                 if blockhash in dst_locators:
618                     # Already uploaded
619                     get_queue.task_done()
620                     continue
621
622             try:
623                 logger.debug("Getting block %s", word)
624                 data = src_keep.get(word)
625                 put_queue.put((word, data))
626             except e:
627                 logger.error("Error getting block %s: %s", word, e)
628                 transfer_error.append(e)
629                 try:
630                     # Drain the 'get' queue so we end early
631                     while True:
632                         get_queue.get(False)
633                         get_queue.task_done()
634                 except queue.Empty:
635                     pass
636             finally:
637                 get_queue.task_done()
638
639     def put_thread():
640         nonlocal bytes_written
641         while True:
642             item = put_queue.get()
643             if item is None:
644                 put_queue.task_done()
645                 return
646
647             word, data = item
648             loc = arvados.KeepLocator(word)
649             blockhash = loc.md5sum
650             with lock:
651                 if blockhash in dst_locators:
652                     # Already uploaded
653                     put_queue.task_done()
654                     continue
655
656             try:
657                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
658                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
659                 with lock:
660                     dst_locators[blockhash] = dst_locator
661                     bytes_written += loc.size
662                     if progress_writer:
663                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
664             except e:
665                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
666                 try:
667                     # Drain the 'get' queue so we end early
668                     while True:
669                         get_queue.get(False)
670                         get_queue.task_done()
671                 except queue.Empty:
672                     pass
673                 transfer_error.append(e)
674             finally:
675                 put_queue.task_done()
676
677     for line in manifest.splitlines():
678         words = line.split()
679         for word in words[1:]:
680             try:
681                 loc = arvados.KeepLocator(word)
682             except ValueError:
683                 # If 'word' can't be parsed as a locator,
684                 # presume it's a filename.
685                 continue
686
687             get_queue.put(word)
688
689     for i in range(0, threadcount):
690         get_queue.put(None)
691
692     for i in range(0, threadcount):
693         threading.Thread(target=get_thread, daemon=True).start()
694
695     for i in range(0, threadcount):
696         threading.Thread(target=put_thread, daemon=True).start()
697
698     get_queue.join()
699     put_queue.join()
700
701     if len(transfer_error) > 0:
702         return {"error_token": "Failed to transfer blocks"}
703
704     for line in manifest.splitlines():
705         words = line.split()
706         dst_manifest.write(words[0])
707         for word in words[1:]:
708             try:
709                 loc = arvados.KeepLocator(word)
710             except ValueError:
711                 # If 'word' can't be parsed as a locator,
712                 # presume it's a filename.
713                 dst_manifest.write(' ')
714                 dst_manifest.write(word)
715                 continue
716             blockhash = loc.md5sum
717             dst_manifest.write(' ')
718             dst_manifest.write(dst_locators[blockhash])
719         dst_manifest.write("\n")
720
721     if progress_writer:
722         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
723         progress_writer.finish()
724
725     # Copy the manifest and save the collection.
726     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
727
728     c['manifest_text'] = dst_manifest.getvalue()
729     return create_collection_from(c, src, dst, args)
730
731 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
732     r = api.repositories().list(
733         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
734     if r['items_available'] != 1:
735         raise Exception('cannot identify repo {}; {} repos found'
736                         .format(repo_name, r['items_available']))
737
738     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
739     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
740     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
741
742     priority = https_url + other_url + http_url
743
744     for url in priority:
745         if url.startswith("http"):
746             u = urllib.parse.urlsplit(url)
747             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
748             git_config = ["-c", "credential.%s/.username=none" % baseurl,
749                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
750         else:
751             git_config = []
752
753         try:
754             logger.debug("trying %s", url)
755             subprocess.run(
756                 ['git', *git_config, 'ls-remote', url],
757                 check=True,
758                 env={
759                     'ARVADOS_API_TOKEN': api.api_token,
760                     'GIT_ASKPASS': '/bin/false',
761                     'HOME': os.environ['HOME'],
762                 },
763                 stdout=subprocess.DEVNULL,
764             )
765         except subprocess.CalledProcessError:
766             pass
767         else:
768             git_url = url
769             break
770     else:
771         raise Exception('Cannot access git repository, tried {}'
772                         .format(priority))
773
774     if git_url.startswith("http:"):
775         if allow_insecure_http:
776             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
777         else:
778             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
779
780     return (git_url, git_config)
781
782
783 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
784     """Copy the docker image identified by docker_image and
785     docker_image_tag from src to dst. Create appropriate
786     docker_image_repo+tag and docker_image_hash links at dst.
787
788     """
789
790     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
791
792     # Find the link identifying this docker image.
793     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
794         src, args.retries, docker_image, docker_image_tag)
795     if docker_image_list:
796         image_uuid, image_info = docker_image_list[0]
797         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
798
799         # Copy the collection it refers to.
800         dst_image_col = copy_collection(image_uuid, src, dst, args)
801     elif arvados.util.keep_locator_pattern.match(docker_image):
802         dst_image_col = copy_collection(docker_image, src, dst, args)
803     else:
804         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
805
806 def copy_project(obj_uuid, src, dst, owner_uuid, args):
807
808     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
809
810     # Create/update the destination project
811     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
812                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
813     if len(existing["items"]) == 0:
814         project_record = dst.groups().create(body={"group": {"group_class": "project",
815                                                              "owner_uuid": owner_uuid,
816                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
817     else:
818         project_record = existing["items"][0]
819
820     dst.groups().update(uuid=project_record["uuid"],
821                         body={"group": {
822                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
823
824     args.project_uuid = project_record["uuid"]
825
826     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
827
828
829     partial_error = ""
830
831     # Copy collections
832     try:
833         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
834                          src, dst, args)
835     except Exception as e:
836         partial_error += "\n" + str(e)
837
838     # Copy workflows
839     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
840         try:
841             copy_workflow(w["uuid"], src, dst, args)
842         except Exception as e:
843             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
844
845     if args.recursive:
846         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
847             try:
848                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
849             except Exception as e:
850                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
851
852     project_record["partial_error"] = partial_error
853
854     return project_record
855
856 # git_rev_parse(rev, repo)
857 #
858 #    Returns the 40-character commit hash corresponding to 'rev' in
859 #    git repository 'repo' (which must be the path of a local git
860 #    repository)
861 #
862 def git_rev_parse(rev, repo):
863     proc = subprocess.run(
864         ['git', 'rev-parse', rev],
865         check=True,
866         cwd=repo,
867         stdout=subprocess.PIPE,
868         text=True,
869     )
870     return proc.stdout.read().strip()
871
872 # uuid_type(api, object_uuid)
873 #
874 #    Returns the name of the class that object_uuid belongs to, based on
875 #    the second field of the uuid.  This function consults the api's
876 #    schema to identify the object class.
877 #
878 #    It returns a string such as 'Collection', 'Workflow', etc.
879 #
880 #    Special case: if handed a Keep locator hash, return 'Collection'.
881 #
882 def uuid_type(api, object_uuid):
883     if re.match(arvados.util.keep_locator_pattern, object_uuid):
884         return 'Collection'
885
886     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
887         return 'httpURL'
888
889     p = object_uuid.split('-')
890     if len(p) == 3:
891         type_prefix = p[1]
892         for k in api._schema.schemas:
893             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
894             if type_prefix == obj_class:
895                 return k
896     return None
897
898
899 def copy_from_http(url, src, dst, args):
900
901     project_uuid = args.project_uuid
902     varying_url_params = args.varying_url_params
903     prefer_cached_downloads = args.prefer_cached_downloads
904
905     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
906                                                    varying_url_params=varying_url_params,
907                                                    prefer_cached_downloads=prefer_cached_downloads)
908     if cached[2] is not None:
909         return copy_collection(cached[2], src, dst, args)
910
911     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
912                                                varying_url_params=varying_url_params,
913                                                prefer_cached_downloads=prefer_cached_downloads)
914
915     if cached is not None:
916         return {"uuid": cached[2]}
917
918
919 def abort(msg, code=1):
920     logger.info("arv-copy: %s", msg)
921     exit(code)
922
923
924 # Code for reporting on the progress of a collection upload.
925 # Stolen from arvados.commands.put.ArvPutCollectionWriter
926 # TODO(twp): figure out how to refactor into a shared library
927 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
928 # code)
929
930 def machine_progress(obj_uuid, bytes_written, bytes_expected):
931     return "{} {}: {} {} written {} total\n".format(
932         sys.argv[0],
933         os.getpid(),
934         obj_uuid,
935         bytes_written,
936         -1 if (bytes_expected is None) else bytes_expected)
937
938 def human_progress(obj_uuid, bytes_written, bytes_expected):
939     if bytes_expected:
940         return "\r{}: {}M / {}M {:.1%} ".format(
941             obj_uuid,
942             bytes_written >> 20, bytes_expected >> 20,
943             float(bytes_written) / bytes_expected)
944     else:
945         return "\r{}: {} ".format(obj_uuid, bytes_written)
946
947 class ProgressWriter(object):
948     _progress_func = None
949     outfile = sys.stderr
950
951     def __init__(self, progress_func):
952         self._progress_func = progress_func
953
954     def report(self, obj_uuid, bytes_written, bytes_expected):
955         if self._progress_func is not None:
956             self.outfile.write(
957                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
958
959     def finish(self):
960         self.outfile.write("\n")
961
962 if __name__ == '__main__':
963     main()