20937: Fix in -> is typo
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39 import json
40 import queue
41 import threading
42
43 import arvados
44 import arvados.config
45 import arvados.keep
46 import arvados.util
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
51
52 from arvados._version import __version__
53
54 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
55
56 logger = logging.getLogger('arvados.arv-copy')
57
58 # local_repo_dir records which git repositories from the Arvados source
59 # instance have been checked out locally during this run, and to which
60 # directories.
61 # e.g. if repository 'twp' from src_arv has been cloned into
62 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
63 #
64 local_repo_dir = {}
65
66 # List of collections that have been copied in this session, and their
67 # destination collection UUIDs.
68 collections_copied = {}
69
70 # Set of (repository, script_version) two-tuples of commits copied in git.
71 scripts_copied = set()
72
73 # The owner_uuid of the object being copied
74 src_owner_uuid = None
75
76 def main():
77     copy_opts = argparse.ArgumentParser(add_help=False)
78
79     copy_opts.add_argument(
80         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
81         help='Print version and exit.')
82     copy_opts.add_argument(
83         '-v', '--verbose', dest='verbose', action='store_true',
84         help='Verbose output.')
85     copy_opts.add_argument(
86         '--progress', dest='progress', action='store_true',
87         help='Report progress on copying collections. (default)')
88     copy_opts.add_argument(
89         '--no-progress', dest='progress', action='store_false',
90         help='Do not report progress on copying collections.')
91     copy_opts.add_argument(
92         '-f', '--force', dest='force', action='store_true',
93         help='Perform copy even if the object appears to exist at the remote destination.')
94     copy_opts.add_argument(
95         '--src', dest='source_arvados',
96         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
97     copy_opts.add_argument(
98         '--dst', dest='destination_arvados',
99         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
100     copy_opts.add_argument(
101         '--recursive', dest='recursive', action='store_true',
102         help='Recursively copy any dependencies for this object, and subprojects. (default)')
103     copy_opts.add_argument(
104         '--no-recursive', dest='recursive', action='store_false',
105         help='Do not copy any dependencies or subprojects.')
106     copy_opts.add_argument(
107         '--project-uuid', dest='project_uuid',
108         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
109     copy_opts.add_argument(
110         '--storage-classes', dest='storage_classes',
111         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
112     copy_opts.add_argument("--varying-url-params", type=str, default="",
113                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
114
115     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
116                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
117
118     copy_opts.add_argument(
119         'object_uuid',
120         help='The UUID of the object to be copied.')
121     copy_opts.set_defaults(progress=True)
122     copy_opts.set_defaults(recursive=True)
123
124     parser = argparse.ArgumentParser(
125         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
126         parents=[copy_opts, arv_cmd.retry_opt])
127     args = parser.parse_args()
128
129     if args.storage_classes:
130         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
131
132     if args.verbose:
133         logger.setLevel(logging.DEBUG)
134     else:
135         logger.setLevel(logging.INFO)
136
137     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
138         args.source_arvados = args.object_uuid[:5]
139
140     # Create API clients for the source and destination instances
141     src_arv = api_for_instance(args.source_arvados, args.retries)
142     dst_arv = api_for_instance(args.destination_arvados, args.retries)
143
144     if not args.project_uuid:
145         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
146
147     # Identify the kind of object we have been given, and begin copying.
148     t = uuid_type(src_arv, args.object_uuid)
149     if t == 'Collection':
150         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
151         result = copy_collection(args.object_uuid,
152                                  src_arv, dst_arv,
153                                  args)
154     elif t == 'Workflow':
155         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
156         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
157     elif t == 'Group':
158         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
159         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
160     elif t == 'httpURL':
161         result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
162     else:
163         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
164
165     # Clean up any outstanding temp git repositories.
166     for d in listvalues(local_repo_dir):
167         shutil.rmtree(d, ignore_errors=True)
168
169     # If no exception was thrown and the response does not have an
170     # error_token field, presume success
171     if result is None or 'error_token' in result or 'uuid' not in result:
172         if result:
173             logger.error("API server returned an error result: {}".format(result))
174         exit(1)
175
176     print(result['uuid'])
177
178     if result.get('partial_error'):
179         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
180         exit(1)
181
182     logger.info("Success: created copy with uuid {}".format(result['uuid']))
183     exit(0)
184
185 def set_src_owner_uuid(resource, uuid, args):
186     global src_owner_uuid
187     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
188     src_owner_uuid = c.get("owner_uuid")
189
190 # api_for_instance(instance_name)
191 #
192 #     Creates an API client for the Arvados instance identified by
193 #     instance_name.
194 #
195 #     If instance_name contains a slash, it is presumed to be a path
196 #     (either local or absolute) to a file with Arvados configuration
197 #     settings.
198 #
199 #     Otherwise, it is presumed to be the name of a file in
200 #     $HOME/.config/arvados/instance_name.conf
201 #
202 def api_for_instance(instance_name, num_retries):
203     if not instance_name:
204         # Use environment
205         return arvados.api('v1')
206
207     if '/' in instance_name:
208         config_file = instance_name
209     else:
210         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
211
212     try:
213         cfg = arvados.config.load(config_file)
214     except (IOError, OSError) as e:
215         abort(("Could not open config file {}: {}\n" +
216                "You must make sure that your configuration tokens\n" +
217                "for Arvados instance {} are in {} and that this\n" +
218                "file is readable.").format(
219                    config_file, e, instance_name, config_file))
220
221     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
222         api_is_insecure = (
223             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
224                 ['1', 't', 'true', 'y', 'yes']))
225         client = arvados.api('v1',
226                              host=cfg['ARVADOS_API_HOST'],
227                              token=cfg['ARVADOS_API_TOKEN'],
228                              insecure=api_is_insecure,
229                              num_retries=num_retries,
230                              )
231     else:
232         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
233     return client
234
235 # Check if git is available
236 def check_git_availability():
237     try:
238         subprocess.run(
239             ['git', '--version'],
240             check=True,
241             stdout=subprocess.DEVNULL,
242         )
243     except FileNotFoundError:
244         abort('git command is not available. Please ensure git is installed.')
245
246
247 def filter_iter(arg):
248     """Iterate a filter string-or-list.
249
250     Pass in a filter field that can either be a string or list.
251     This will iterate elements as if the field had been written as a list.
252     """
253     if isinstance(arg, basestring):
254         return iter((arg,))
255     else:
256         return iter(arg)
257
258 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
259     """Update a single repository filter in-place for the destination.
260
261     If the filter checks that the repository is src_repository, it is
262     updated to check that the repository is dst_repository.  If it does
263     anything else, this function raises ValueError.
264     """
265     if src_repository is None:
266         raise ValueError("component does not specify a source repository")
267     elif dst_repository is None:
268         raise ValueError("no destination repository specified to update repository filter")
269     elif repo_filter[1:] == ['=', src_repository]:
270         repo_filter[2] = dst_repository
271     elif repo_filter[1:] == ['in', [src_repository]]:
272         repo_filter[2] = [dst_repository]
273     else:
274         raise ValueError("repository filter is not a simple source match")
275
276 def migrate_script_version_filter(version_filter):
277     """Update a single script_version filter in-place for the destination.
278
279     Currently this function checks that all the filter operands are Git
280     commit hashes.  If they're not, it raises ValueError to indicate that
281     the filter is not portable.  It could be extended to make other
282     transformations in the future.
283     """
284     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
285         raise ValueError("script_version filter is not limited to commit hashes")
286
287 def attr_filtered(filter_, *attr_names):
288     """Return True if filter_ applies to any of attr_names, else False."""
289     return any((name == 'any') or (name in attr_names)
290                for name in filter_iter(filter_[0]))
291
292 @contextlib.contextmanager
293 def exception_handler(handler, *exc_types):
294     """If any exc_types are raised in the block, call handler on the exception."""
295     try:
296         yield
297     except exc_types as error:
298         handler(error)
299
300
301 # copy_workflow(wf_uuid, src, dst, args)
302 #
303 #    Copies a workflow identified by wf_uuid from src to dst.
304 #
305 #    If args.recursive is True, also copy any collections
306 #      referenced in the workflow definition yaml.
307 #
308 #    The owner_uuid of the new workflow is set to any given
309 #      project_uuid or the user who copied the template.
310 #
311 #    Returns the copied workflow object.
312 #
313 def copy_workflow(wf_uuid, src, dst, args):
314     # fetch the workflow from the source instance
315     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
316
317     if not wf["definition"]:
318         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
319
320     # copy collections and docker images
321     if args.recursive and wf["definition"]:
322         wf_def = yaml.safe_load(wf["definition"])
323         if wf_def is not None:
324             locations = []
325             docker_images = {}
326             graph = wf_def.get('$graph', None)
327             if graph is not None:
328                 workflow_collections(graph, locations, docker_images)
329             else:
330                 workflow_collections(wf_def, locations, docker_images)
331
332             if locations:
333                 copy_collections(locations, src, dst, args)
334
335             for image in docker_images:
336                 copy_docker_image(image, docker_images[image], src, dst, args)
337
338     # copy the workflow itself
339     del wf['uuid']
340     wf['owner_uuid'] = args.project_uuid
341
342     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
343                                              ["name", "=", wf["name"]]]).execute()
344     if len(existing["items"]) == 0:
345         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
346     else:
347         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
348
349
350 def workflow_collections(obj, locations, docker_images):
351     if isinstance(obj, dict):
352         loc = obj.get('location', None)
353         if loc is not None:
354             if loc.startswith("keep:"):
355                 locations.append(loc[5:])
356
357         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
358         if docker_image is not None:
359             ds = docker_image.split(":", 1)
360             tag = ds[1] if len(ds)==2 else 'latest'
361             docker_images[ds[0]] = tag
362
363         for x in obj:
364             workflow_collections(obj[x], locations, docker_images)
365     elif isinstance(obj, list):
366         for x in obj:
367             workflow_collections(x, locations, docker_images)
368
369 # copy_collections(obj, src, dst, args)
370 #
371 #    Recursively copies all collections referenced by 'obj' from src
372 #    to dst.  obj may be a dict or a list, in which case we run
373 #    copy_collections on every value it contains. If it is a string,
374 #    search it for any substring that matches a collection hash or uuid
375 #    (this will find hidden references to collections like
376 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
377 #
378 #    Returns a copy of obj with any old collection uuids replaced by
379 #    the new ones.
380 #
381 def copy_collections(obj, src, dst, args):
382
383     def copy_collection_fn(collection_match):
384         """Helper function for regex substitution: copies a single collection,
385         identified by the collection_match MatchObject, to the
386         destination.  Returns the destination collection uuid (or the
387         portable data hash if that's what src_id is).
388
389         """
390         src_id = collection_match.group(0)
391         if src_id not in collections_copied:
392             dst_col = copy_collection(src_id, src, dst, args)
393             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
394                 collections_copied[src_id] = src_id
395             else:
396                 collections_copied[src_id] = dst_col['uuid']
397         return collections_copied[src_id]
398
399     if isinstance(obj, basestring):
400         # Copy any collections identified in this string to dst, replacing
401         # them with the dst uuids as necessary.
402         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
403         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
404         return obj
405     elif isinstance(obj, dict):
406         return type(obj)((v, copy_collections(obj[v], src, dst, args))
407                          for v in obj)
408     elif isinstance(obj, list):
409         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
410     return obj
411
412
413 def total_collection_size(manifest_text):
414     """Return the total number of bytes in this collection (excluding
415     duplicate blocks)."""
416
417     total_bytes = 0
418     locators_seen = {}
419     for line in manifest_text.splitlines():
420         words = line.split()
421         for word in words[1:]:
422             try:
423                 loc = arvados.KeepLocator(word)
424             except ValueError:
425                 continue  # this word isn't a locator, skip it
426             if loc.md5sum not in locators_seen:
427                 locators_seen[loc.md5sum] = True
428                 total_bytes += loc.size
429
430     return total_bytes
431
432 def create_collection_from(c, src, dst, args):
433     """Create a new collection record on dst, and copy Docker metadata if
434     available."""
435
436     collection_uuid = c['uuid']
437     body = {}
438     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
439         body[d] = c[d]
440
441     if not body["name"]:
442         body['name'] = "copied from " + collection_uuid
443
444     if args.storage_classes:
445         body['storage_classes_desired'] = args.storage_classes
446
447     body['owner_uuid'] = args.project_uuid
448
449     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
450
451     # Create docker_image_repo+tag and docker_image_hash links
452     # at the destination.
453     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
454         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
455
456         for src_link in docker_links:
457             body = {key: src_link[key]
458                     for key in ['link_class', 'name', 'properties']}
459             body['head_uuid'] = dst_collection['uuid']
460             body['owner_uuid'] = args.project_uuid
461
462             lk = dst.links().create(body=body).execute(num_retries=args.retries)
463             logger.debug('created dst link {}'.format(lk))
464
465     return dst_collection
466
467 # copy_collection(obj_uuid, src, dst, args)
468 #
469 #    Copies the collection identified by obj_uuid from src to dst.
470 #    Returns the collection object created at dst.
471 #
472 #    If args.progress is True, produce a human-friendly progress
473 #    report.
474 #
475 #    If a collection with the desired portable_data_hash already
476 #    exists at dst, and args.force is False, copy_collection returns
477 #    the existing collection without copying any blocks.  Otherwise
478 #    (if no collection exists or if args.force is True)
479 #    copy_collection copies all of the collection data blocks from src
480 #    to dst.
481 #
482 #    For this application, it is critical to preserve the
483 #    collection's manifest hash, which is not guaranteed with the
484 #    arvados.CollectionReader and arvados.CollectionWriter classes.
485 #    Copying each block in the collection manually, followed by
486 #    the manifest block, ensures that the collection's manifest
487 #    hash will not change.
488 #
489 def copy_collection(obj_uuid, src, dst, args):
490     if arvados.util.keep_locator_pattern.match(obj_uuid):
491         # If the obj_uuid is a portable data hash, it might not be
492         # uniquely identified with a particular collection.  As a
493         # result, it is ambiguous as to what name to use for the copy.
494         # Apply some heuristics to pick which collection to get the
495         # name from.
496         srccol = src.collections().list(
497             filters=[['portable_data_hash', '=', obj_uuid]],
498             order="created_at asc"
499             ).execute(num_retries=args.retries)
500
501         items = srccol.get("items")
502
503         if not items:
504             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
505             return
506
507         c = None
508
509         if len(items) == 1:
510             # There's only one collection with the PDH, so use that.
511             c = items[0]
512         if not c:
513             # See if there is a collection that's in the same project
514             # as the root item (usually a workflow) being copied.
515             for i in items:
516                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
517                     c = i
518                     break
519         if not c:
520             # Didn't find any collections located in the same project, so
521             # pick the oldest collection that has a name assigned to it.
522             for i in items:
523                 if i.get("name"):
524                     c = i
525                     break
526         if not c:
527             # None of the collections have names (?!), so just pick the
528             # first one.
529             c = items[0]
530
531         # list() doesn't return manifest text (and we don't want it to,
532         # because we don't need the same maninfest text sent to us 50
533         # times) so go and retrieve the collection object directly
534         # which will include the manifest text.
535         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
536     else:
537         # Assume this is an actual collection uuid, so fetch it directly.
538         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
539
540     # If a collection with this hash already exists at the
541     # destination, and 'force' is not true, just return that
542     # collection.
543     if not args.force:
544         if 'portable_data_hash' in c:
545             colhash = c['portable_data_hash']
546         else:
547             colhash = c['uuid']
548         dstcol = dst.collections().list(
549             filters=[['portable_data_hash', '=', colhash]]
550         ).execute(num_retries=args.retries)
551         if dstcol['items_available'] > 0:
552             for d in dstcol['items']:
553                 if ((args.project_uuid == d['owner_uuid']) and
554                     (c.get('name') == d['name']) and
555                     (c['portable_data_hash'] == d['portable_data_hash'])):
556                     return d
557             c['manifest_text'] = dst.collections().get(
558                 uuid=dstcol['items'][0]['uuid']
559             ).execute(num_retries=args.retries)['manifest_text']
560             return create_collection_from(c, src, dst, args)
561
562     # Fetch the collection's manifest.
563     manifest = c['manifest_text']
564     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
565
566     # Copy each block from src_keep to dst_keep.
567     # Use the newly signed locators returned from dst_keep to build
568     # a new manifest as we go.
569     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
570     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
571     dst_manifest = io.StringIO()
572     dst_locators = {}
573     bytes_written = [0]
574     bytes_expected = total_collection_size(manifest)
575     if args.progress:
576         progress_writer = ProgressWriter(human_progress)
577     else:
578         progress_writer = None
579
580     # go through the words
581     # put each block loc into 'get' queue
582     # 'get' threads get block and put it into 'put' queue
583     # 'put' threads put block and then update dst_locators
584     #
585     # after going through the whole manifest we go back through it
586     # again and build dst_manifest
587
588     lock = threading.Lock()
589     get_queue = queue.Queue()
590     put_queue = queue.Queue()
591     transfer_error = []
592
593     def get_thread():
594         while True:
595             word = get_queue.get()
596             if word is None:
597                 put_queue.put(None)
598                 get_queue.task_done()
599                 return
600
601             blockhash = arvados.KeepLocator(word).md5sum
602             with lock:
603                 if blockhash in dst_locators:
604                     continue
605
606             try:
607                 logger.debug("Getting block %s", word)
608                 data = src_keep.get(word)
609                 put_queue.put((word, data))
610             except e:
611                 logger.error("Error getting block %s: %s", word, e)
612                 transfer_error.append(e)
613                 try:
614                     # Drain the 'get' queue so we end early
615                     while True:
616                         get_queue.get(False)
617                 except queue.Empty:
618                     pass
619             finally:
620                 get_queue.task_done()
621
622     def put_thread():
623         while True:
624             item = put_queue.get()
625             if item is None:
626                 put_queue.task_done()
627                 return
628
629             word, data = item
630             loc = arvados.KeepLocator(word)
631             blockhash = loc.md5sum
632             with lock:
633                 if blockhash in dst_locators:
634                     continue
635
636             try:
637                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
638                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
639                 with lock:
640                     dst_locators[blockhash] = dst_locator
641                     bytes_written[0] += loc.size
642                     if progress_writer:
643                         progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
644             except e:
645                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
646                 try:
647                     # Drain the 'get' queue so we end early
648                     while True:
649                         get_queue.get(False)
650                 except queue.Empty:
651                     pass
652                 transfer_error.append(e)
653             finally:
654                 put_queue.task_done()
655
656     for line in manifest.splitlines():
657         words = line.split()
658         for word in words[1:]:
659             try:
660                 loc = arvados.KeepLocator(word)
661             except ValueError:
662                 # If 'word' can't be parsed as a locator,
663                 # presume it's a filename.
664                 continue
665
666             get_queue.put(word)
667
668     get_queue.put(None)
669     get_queue.put(None)
670     get_queue.put(None)
671     get_queue.put(None)
672
673     threading.Thread(target=get_thread, daemon=True).start()
674     threading.Thread(target=get_thread, daemon=True).start()
675     threading.Thread(target=get_thread, daemon=True).start()
676     threading.Thread(target=get_thread, daemon=True).start()
677
678     threading.Thread(target=put_thread, daemon=True).start()
679     threading.Thread(target=put_thread, daemon=True).start()
680     threading.Thread(target=put_thread, daemon=True).start()
681     threading.Thread(target=put_thread, daemon=True).start()
682
683     get_queue.join()
684     put_queue.join()
685
686     if len(transfer_error) > 0:
687         return {"error_token": "Failed to transfer blocks"}
688
689     for line in manifest.splitlines():
690         words = line.split()
691         dst_manifest.write(words[0])
692         for word in words[1:]:
693             try:
694                 loc = arvados.KeepLocator(word)
695             except ValueError:
696                 # If 'word' can't be parsed as a locator,
697                 # presume it's a filename.
698                 dst_manifest.write(' ')
699                 dst_manifest.write(word)
700                 continue
701             blockhash = loc.md5sum
702             dst_manifest.write(' ')
703             dst_manifest.write(dst_locators[blockhash])
704         dst_manifest.write("\n")
705
706     if progress_writer:
707         progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
708         progress_writer.finish()
709
710     # Copy the manifest and save the collection.
711     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
712
713     c['manifest_text'] = dst_manifest.getvalue()
714     return create_collection_from(c, src, dst, args)
715
716 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
717     r = api.repositories().list(
718         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
719     if r['items_available'] != 1:
720         raise Exception('cannot identify repo {}; {} repos found'
721                         .format(repo_name, r['items_available']))
722
723     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
724     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
725     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
726
727     priority = https_url + other_url + http_url
728
729     for url in priority:
730         if url.startswith("http"):
731             u = urllib.parse.urlsplit(url)
732             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
733             git_config = ["-c", "credential.%s/.username=none" % baseurl,
734                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
735         else:
736             git_config = []
737
738         try:
739             logger.debug("trying %s", url)
740             subprocess.run(
741                 ['git', *git_config, 'ls-remote', url],
742                 check=True,
743                 env={
744                     'ARVADOS_API_TOKEN': api.api_token,
745                     'GIT_ASKPASS': '/bin/false',
746                     'HOME': os.environ['HOME'],
747                 },
748                 stdout=subprocess.DEVNULL,
749             )
750         except subprocess.CalledProcessError:
751             pass
752         else:
753             git_url = url
754             break
755     else:
756         raise Exception('Cannot access git repository, tried {}'
757                         .format(priority))
758
759     if git_url.startswith("http:"):
760         if allow_insecure_http:
761             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
762         else:
763             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
764
765     return (git_url, git_config)
766
767
768 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
769     """Copy the docker image identified by docker_image and
770     docker_image_tag from src to dst. Create appropriate
771     docker_image_repo+tag and docker_image_hash links at dst.
772
773     """
774
775     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
776
777     # Find the link identifying this docker image.
778     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
779         src, args.retries, docker_image, docker_image_tag)
780     if docker_image_list:
781         image_uuid, image_info = docker_image_list[0]
782         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
783
784         # Copy the collection it refers to.
785         dst_image_col = copy_collection(image_uuid, src, dst, args)
786     elif arvados.util.keep_locator_pattern.match(docker_image):
787         dst_image_col = copy_collection(docker_image, src, dst, args)
788     else:
789         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
790
791 def copy_project(obj_uuid, src, dst, owner_uuid, args):
792
793     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
794
795     # Create/update the destination project
796     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
797                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
798     if len(existing["items"]) == 0:
799         project_record = dst.groups().create(body={"group": {"group_class": "project",
800                                                              "owner_uuid": owner_uuid,
801                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
802     else:
803         project_record = existing["items"][0]
804
805     dst.groups().update(uuid=project_record["uuid"],
806                         body={"group": {
807                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
808
809     args.project_uuid = project_record["uuid"]
810
811     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
812
813
814     partial_error = ""
815
816     # Copy collections
817     try:
818         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
819                          src, dst, args)
820     except Exception as e:
821         partial_error += "\n" + str(e)
822
823     # Copy workflows
824     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
825         try:
826             copy_workflow(w["uuid"], src, dst, args)
827         except Exception as e:
828             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
829
830     if args.recursive:
831         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
832             try:
833                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
834             except Exception as e:
835                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
836
837     project_record["partial_error"] = partial_error
838
839     return project_record
840
841 # git_rev_parse(rev, repo)
842 #
843 #    Returns the 40-character commit hash corresponding to 'rev' in
844 #    git repository 'repo' (which must be the path of a local git
845 #    repository)
846 #
847 def git_rev_parse(rev, repo):
848     proc = subprocess.run(
849         ['git', 'rev-parse', rev],
850         check=True,
851         cwd=repo,
852         stdout=subprocess.PIPE,
853         text=True,
854     )
855     return proc.stdout.read().strip()
856
857 # uuid_type(api, object_uuid)
858 #
859 #    Returns the name of the class that object_uuid belongs to, based on
860 #    the second field of the uuid.  This function consults the api's
861 #    schema to identify the object class.
862 #
863 #    It returns a string such as 'Collection', 'Workflow', etc.
864 #
865 #    Special case: if handed a Keep locator hash, return 'Collection'.
866 #
867 def uuid_type(api, object_uuid):
868     if re.match(arvados.util.keep_locator_pattern, object_uuid):
869         return 'Collection'
870
871     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
872         return 'httpURL'
873
874     p = object_uuid.split('-')
875     if len(p) == 3:
876         type_prefix = p[1]
877         for k in api._schema.schemas:
878             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
879             if type_prefix == obj_class:
880                 return k
881     return None
882
883
884 def copy_from_http(url, src, dst, args):
885
886     project_uuid = args.project_uuid
887     varying_url_params = args.varying_url_params
888     prefer_cached_downloads = args.prefer_cached_downloads
889
890     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
891                                                    varying_url_params=varying_url_params,
892                                                    prefer_cached_downloads=prefer_cached_downloads)
893     if cached[2] is not None:
894         return copy_collection(cached[2], src, dst, args)
895
896     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
897                                                varying_url_params=varying_url_params,
898                                                prefer_cached_downloads=prefer_cached_downloads)
899
900     if cached is not None:
901         return {"uuid": cached[2]}
902
903
904 def abort(msg, code=1):
905     logger.info("arv-copy: %s", msg)
906     exit(code)
907
908
909 # Code for reporting on the progress of a collection upload.
910 # Stolen from arvados.commands.put.ArvPutCollectionWriter
911 # TODO(twp): figure out how to refactor into a shared library
912 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
913 # code)
914
915 def machine_progress(obj_uuid, bytes_written, bytes_expected):
916     return "{} {}: {} {} written {} total\n".format(
917         sys.argv[0],
918         os.getpid(),
919         obj_uuid,
920         bytes_written,
921         -1 if (bytes_expected is None) else bytes_expected)
922
923 def human_progress(obj_uuid, bytes_written, bytes_expected):
924     if bytes_expected:
925         return "\r{}: {}M / {}M {:.1%} ".format(
926             obj_uuid,
927             bytes_written >> 20, bytes_expected >> 20,
928             float(bytes_written) / bytes_expected)
929     else:
930         return "\r{}: {} ".format(obj_uuid, bytes_written)
931
932 class ProgressWriter(object):
933     _progress_func = None
934     outfile = sys.stderr
935
936     def __init__(self, progress_func):
937         self._progress_func = progress_func
938
939     def report(self, obj_uuid, bytes_written, bytes_expected):
940         if self._progress_func is not None:
941             self.outfile.write(
942                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
943
944     def finish(self):
945         self.outfile.write("\n")
946
947 if __name__ == '__main__':
948     main()