Merge branch '21356-clean-imports'
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 import argparse
22 import contextlib
23 import getpass
24 import os
25 import re
26 import shutil
27 import subprocess
28 import sys
29 import logging
30 import tempfile
31 import urllib.parse
32 import io
33 import json
34 import queue
35 import threading
36
37 import arvados
38 import arvados.config
39 import arvados.keep
40 import arvados.util
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
43 import arvados.http_to_keep
44
45 from arvados._version import __version__
46
47 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
48
49 logger = logging.getLogger('arvados.arv-copy')
50
51 # local_repo_dir records which git repositories from the Arvados source
52 # instance have been checked out locally during this run, and to which
53 # directories.
54 # e.g. if repository 'twp' from src_arv has been cloned into
55 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
56 #
57 local_repo_dir = {}
58
59 # List of collections that have been copied in this session, and their
60 # destination collection UUIDs.
61 collections_copied = {}
62
63 # Set of (repository, script_version) two-tuples of commits copied in git.
64 scripts_copied = set()
65
66 # The owner_uuid of the object being copied
67 src_owner_uuid = None
68
69 def main():
70     copy_opts = argparse.ArgumentParser(add_help=False)
71
72     copy_opts.add_argument(
73         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
74         help='Print version and exit.')
75     copy_opts.add_argument(
76         '-v', '--verbose', dest='verbose', action='store_true',
77         help='Verbose output.')
78     copy_opts.add_argument(
79         '--progress', dest='progress', action='store_true',
80         help='Report progress on copying collections. (default)')
81     copy_opts.add_argument(
82         '--no-progress', dest='progress', action='store_false',
83         help='Do not report progress on copying collections.')
84     copy_opts.add_argument(
85         '-f', '--force', dest='force', action='store_true',
86         help='Perform copy even if the object appears to exist at the remote destination.')
87     copy_opts.add_argument(
88         '--src', dest='source_arvados',
89         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
90     copy_opts.add_argument(
91         '--dst', dest='destination_arvados',
92         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
93     copy_opts.add_argument(
94         '--recursive', dest='recursive', action='store_true',
95         help='Recursively copy any dependencies for this object, and subprojects. (default)')
96     copy_opts.add_argument(
97         '--no-recursive', dest='recursive', action='store_false',
98         help='Do not copy any dependencies or subprojects.')
99     copy_opts.add_argument(
100         '--project-uuid', dest='project_uuid',
101         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
102     copy_opts.add_argument(
103         '--storage-classes', dest='storage_classes',
104         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
105     copy_opts.add_argument("--varying-url-params", type=str, default="",
106                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
107
108     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
109                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
110
111     copy_opts.add_argument(
112         'object_uuid',
113         help='The UUID of the object to be copied.')
114     copy_opts.set_defaults(progress=True)
115     copy_opts.set_defaults(recursive=True)
116
117     parser = argparse.ArgumentParser(
118         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
119         parents=[copy_opts, arv_cmd.retry_opt])
120     args = parser.parse_args()
121
122     if args.storage_classes:
123         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
124
125     if args.verbose:
126         logger.setLevel(logging.DEBUG)
127     else:
128         logger.setLevel(logging.INFO)
129
130     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
131         args.source_arvados = args.object_uuid[:5]
132
133     # Create API clients for the source and destination instances
134     src_arv = api_for_instance(args.source_arvados, args.retries)
135     dst_arv = api_for_instance(args.destination_arvados, args.retries)
136
137     if not args.project_uuid:
138         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
139
140     # Identify the kind of object we have been given, and begin copying.
141     t = uuid_type(src_arv, args.object_uuid)
142
143     try:
144         if t == 'Collection':
145             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
146             result = copy_collection(args.object_uuid,
147                                      src_arv, dst_arv,
148                                      args)
149         elif t == 'Workflow':
150             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
151             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
152         elif t == 'Group':
153             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
154             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
155         elif t == 'httpURL':
156             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
157         else:
158             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
159     except Exception as e:
160         logger.error("%s", e, exc_info=args.verbose)
161         exit(1)
162
163     # Clean up any outstanding temp git repositories.
164     for d in local_repo_dir.values():
165         shutil.rmtree(d, ignore_errors=True)
166
167     if not result:
168         exit(1)
169
170     # If no exception was thrown and the response does not have an
171     # error_token field, presume success
172     if result is None or 'error_token' in result or 'uuid' not in result:
173         if result:
174             logger.error("API server returned an error result: {}".format(result))
175         exit(1)
176
177     print(result['uuid'])
178
179     if result.get('partial_error'):
180         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
181         exit(1)
182
183     logger.info("Success: created copy with uuid {}".format(result['uuid']))
184     exit(0)
185
186 def set_src_owner_uuid(resource, uuid, args):
187     global src_owner_uuid
188     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
189     src_owner_uuid = c.get("owner_uuid")
190
191 # api_for_instance(instance_name)
192 #
193 #     Creates an API client for the Arvados instance identified by
194 #     instance_name.
195 #
196 #     If instance_name contains a slash, it is presumed to be a path
197 #     (either local or absolute) to a file with Arvados configuration
198 #     settings.
199 #
200 #     Otherwise, it is presumed to be the name of a file in
201 #     $HOME/.config/arvados/instance_name.conf
202 #
203 def api_for_instance(instance_name, num_retries):
204     if not instance_name:
205         # Use environment
206         return arvados.api('v1')
207
208     if '/' in instance_name:
209         config_file = instance_name
210     else:
211         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
212
213     try:
214         cfg = arvados.config.load(config_file)
215     except (IOError, OSError) as e:
216         abort(("Could not open config file {}: {}\n" +
217                "You must make sure that your configuration tokens\n" +
218                "for Arvados instance {} are in {} and that this\n" +
219                "file is readable.").format(
220                    config_file, e, instance_name, config_file))
221
222     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
223         api_is_insecure = (
224             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
225                 ['1', 't', 'true', 'y', 'yes']))
226         client = arvados.api('v1',
227                              host=cfg['ARVADOS_API_HOST'],
228                              token=cfg['ARVADOS_API_TOKEN'],
229                              insecure=api_is_insecure,
230                              num_retries=num_retries,
231                              )
232     else:
233         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
234     return client
235
236 # Check if git is available
237 def check_git_availability():
238     try:
239         subprocess.run(
240             ['git', '--version'],
241             check=True,
242             stdout=subprocess.DEVNULL,
243         )
244     except FileNotFoundError:
245         abort('git command is not available. Please ensure git is installed.')
246
247
248 def filter_iter(arg):
249     """Iterate a filter string-or-list.
250
251     Pass in a filter field that can either be a string or list.
252     This will iterate elements as if the field had been written as a list.
253     """
254     if isinstance(arg, str):
255         yield arg
256     else:
257         yield from arg
258
259 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
260     """Update a single repository filter in-place for the destination.
261
262     If the filter checks that the repository is src_repository, it is
263     updated to check that the repository is dst_repository.  If it does
264     anything else, this function raises ValueError.
265     """
266     if src_repository is None:
267         raise ValueError("component does not specify a source repository")
268     elif dst_repository is None:
269         raise ValueError("no destination repository specified to update repository filter")
270     elif repo_filter[1:] == ['=', src_repository]:
271         repo_filter[2] = dst_repository
272     elif repo_filter[1:] == ['in', [src_repository]]:
273         repo_filter[2] = [dst_repository]
274     else:
275         raise ValueError("repository filter is not a simple source match")
276
277 def migrate_script_version_filter(version_filter):
278     """Update a single script_version filter in-place for the destination.
279
280     Currently this function checks that all the filter operands are Git
281     commit hashes.  If they're not, it raises ValueError to indicate that
282     the filter is not portable.  It could be extended to make other
283     transformations in the future.
284     """
285     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
286         raise ValueError("script_version filter is not limited to commit hashes")
287
288 def attr_filtered(filter_, *attr_names):
289     """Return True if filter_ applies to any of attr_names, else False."""
290     return any((name == 'any') or (name in attr_names)
291                for name in filter_iter(filter_[0]))
292
293 @contextlib.contextmanager
294 def exception_handler(handler, *exc_types):
295     """If any exc_types are raised in the block, call handler on the exception."""
296     try:
297         yield
298     except exc_types as error:
299         handler(error)
300
301
302 # copy_workflow(wf_uuid, src, dst, args)
303 #
304 #    Copies a workflow identified by wf_uuid from src to dst.
305 #
306 #    If args.recursive is True, also copy any collections
307 #      referenced in the workflow definition yaml.
308 #
309 #    The owner_uuid of the new workflow is set to any given
310 #      project_uuid or the user who copied the template.
311 #
312 #    Returns the copied workflow object.
313 #
314 def copy_workflow(wf_uuid, src, dst, args):
315     # fetch the workflow from the source instance
316     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
317
318     if not wf["definition"]:
319         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
320
321     # copy collections and docker images
322     if args.recursive and wf["definition"]:
323         env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
324                "ARVADOS_API_TOKEN": src.api_token,
325                "PATH": os.environ["PATH"]}
326         try:
327             result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
328                                     capture_output=True, env=env)
329         except FileNotFoundError:
330             no_arv_copy = True
331         else:
332             no_arv_copy = result.returncode == 2
333
334         if no_arv_copy:
335             raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
336         elif result.returncode != 0:
337             raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
338
339         locations = json.loads(result.stdout)
340
341         if locations:
342             copy_collections(locations, src, dst, args)
343
344     # copy the workflow itself
345     del wf['uuid']
346     wf['owner_uuid'] = args.project_uuid
347
348     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
349                                              ["name", "=", wf["name"]]]).execute()
350     if len(existing["items"]) == 0:
351         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
352     else:
353         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
354
355
356 def workflow_collections(obj, locations, docker_images):
357     if isinstance(obj, dict):
358         loc = obj.get('location', None)
359         if loc is not None:
360             if loc.startswith("keep:"):
361                 locations.append(loc[5:])
362
363         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
364         if docker_image is not None:
365             ds = docker_image.split(":", 1)
366             tag = ds[1] if len(ds)==2 else 'latest'
367             docker_images[ds[0]] = tag
368
369         for x in obj:
370             workflow_collections(obj[x], locations, docker_images)
371     elif isinstance(obj, list):
372         for x in obj:
373             workflow_collections(x, locations, docker_images)
374
375 # copy_collections(obj, src, dst, args)
376 #
377 #    Recursively copies all collections referenced by 'obj' from src
378 #    to dst.  obj may be a dict or a list, in which case we run
379 #    copy_collections on every value it contains. If it is a string,
380 #    search it for any substring that matches a collection hash or uuid
381 #    (this will find hidden references to collections like
382 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
383 #
384 #    Returns a copy of obj with any old collection uuids replaced by
385 #    the new ones.
386 #
387 def copy_collections(obj, src, dst, args):
388
389     def copy_collection_fn(collection_match):
390         """Helper function for regex substitution: copies a single collection,
391         identified by the collection_match MatchObject, to the
392         destination.  Returns the destination collection uuid (or the
393         portable data hash if that's what src_id is).
394
395         """
396         src_id = collection_match.group(0)
397         if src_id not in collections_copied:
398             dst_col = copy_collection(src_id, src, dst, args)
399             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
400                 collections_copied[src_id] = src_id
401             else:
402                 collections_copied[src_id] = dst_col['uuid']
403         return collections_copied[src_id]
404
405     if isinstance(obj, str):
406         # Copy any collections identified in this string to dst, replacing
407         # them with the dst uuids as necessary.
408         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
409         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
410         return obj
411     elif isinstance(obj, dict):
412         return type(obj)((v, copy_collections(obj[v], src, dst, args))
413                          for v in obj)
414     elif isinstance(obj, list):
415         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
416     return obj
417
418
419 def total_collection_size(manifest_text):
420     """Return the total number of bytes in this collection (excluding
421     duplicate blocks)."""
422
423     total_bytes = 0
424     locators_seen = {}
425     for line in manifest_text.splitlines():
426         words = line.split()
427         for word in words[1:]:
428             try:
429                 loc = arvados.KeepLocator(word)
430             except ValueError:
431                 continue  # this word isn't a locator, skip it
432             if loc.md5sum not in locators_seen:
433                 locators_seen[loc.md5sum] = True
434                 total_bytes += loc.size
435
436     return total_bytes
437
438 def create_collection_from(c, src, dst, args):
439     """Create a new collection record on dst, and copy Docker metadata if
440     available."""
441
442     collection_uuid = c['uuid']
443     body = {}
444     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
445         body[d] = c[d]
446
447     if not body["name"]:
448         body['name'] = "copied from " + collection_uuid
449
450     if args.storage_classes:
451         body['storage_classes_desired'] = args.storage_classes
452
453     body['owner_uuid'] = args.project_uuid
454
455     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
456
457     # Create docker_image_repo+tag and docker_image_hash links
458     # at the destination.
459     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
460         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
461
462         for src_link in docker_links:
463             body = {key: src_link[key]
464                     for key in ['link_class', 'name', 'properties']}
465             body['head_uuid'] = dst_collection['uuid']
466             body['owner_uuid'] = args.project_uuid
467
468             lk = dst.links().create(body=body).execute(num_retries=args.retries)
469             logger.debug('created dst link {}'.format(lk))
470
471     return dst_collection
472
473 # copy_collection(obj_uuid, src, dst, args)
474 #
475 #    Copies the collection identified by obj_uuid from src to dst.
476 #    Returns the collection object created at dst.
477 #
478 #    If args.progress is True, produce a human-friendly progress
479 #    report.
480 #
481 #    If a collection with the desired portable_data_hash already
482 #    exists at dst, and args.force is False, copy_collection returns
483 #    the existing collection without copying any blocks.  Otherwise
484 #    (if no collection exists or if args.force is True)
485 #    copy_collection copies all of the collection data blocks from src
486 #    to dst.
487 #
488 #    For this application, it is critical to preserve the
489 #    collection's manifest hash, which is not guaranteed with the
490 #    arvados.CollectionReader and arvados.CollectionWriter classes.
491 #    Copying each block in the collection manually, followed by
492 #    the manifest block, ensures that the collection's manifest
493 #    hash will not change.
494 #
495 def copy_collection(obj_uuid, src, dst, args):
496     if arvados.util.keep_locator_pattern.match(obj_uuid):
497         # If the obj_uuid is a portable data hash, it might not be
498         # uniquely identified with a particular collection.  As a
499         # result, it is ambiguous as to what name to use for the copy.
500         # Apply some heuristics to pick which collection to get the
501         # name from.
502         srccol = src.collections().list(
503             filters=[['portable_data_hash', '=', obj_uuid]],
504             order="created_at asc"
505             ).execute(num_retries=args.retries)
506
507         items = srccol.get("items")
508
509         if not items:
510             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
511             return
512
513         c = None
514
515         if len(items) == 1:
516             # There's only one collection with the PDH, so use that.
517             c = items[0]
518         if not c:
519             # See if there is a collection that's in the same project
520             # as the root item (usually a workflow) being copied.
521             for i in items:
522                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
523                     c = i
524                     break
525         if not c:
526             # Didn't find any collections located in the same project, so
527             # pick the oldest collection that has a name assigned to it.
528             for i in items:
529                 if i.get("name"):
530                     c = i
531                     break
532         if not c:
533             # None of the collections have names (?!), so just pick the
534             # first one.
535             c = items[0]
536
537         # list() doesn't return manifest text (and we don't want it to,
538         # because we don't need the same maninfest text sent to us 50
539         # times) so go and retrieve the collection object directly
540         # which will include the manifest text.
541         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
542     else:
543         # Assume this is an actual collection uuid, so fetch it directly.
544         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
545
546     # If a collection with this hash already exists at the
547     # destination, and 'force' is not true, just return that
548     # collection.
549     if not args.force:
550         if 'portable_data_hash' in c:
551             colhash = c['portable_data_hash']
552         else:
553             colhash = c['uuid']
554         dstcol = dst.collections().list(
555             filters=[['portable_data_hash', '=', colhash]]
556         ).execute(num_retries=args.retries)
557         if dstcol['items_available'] > 0:
558             for d in dstcol['items']:
559                 if ((args.project_uuid == d['owner_uuid']) and
560                     (c.get('name') == d['name']) and
561                     (c['portable_data_hash'] == d['portable_data_hash'])):
562                     return d
563             c['manifest_text'] = dst.collections().get(
564                 uuid=dstcol['items'][0]['uuid']
565             ).execute(num_retries=args.retries)['manifest_text']
566             return create_collection_from(c, src, dst, args)
567
568     # Fetch the collection's manifest.
569     manifest = c['manifest_text']
570     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
571
572     # Copy each block from src_keep to dst_keep.
573     # Use the newly signed locators returned from dst_keep to build
574     # a new manifest as we go.
575     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
576     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
577     dst_manifest = io.StringIO()
578     dst_locators = {}
579     bytes_written = 0
580     bytes_expected = total_collection_size(manifest)
581     if args.progress:
582         progress_writer = ProgressWriter(human_progress)
583     else:
584         progress_writer = None
585
586     # go through the words
587     # put each block loc into 'get' queue
588     # 'get' threads get block and put it into 'put' queue
589     # 'put' threads put block and then update dst_locators
590     #
591     # after going through the whole manifest we go back through it
592     # again and build dst_manifest
593
594     lock = threading.Lock()
595
596     # the get queue should be unbounded because we'll add all the
597     # block hashes we want to get, but these are small
598     get_queue = queue.Queue()
599
600     threadcount = 4
601
602     # the put queue contains full data blocks
603     # and if 'get' is faster than 'put' we could end up consuming
604     # a great deal of RAM if it isn't bounded.
605     put_queue = queue.Queue(threadcount)
606     transfer_error = []
607
608     def get_thread():
609         while True:
610             word = get_queue.get()
611             if word is None:
612                 put_queue.put(None)
613                 get_queue.task_done()
614                 return
615
616             blockhash = arvados.KeepLocator(word).md5sum
617             with lock:
618                 if blockhash in dst_locators:
619                     # Already uploaded
620                     get_queue.task_done()
621                     continue
622
623             try:
624                 logger.debug("Getting block %s", word)
625                 data = src_keep.get(word)
626                 put_queue.put((word, data))
627             except e:
628                 logger.error("Error getting block %s: %s", word, e)
629                 transfer_error.append(e)
630                 try:
631                     # Drain the 'get' queue so we end early
632                     while True:
633                         get_queue.get(False)
634                         get_queue.task_done()
635                 except queue.Empty:
636                     pass
637             finally:
638                 get_queue.task_done()
639
640     def put_thread():
641         nonlocal bytes_written
642         while True:
643             item = put_queue.get()
644             if item is None:
645                 put_queue.task_done()
646                 return
647
648             word, data = item
649             loc = arvados.KeepLocator(word)
650             blockhash = loc.md5sum
651             with lock:
652                 if blockhash in dst_locators:
653                     # Already uploaded
654                     put_queue.task_done()
655                     continue
656
657             try:
658                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
659                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
660                 with lock:
661                     dst_locators[blockhash] = dst_locator
662                     bytes_written += loc.size
663                     if progress_writer:
664                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
665             except e:
666                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
667                 try:
668                     # Drain the 'get' queue so we end early
669                     while True:
670                         get_queue.get(False)
671                         get_queue.task_done()
672                 except queue.Empty:
673                     pass
674                 transfer_error.append(e)
675             finally:
676                 put_queue.task_done()
677
678     for line in manifest.splitlines():
679         words = line.split()
680         for word in words[1:]:
681             try:
682                 loc = arvados.KeepLocator(word)
683             except ValueError:
684                 # If 'word' can't be parsed as a locator,
685                 # presume it's a filename.
686                 continue
687
688             get_queue.put(word)
689
690     for i in range(0, threadcount):
691         get_queue.put(None)
692
693     for i in range(0, threadcount):
694         threading.Thread(target=get_thread, daemon=True).start()
695
696     for i in range(0, threadcount):
697         threading.Thread(target=put_thread, daemon=True).start()
698
699     get_queue.join()
700     put_queue.join()
701
702     if len(transfer_error) > 0:
703         return {"error_token": "Failed to transfer blocks"}
704
705     for line in manifest.splitlines():
706         words = line.split()
707         dst_manifest.write(words[0])
708         for word in words[1:]:
709             try:
710                 loc = arvados.KeepLocator(word)
711             except ValueError:
712                 # If 'word' can't be parsed as a locator,
713                 # presume it's a filename.
714                 dst_manifest.write(' ')
715                 dst_manifest.write(word)
716                 continue
717             blockhash = loc.md5sum
718             dst_manifest.write(' ')
719             dst_manifest.write(dst_locators[blockhash])
720         dst_manifest.write("\n")
721
722     if progress_writer:
723         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
724         progress_writer.finish()
725
726     # Copy the manifest and save the collection.
727     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
728
729     c['manifest_text'] = dst_manifest.getvalue()
730     return create_collection_from(c, src, dst, args)
731
732 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
733     r = api.repositories().list(
734         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
735     if r['items_available'] != 1:
736         raise Exception('cannot identify repo {}; {} repos found'
737                         .format(repo_name, r['items_available']))
738
739     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
740     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
741     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
742
743     priority = https_url + other_url + http_url
744
745     for url in priority:
746         if url.startswith("http"):
747             u = urllib.parse.urlsplit(url)
748             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
749             git_config = ["-c", "credential.%s/.username=none" % baseurl,
750                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
751         else:
752             git_config = []
753
754         try:
755             logger.debug("trying %s", url)
756             subprocess.run(
757                 ['git', *git_config, 'ls-remote', url],
758                 check=True,
759                 env={
760                     'ARVADOS_API_TOKEN': api.api_token,
761                     'GIT_ASKPASS': '/bin/false',
762                     'HOME': os.environ['HOME'],
763                 },
764                 stdout=subprocess.DEVNULL,
765             )
766         except subprocess.CalledProcessError:
767             pass
768         else:
769             git_url = url
770             break
771     else:
772         raise Exception('Cannot access git repository, tried {}'
773                         .format(priority))
774
775     if git_url.startswith("http:"):
776         if allow_insecure_http:
777             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
778         else:
779             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
780
781     return (git_url, git_config)
782
783
784 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
785     """Copy the docker image identified by docker_image and
786     docker_image_tag from src to dst. Create appropriate
787     docker_image_repo+tag and docker_image_hash links at dst.
788
789     """
790
791     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
792
793     # Find the link identifying this docker image.
794     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
795         src, args.retries, docker_image, docker_image_tag)
796     if docker_image_list:
797         image_uuid, image_info = docker_image_list[0]
798         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
799
800         # Copy the collection it refers to.
801         dst_image_col = copy_collection(image_uuid, src, dst, args)
802     elif arvados.util.keep_locator_pattern.match(docker_image):
803         dst_image_col = copy_collection(docker_image, src, dst, args)
804     else:
805         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
806
807 def copy_project(obj_uuid, src, dst, owner_uuid, args):
808
809     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
810
811     # Create/update the destination project
812     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
813                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
814     if len(existing["items"]) == 0:
815         project_record = dst.groups().create(body={"group": {"group_class": "project",
816                                                              "owner_uuid": owner_uuid,
817                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
818     else:
819         project_record = existing["items"][0]
820
821     dst.groups().update(uuid=project_record["uuid"],
822                         body={"group": {
823                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
824
825     args.project_uuid = project_record["uuid"]
826
827     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
828
829
830     partial_error = ""
831
832     # Copy collections
833     try:
834         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
835                          src, dst, args)
836     except Exception as e:
837         partial_error += "\n" + str(e)
838
839     # Copy workflows
840     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
841         try:
842             copy_workflow(w["uuid"], src, dst, args)
843         except Exception as e:
844             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
845
846     if args.recursive:
847         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
848             try:
849                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
850             except Exception as e:
851                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
852
853     project_record["partial_error"] = partial_error
854
855     return project_record
856
857 # git_rev_parse(rev, repo)
858 #
859 #    Returns the 40-character commit hash corresponding to 'rev' in
860 #    git repository 'repo' (which must be the path of a local git
861 #    repository)
862 #
863 def git_rev_parse(rev, repo):
864     proc = subprocess.run(
865         ['git', 'rev-parse', rev],
866         check=True,
867         cwd=repo,
868         stdout=subprocess.PIPE,
869         text=True,
870     )
871     return proc.stdout.read().strip()
872
873 # uuid_type(api, object_uuid)
874 #
875 #    Returns the name of the class that object_uuid belongs to, based on
876 #    the second field of the uuid.  This function consults the api's
877 #    schema to identify the object class.
878 #
879 #    It returns a string such as 'Collection', 'Workflow', etc.
880 #
881 #    Special case: if handed a Keep locator hash, return 'Collection'.
882 #
883 def uuid_type(api, object_uuid):
884     if re.match(arvados.util.keep_locator_pattern, object_uuid):
885         return 'Collection'
886
887     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
888         return 'httpURL'
889
890     p = object_uuid.split('-')
891     if len(p) == 3:
892         type_prefix = p[1]
893         for k in api._schema.schemas:
894             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
895             if type_prefix == obj_class:
896                 return k
897     return None
898
899
900 def copy_from_http(url, src, dst, args):
901
902     project_uuid = args.project_uuid
903     varying_url_params = args.varying_url_params
904     prefer_cached_downloads = args.prefer_cached_downloads
905
906     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
907                                                    varying_url_params=varying_url_params,
908                                                    prefer_cached_downloads=prefer_cached_downloads)
909     if cached[2] is not None:
910         return copy_collection(cached[2], src, dst, args)
911
912     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
913                                                varying_url_params=varying_url_params,
914                                                prefer_cached_downloads=prefer_cached_downloads)
915
916     if cached is not None:
917         return {"uuid": cached[2]}
918
919
920 def abort(msg, code=1):
921     logger.info("arv-copy: %s", msg)
922     exit(code)
923
924
925 # Code for reporting on the progress of a collection upload.
926 # Stolen from arvados.commands.put.ArvPutCollectionWriter
927 # TODO(twp): figure out how to refactor into a shared library
928 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
929 # code)
930
931 def machine_progress(obj_uuid, bytes_written, bytes_expected):
932     return "{} {}: {} {} written {} total\n".format(
933         sys.argv[0],
934         os.getpid(),
935         obj_uuid,
936         bytes_written,
937         -1 if (bytes_expected is None) else bytes_expected)
938
939 def human_progress(obj_uuid, bytes_written, bytes_expected):
940     if bytes_expected:
941         return "\r{}: {}M / {}M {:.1%} ".format(
942             obj_uuid,
943             bytes_written >> 20, bytes_expected >> 20,
944             float(bytes_written) / bytes_expected)
945     else:
946         return "\r{}: {} ".format(obj_uuid, bytes_written)
947
948 class ProgressWriter(object):
949     _progress_func = None
950     outfile = sys.stderr
951
952     def __init__(self, progress_func):
953         self._progress_func = progress_func
954
955     def report(self, obj_uuid, bytes_written, bytes_expected):
956         if self._progress_func is not None:
957             self.outfile.write(
958                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
959
960     def finish(self):
961         self.outfile.write("\n")
962
963 if __name__ == '__main__':
964     main()