20846: Merge branch '19213-ubuntu2204-support' into 20846-ubuntu2204
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39 import json
40 import queue
41 import threading
42
43 import arvados
44 import arvados.config
45 import arvados.keep
46 import arvados.util
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
51
52 from arvados._version import __version__
53
54 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
55
56 logger = logging.getLogger('arvados.arv-copy')
57
58 # local_repo_dir records which git repositories from the Arvados source
59 # instance have been checked out locally during this run, and to which
60 # directories.
61 # e.g. if repository 'twp' from src_arv has been cloned into
62 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
63 #
64 local_repo_dir = {}
65
66 # List of collections that have been copied in this session, and their
67 # destination collection UUIDs.
68 collections_copied = {}
69
70 # Set of (repository, script_version) two-tuples of commits copied in git.
71 scripts_copied = set()
72
73 # The owner_uuid of the object being copied
74 src_owner_uuid = None
75
76 def main():
77     copy_opts = argparse.ArgumentParser(add_help=False)
78
79     copy_opts.add_argument(
80         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
81         help='Print version and exit.')
82     copy_opts.add_argument(
83         '-v', '--verbose', dest='verbose', action='store_true',
84         help='Verbose output.')
85     copy_opts.add_argument(
86         '--progress', dest='progress', action='store_true',
87         help='Report progress on copying collections. (default)')
88     copy_opts.add_argument(
89         '--no-progress', dest='progress', action='store_false',
90         help='Do not report progress on copying collections.')
91     copy_opts.add_argument(
92         '-f', '--force', dest='force', action='store_true',
93         help='Perform copy even if the object appears to exist at the remote destination.')
94     copy_opts.add_argument(
95         '--src', dest='source_arvados',
96         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
97     copy_opts.add_argument(
98         '--dst', dest='destination_arvados',
99         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
100     copy_opts.add_argument(
101         '--recursive', dest='recursive', action='store_true',
102         help='Recursively copy any dependencies for this object, and subprojects. (default)')
103     copy_opts.add_argument(
104         '--no-recursive', dest='recursive', action='store_false',
105         help='Do not copy any dependencies or subprojects.')
106     copy_opts.add_argument(
107         '--project-uuid', dest='project_uuid',
108         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
109     copy_opts.add_argument(
110         '--storage-classes', dest='storage_classes',
111         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
112     copy_opts.add_argument("--varying-url-params", type=str, default="",
113                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
114
115     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
116                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
117
118     copy_opts.add_argument(
119         'object_uuid',
120         help='The UUID of the object to be copied.')
121     copy_opts.set_defaults(progress=True)
122     copy_opts.set_defaults(recursive=True)
123
124     parser = argparse.ArgumentParser(
125         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
126         parents=[copy_opts, arv_cmd.retry_opt])
127     args = parser.parse_args()
128
129     if args.storage_classes:
130         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
131
132     if args.verbose:
133         logger.setLevel(logging.DEBUG)
134     else:
135         logger.setLevel(logging.INFO)
136
137     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
138         args.source_arvados = args.object_uuid[:5]
139
140     # Create API clients for the source and destination instances
141     src_arv = api_for_instance(args.source_arvados, args.retries)
142     dst_arv = api_for_instance(args.destination_arvados, args.retries)
143
144     if not args.project_uuid:
145         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
146
147     # Identify the kind of object we have been given, and begin copying.
148     t = uuid_type(src_arv, args.object_uuid)
149
150     try:
151         if t == 'Collection':
152             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
153             result = copy_collection(args.object_uuid,
154                                      src_arv, dst_arv,
155                                      args)
156         elif t == 'Workflow':
157             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
158             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
159         elif t == 'Group':
160             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
161             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
162         elif t == 'httpURL':
163             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
164         else:
165             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
166     except Exception as e:
167         logger.error("%s", e, exc_info=args.verbose)
168         exit(1)
169
170     # Clean up any outstanding temp git repositories.
171     for d in listvalues(local_repo_dir):
172         shutil.rmtree(d, ignore_errors=True)
173
174     if not result:
175         exit(1)
176
177     # If no exception was thrown and the response does not have an
178     # error_token field, presume success
179     if result is None or 'error_token' in result or 'uuid' not in result:
180         if result:
181             logger.error("API server returned an error result: {}".format(result))
182         exit(1)
183
184     print(result['uuid'])
185
186     if result.get('partial_error'):
187         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
188         exit(1)
189
190     logger.info("Success: created copy with uuid {}".format(result['uuid']))
191     exit(0)
192
193 def set_src_owner_uuid(resource, uuid, args):
194     global src_owner_uuid
195     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
196     src_owner_uuid = c.get("owner_uuid")
197
198 # api_for_instance(instance_name)
199 #
200 #     Creates an API client for the Arvados instance identified by
201 #     instance_name.
202 #
203 #     If instance_name contains a slash, it is presumed to be a path
204 #     (either local or absolute) to a file with Arvados configuration
205 #     settings.
206 #
207 #     Otherwise, it is presumed to be the name of a file in
208 #     $HOME/.config/arvados/instance_name.conf
209 #
210 def api_for_instance(instance_name, num_retries):
211     if not instance_name:
212         # Use environment
213         return arvados.api('v1')
214
215     if '/' in instance_name:
216         config_file = instance_name
217     else:
218         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
219
220     try:
221         cfg = arvados.config.load(config_file)
222     except (IOError, OSError) as e:
223         abort(("Could not open config file {}: {}\n" +
224                "You must make sure that your configuration tokens\n" +
225                "for Arvados instance {} are in {} and that this\n" +
226                "file is readable.").format(
227                    config_file, e, instance_name, config_file))
228
229     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
230         api_is_insecure = (
231             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
232                 ['1', 't', 'true', 'y', 'yes']))
233         client = arvados.api('v1',
234                              host=cfg['ARVADOS_API_HOST'],
235                              token=cfg['ARVADOS_API_TOKEN'],
236                              insecure=api_is_insecure,
237                              num_retries=num_retries,
238                              )
239     else:
240         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
241     return client
242
243 # Check if git is available
244 def check_git_availability():
245     try:
246         subprocess.run(
247             ['git', '--version'],
248             check=True,
249             stdout=subprocess.DEVNULL,
250         )
251     except FileNotFoundError:
252         abort('git command is not available. Please ensure git is installed.')
253
254
255 def filter_iter(arg):
256     """Iterate a filter string-or-list.
257
258     Pass in a filter field that can either be a string or list.
259     This will iterate elements as if the field had been written as a list.
260     """
261     if isinstance(arg, basestring):
262         return iter((arg,))
263     else:
264         return iter(arg)
265
266 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
267     """Update a single repository filter in-place for the destination.
268
269     If the filter checks that the repository is src_repository, it is
270     updated to check that the repository is dst_repository.  If it does
271     anything else, this function raises ValueError.
272     """
273     if src_repository is None:
274         raise ValueError("component does not specify a source repository")
275     elif dst_repository is None:
276         raise ValueError("no destination repository specified to update repository filter")
277     elif repo_filter[1:] == ['=', src_repository]:
278         repo_filter[2] = dst_repository
279     elif repo_filter[1:] == ['in', [src_repository]]:
280         repo_filter[2] = [dst_repository]
281     else:
282         raise ValueError("repository filter is not a simple source match")
283
284 def migrate_script_version_filter(version_filter):
285     """Update a single script_version filter in-place for the destination.
286
287     Currently this function checks that all the filter operands are Git
288     commit hashes.  If they're not, it raises ValueError to indicate that
289     the filter is not portable.  It could be extended to make other
290     transformations in the future.
291     """
292     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
293         raise ValueError("script_version filter is not limited to commit hashes")
294
295 def attr_filtered(filter_, *attr_names):
296     """Return True if filter_ applies to any of attr_names, else False."""
297     return any((name == 'any') or (name in attr_names)
298                for name in filter_iter(filter_[0]))
299
300 @contextlib.contextmanager
301 def exception_handler(handler, *exc_types):
302     """If any exc_types are raised in the block, call handler on the exception."""
303     try:
304         yield
305     except exc_types as error:
306         handler(error)
307
308
309 # copy_workflow(wf_uuid, src, dst, args)
310 #
311 #    Copies a workflow identified by wf_uuid from src to dst.
312 #
313 #    If args.recursive is True, also copy any collections
314 #      referenced in the workflow definition yaml.
315 #
316 #    The owner_uuid of the new workflow is set to any given
317 #      project_uuid or the user who copied the template.
318 #
319 #    Returns the copied workflow object.
320 #
321 def copy_workflow(wf_uuid, src, dst, args):
322     # fetch the workflow from the source instance
323     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
324
325     if not wf["definition"]:
326         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
327
328     # copy collections and docker images
329     if args.recursive and wf["definition"]:
330         env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
331                "ARVADOS_API_TOKEN": src.api_token,
332                "PATH": os.environ["PATH"]}
333         try:
334             result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
335                                     capture_output=True, env=env)
336         except FileNotFoundError:
337             no_arv_copy = True
338         else:
339             no_arv_copy = result.returncode == 2
340
341         if no_arv_copy:
342             raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
343         elif result.returncode != 0:
344             raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
345
346         locations = json.loads(result.stdout)
347
348         if locations:
349             copy_collections(locations, src, dst, args)
350
351     # copy the workflow itself
352     del wf['uuid']
353     wf['owner_uuid'] = args.project_uuid
354
355     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
356                                              ["name", "=", wf["name"]]]).execute()
357     if len(existing["items"]) == 0:
358         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
359     else:
360         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
361
362
363 def workflow_collections(obj, locations, docker_images):
364     if isinstance(obj, dict):
365         loc = obj.get('location', None)
366         if loc is not None:
367             if loc.startswith("keep:"):
368                 locations.append(loc[5:])
369
370         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
371         if docker_image is not None:
372             ds = docker_image.split(":", 1)
373             tag = ds[1] if len(ds)==2 else 'latest'
374             docker_images[ds[0]] = tag
375
376         for x in obj:
377             workflow_collections(obj[x], locations, docker_images)
378     elif isinstance(obj, list):
379         for x in obj:
380             workflow_collections(x, locations, docker_images)
381
382 # copy_collections(obj, src, dst, args)
383 #
384 #    Recursively copies all collections referenced by 'obj' from src
385 #    to dst.  obj may be a dict or a list, in which case we run
386 #    copy_collections on every value it contains. If it is a string,
387 #    search it for any substring that matches a collection hash or uuid
388 #    (this will find hidden references to collections like
389 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
390 #
391 #    Returns a copy of obj with any old collection uuids replaced by
392 #    the new ones.
393 #
394 def copy_collections(obj, src, dst, args):
395
396     def copy_collection_fn(collection_match):
397         """Helper function for regex substitution: copies a single collection,
398         identified by the collection_match MatchObject, to the
399         destination.  Returns the destination collection uuid (or the
400         portable data hash if that's what src_id is).
401
402         """
403         src_id = collection_match.group(0)
404         if src_id not in collections_copied:
405             dst_col = copy_collection(src_id, src, dst, args)
406             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
407                 collections_copied[src_id] = src_id
408             else:
409                 collections_copied[src_id] = dst_col['uuid']
410         return collections_copied[src_id]
411
412     if isinstance(obj, basestring):
413         # Copy any collections identified in this string to dst, replacing
414         # them with the dst uuids as necessary.
415         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
416         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
417         return obj
418     elif isinstance(obj, dict):
419         return type(obj)((v, copy_collections(obj[v], src, dst, args))
420                          for v in obj)
421     elif isinstance(obj, list):
422         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
423     return obj
424
425
426 def total_collection_size(manifest_text):
427     """Return the total number of bytes in this collection (excluding
428     duplicate blocks)."""
429
430     total_bytes = 0
431     locators_seen = {}
432     for line in manifest_text.splitlines():
433         words = line.split()
434         for word in words[1:]:
435             try:
436                 loc = arvados.KeepLocator(word)
437             except ValueError:
438                 continue  # this word isn't a locator, skip it
439             if loc.md5sum not in locators_seen:
440                 locators_seen[loc.md5sum] = True
441                 total_bytes += loc.size
442
443     return total_bytes
444
445 def create_collection_from(c, src, dst, args):
446     """Create a new collection record on dst, and copy Docker metadata if
447     available."""
448
449     collection_uuid = c['uuid']
450     body = {}
451     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
452         body[d] = c[d]
453
454     if not body["name"]:
455         body['name'] = "copied from " + collection_uuid
456
457     if args.storage_classes:
458         body['storage_classes_desired'] = args.storage_classes
459
460     body['owner_uuid'] = args.project_uuid
461
462     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
463
464     # Create docker_image_repo+tag and docker_image_hash links
465     # at the destination.
466     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
467         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
468
469         for src_link in docker_links:
470             body = {key: src_link[key]
471                     for key in ['link_class', 'name', 'properties']}
472             body['head_uuid'] = dst_collection['uuid']
473             body['owner_uuid'] = args.project_uuid
474
475             lk = dst.links().create(body=body).execute(num_retries=args.retries)
476             logger.debug('created dst link {}'.format(lk))
477
478     return dst_collection
479
480 # copy_collection(obj_uuid, src, dst, args)
481 #
482 #    Copies the collection identified by obj_uuid from src to dst.
483 #    Returns the collection object created at dst.
484 #
485 #    If args.progress is True, produce a human-friendly progress
486 #    report.
487 #
488 #    If a collection with the desired portable_data_hash already
489 #    exists at dst, and args.force is False, copy_collection returns
490 #    the existing collection without copying any blocks.  Otherwise
491 #    (if no collection exists or if args.force is True)
492 #    copy_collection copies all of the collection data blocks from src
493 #    to dst.
494 #
495 #    For this application, it is critical to preserve the
496 #    collection's manifest hash, which is not guaranteed with the
497 #    arvados.CollectionReader and arvados.CollectionWriter classes.
498 #    Copying each block in the collection manually, followed by
499 #    the manifest block, ensures that the collection's manifest
500 #    hash will not change.
501 #
502 def copy_collection(obj_uuid, src, dst, args):
503     if arvados.util.keep_locator_pattern.match(obj_uuid):
504         # If the obj_uuid is a portable data hash, it might not be
505         # uniquely identified with a particular collection.  As a
506         # result, it is ambiguous as to what name to use for the copy.
507         # Apply some heuristics to pick which collection to get the
508         # name from.
509         srccol = src.collections().list(
510             filters=[['portable_data_hash', '=', obj_uuid]],
511             order="created_at asc"
512             ).execute(num_retries=args.retries)
513
514         items = srccol.get("items")
515
516         if not items:
517             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
518             return
519
520         c = None
521
522         if len(items) == 1:
523             # There's only one collection with the PDH, so use that.
524             c = items[0]
525         if not c:
526             # See if there is a collection that's in the same project
527             # as the root item (usually a workflow) being copied.
528             for i in items:
529                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
530                     c = i
531                     break
532         if not c:
533             # Didn't find any collections located in the same project, so
534             # pick the oldest collection that has a name assigned to it.
535             for i in items:
536                 if i.get("name"):
537                     c = i
538                     break
539         if not c:
540             # None of the collections have names (?!), so just pick the
541             # first one.
542             c = items[0]
543
544         # list() doesn't return manifest text (and we don't want it to,
545         # because we don't need the same maninfest text sent to us 50
546         # times) so go and retrieve the collection object directly
547         # which will include the manifest text.
548         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
549     else:
550         # Assume this is an actual collection uuid, so fetch it directly.
551         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
552
553     # If a collection with this hash already exists at the
554     # destination, and 'force' is not true, just return that
555     # collection.
556     if not args.force:
557         if 'portable_data_hash' in c:
558             colhash = c['portable_data_hash']
559         else:
560             colhash = c['uuid']
561         dstcol = dst.collections().list(
562             filters=[['portable_data_hash', '=', colhash]]
563         ).execute(num_retries=args.retries)
564         if dstcol['items_available'] > 0:
565             for d in dstcol['items']:
566                 if ((args.project_uuid == d['owner_uuid']) and
567                     (c.get('name') == d['name']) and
568                     (c['portable_data_hash'] == d['portable_data_hash'])):
569                     return d
570             c['manifest_text'] = dst.collections().get(
571                 uuid=dstcol['items'][0]['uuid']
572             ).execute(num_retries=args.retries)['manifest_text']
573             return create_collection_from(c, src, dst, args)
574
575     # Fetch the collection's manifest.
576     manifest = c['manifest_text']
577     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
578
579     # Copy each block from src_keep to dst_keep.
580     # Use the newly signed locators returned from dst_keep to build
581     # a new manifest as we go.
582     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
583     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
584     dst_manifest = io.StringIO()
585     dst_locators = {}
586     bytes_written = 0
587     bytes_expected = total_collection_size(manifest)
588     if args.progress:
589         progress_writer = ProgressWriter(human_progress)
590     else:
591         progress_writer = None
592
593     # go through the words
594     # put each block loc into 'get' queue
595     # 'get' threads get block and put it into 'put' queue
596     # 'put' threads put block and then update dst_locators
597     #
598     # after going through the whole manifest we go back through it
599     # again and build dst_manifest
600
601     lock = threading.Lock()
602
603     # the get queue should be unbounded because we'll add all the
604     # block hashes we want to get, but these are small
605     get_queue = queue.Queue()
606
607     threadcount = 4
608
609     # the put queue contains full data blocks
610     # and if 'get' is faster than 'put' we could end up consuming
611     # a great deal of RAM if it isn't bounded.
612     put_queue = queue.Queue(threadcount)
613     transfer_error = []
614
615     def get_thread():
616         while True:
617             word = get_queue.get()
618             if word is None:
619                 put_queue.put(None)
620                 get_queue.task_done()
621                 return
622
623             blockhash = arvados.KeepLocator(word).md5sum
624             with lock:
625                 if blockhash in dst_locators:
626                     # Already uploaded
627                     get_queue.task_done()
628                     continue
629
630             try:
631                 logger.debug("Getting block %s", word)
632                 data = src_keep.get(word)
633                 put_queue.put((word, data))
634             except e:
635                 logger.error("Error getting block %s: %s", word, e)
636                 transfer_error.append(e)
637                 try:
638                     # Drain the 'get' queue so we end early
639                     while True:
640                         get_queue.get(False)
641                         get_queue.task_done()
642                 except queue.Empty:
643                     pass
644             finally:
645                 get_queue.task_done()
646
647     def put_thread():
648         nonlocal bytes_written
649         while True:
650             item = put_queue.get()
651             if item is None:
652                 put_queue.task_done()
653                 return
654
655             word, data = item
656             loc = arvados.KeepLocator(word)
657             blockhash = loc.md5sum
658             with lock:
659                 if blockhash in dst_locators:
660                     # Already uploaded
661                     put_queue.task_done()
662                     continue
663
664             try:
665                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
666                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
667                 with lock:
668                     dst_locators[blockhash] = dst_locator
669                     bytes_written += loc.size
670                     if progress_writer:
671                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
672             except e:
673                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
674                 try:
675                     # Drain the 'get' queue so we end early
676                     while True:
677                         get_queue.get(False)
678                         get_queue.task_done()
679                 except queue.Empty:
680                     pass
681                 transfer_error.append(e)
682             finally:
683                 put_queue.task_done()
684
685     for line in manifest.splitlines():
686         words = line.split()
687         for word in words[1:]:
688             try:
689                 loc = arvados.KeepLocator(word)
690             except ValueError:
691                 # If 'word' can't be parsed as a locator,
692                 # presume it's a filename.
693                 continue
694
695             get_queue.put(word)
696
697     for i in range(0, threadcount):
698         get_queue.put(None)
699
700     for i in range(0, threadcount):
701         threading.Thread(target=get_thread, daemon=True).start()
702
703     for i in range(0, threadcount):
704         threading.Thread(target=put_thread, daemon=True).start()
705
706     get_queue.join()
707     put_queue.join()
708
709     if len(transfer_error) > 0:
710         return {"error_token": "Failed to transfer blocks"}
711
712     for line in manifest.splitlines():
713         words = line.split()
714         dst_manifest.write(words[0])
715         for word in words[1:]:
716             try:
717                 loc = arvados.KeepLocator(word)
718             except ValueError:
719                 # If 'word' can't be parsed as a locator,
720                 # presume it's a filename.
721                 dst_manifest.write(' ')
722                 dst_manifest.write(word)
723                 continue
724             blockhash = loc.md5sum
725             dst_manifest.write(' ')
726             dst_manifest.write(dst_locators[blockhash])
727         dst_manifest.write("\n")
728
729     if progress_writer:
730         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
731         progress_writer.finish()
732
733     # Copy the manifest and save the collection.
734     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
735
736     c['manifest_text'] = dst_manifest.getvalue()
737     return create_collection_from(c, src, dst, args)
738
739 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
740     r = api.repositories().list(
741         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
742     if r['items_available'] != 1:
743         raise Exception('cannot identify repo {}; {} repos found'
744                         .format(repo_name, r['items_available']))
745
746     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
747     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
748     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
749
750     priority = https_url + other_url + http_url
751
752     for url in priority:
753         if url.startswith("http"):
754             u = urllib.parse.urlsplit(url)
755             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
756             git_config = ["-c", "credential.%s/.username=none" % baseurl,
757                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
758         else:
759             git_config = []
760
761         try:
762             logger.debug("trying %s", url)
763             subprocess.run(
764                 ['git', *git_config, 'ls-remote', url],
765                 check=True,
766                 env={
767                     'ARVADOS_API_TOKEN': api.api_token,
768                     'GIT_ASKPASS': '/bin/false',
769                     'HOME': os.environ['HOME'],
770                 },
771                 stdout=subprocess.DEVNULL,
772             )
773         except subprocess.CalledProcessError:
774             pass
775         else:
776             git_url = url
777             break
778     else:
779         raise Exception('Cannot access git repository, tried {}'
780                         .format(priority))
781
782     if git_url.startswith("http:"):
783         if allow_insecure_http:
784             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
785         else:
786             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
787
788     return (git_url, git_config)
789
790
791 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
792     """Copy the docker image identified by docker_image and
793     docker_image_tag from src to dst. Create appropriate
794     docker_image_repo+tag and docker_image_hash links at dst.
795
796     """
797
798     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
799
800     # Find the link identifying this docker image.
801     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
802         src, args.retries, docker_image, docker_image_tag)
803     if docker_image_list:
804         image_uuid, image_info = docker_image_list[0]
805         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
806
807         # Copy the collection it refers to.
808         dst_image_col = copy_collection(image_uuid, src, dst, args)
809     elif arvados.util.keep_locator_pattern.match(docker_image):
810         dst_image_col = copy_collection(docker_image, src, dst, args)
811     else:
812         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
813
814 def copy_project(obj_uuid, src, dst, owner_uuid, args):
815
816     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
817
818     # Create/update the destination project
819     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
820                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
821     if len(existing["items"]) == 0:
822         project_record = dst.groups().create(body={"group": {"group_class": "project",
823                                                              "owner_uuid": owner_uuid,
824                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
825     else:
826         project_record = existing["items"][0]
827
828     dst.groups().update(uuid=project_record["uuid"],
829                         body={"group": {
830                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
831
832     args.project_uuid = project_record["uuid"]
833
834     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
835
836
837     partial_error = ""
838
839     # Copy collections
840     try:
841         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
842                          src, dst, args)
843     except Exception as e:
844         partial_error += "\n" + str(e)
845
846     # Copy workflows
847     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
848         try:
849             copy_workflow(w["uuid"], src, dst, args)
850         except Exception as e:
851             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
852
853     if args.recursive:
854         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
855             try:
856                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
857             except Exception as e:
858                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
859
860     project_record["partial_error"] = partial_error
861
862     return project_record
863
864 # git_rev_parse(rev, repo)
865 #
866 #    Returns the 40-character commit hash corresponding to 'rev' in
867 #    git repository 'repo' (which must be the path of a local git
868 #    repository)
869 #
870 def git_rev_parse(rev, repo):
871     proc = subprocess.run(
872         ['git', 'rev-parse', rev],
873         check=True,
874         cwd=repo,
875         stdout=subprocess.PIPE,
876         text=True,
877     )
878     return proc.stdout.read().strip()
879
880 # uuid_type(api, object_uuid)
881 #
882 #    Returns the name of the class that object_uuid belongs to, based on
883 #    the second field of the uuid.  This function consults the api's
884 #    schema to identify the object class.
885 #
886 #    It returns a string such as 'Collection', 'Workflow', etc.
887 #
888 #    Special case: if handed a Keep locator hash, return 'Collection'.
889 #
890 def uuid_type(api, object_uuid):
891     if re.match(arvados.util.keep_locator_pattern, object_uuid):
892         return 'Collection'
893
894     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
895         return 'httpURL'
896
897     p = object_uuid.split('-')
898     if len(p) == 3:
899         type_prefix = p[1]
900         for k in api._schema.schemas:
901             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
902             if type_prefix == obj_class:
903                 return k
904     return None
905
906
907 def copy_from_http(url, src, dst, args):
908
909     project_uuid = args.project_uuid
910     varying_url_params = args.varying_url_params
911     prefer_cached_downloads = args.prefer_cached_downloads
912
913     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
914                                                    varying_url_params=varying_url_params,
915                                                    prefer_cached_downloads=prefer_cached_downloads)
916     if cached[2] is not None:
917         return copy_collection(cached[2], src, dst, args)
918
919     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
920                                                varying_url_params=varying_url_params,
921                                                prefer_cached_downloads=prefer_cached_downloads)
922
923     if cached is not None:
924         return {"uuid": cached[2]}
925
926
927 def abort(msg, code=1):
928     logger.info("arv-copy: %s", msg)
929     exit(code)
930
931
932 # Code for reporting on the progress of a collection upload.
933 # Stolen from arvados.commands.put.ArvPutCollectionWriter
934 # TODO(twp): figure out how to refactor into a shared library
935 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
936 # code)
937
938 def machine_progress(obj_uuid, bytes_written, bytes_expected):
939     return "{} {}: {} {} written {} total\n".format(
940         sys.argv[0],
941         os.getpid(),
942         obj_uuid,
943         bytes_written,
944         -1 if (bytes_expected is None) else bytes_expected)
945
946 def human_progress(obj_uuid, bytes_written, bytes_expected):
947     if bytes_expected:
948         return "\r{}: {}M / {}M {:.1%} ".format(
949             obj_uuid,
950             bytes_written >> 20, bytes_expected >> 20,
951             float(bytes_written) / bytes_expected)
952     else:
953         return "\r{}: {} ".format(obj_uuid, bytes_written)
954
955 class ProgressWriter(object):
956     _progress_func = None
957     outfile = sys.stderr
958
959     def __init__(self, progress_func):
960         self._progress_func = progress_func
961
962     def report(self, obj_uuid, bytes_written, bytes_expected):
963         if self._progress_func is not None:
964             self.outfile.write(
965                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
966
967     def finish(self):
968         self.outfile.write("\n")
969
970 if __name__ == '__main__':
971     main()