21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39 import json
40 import queue
41 import threading
42
43 import arvados
44 import arvados.config
45 import arvados.keep
46 import arvados.util
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50
51 from arvados._version import __version__
52
53 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
54
55 logger = logging.getLogger('arvados.arv-copy')
56
57 # local_repo_dir records which git repositories from the Arvados source
58 # instance have been checked out locally during this run, and to which
59 # directories.
60 # e.g. if repository 'twp' from src_arv has been cloned into
61 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
62 #
63 local_repo_dir = {}
64
65 # List of collections that have been copied in this session, and their
66 # destination collection UUIDs.
67 collections_copied = {}
68
69 # Set of (repository, script_version) two-tuples of commits copied in git.
70 scripts_copied = set()
71
72 # The owner_uuid of the object being copied
73 src_owner_uuid = None
74
75 def main():
76     copy_opts = argparse.ArgumentParser(add_help=False)
77
78     copy_opts.add_argument(
79         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
80         help='Print version and exit.')
81     copy_opts.add_argument(
82         '-v', '--verbose', dest='verbose', action='store_true',
83         help='Verbose output.')
84     copy_opts.add_argument(
85         '--progress', dest='progress', action='store_true',
86         help='Report progress on copying collections. (default)')
87     copy_opts.add_argument(
88         '--no-progress', dest='progress', action='store_false',
89         help='Do not report progress on copying collections.')
90     copy_opts.add_argument(
91         '-f', '--force', dest='force', action='store_true',
92         help='Perform copy even if the object appears to exist at the remote destination.')
93     copy_opts.add_argument(
94         '--src', dest='source_arvados',
95         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
96     copy_opts.add_argument(
97         '--dst', dest='destination_arvados',
98         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
99     copy_opts.add_argument(
100         '--recursive', dest='recursive', action='store_true',
101         help='Recursively copy any dependencies for this object, and subprojects. (default)')
102     copy_opts.add_argument(
103         '--no-recursive', dest='recursive', action='store_false',
104         help='Do not copy any dependencies or subprojects.')
105     copy_opts.add_argument(
106         '--project-uuid', dest='project_uuid',
107         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
108     copy_opts.add_argument(
109         '--storage-classes', dest='storage_classes',
110         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
111     copy_opts.add_argument("--varying-url-params", type=str, default="",
112                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
113
114     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
115                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
116
117     copy_opts.add_argument(
118         'object_uuid',
119         help='The UUID of the object to be copied.')
120     copy_opts.set_defaults(progress=True)
121     copy_opts.set_defaults(recursive=True)
122
123     parser = argparse.ArgumentParser(
124         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
125         parents=[copy_opts, arv_cmd.retry_opt])
126     args = parser.parse_args()
127
128     if args.storage_classes:
129         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
130
131     if args.verbose:
132         logger.setLevel(logging.DEBUG)
133     else:
134         logger.setLevel(logging.INFO)
135
136     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
137         args.source_arvados = args.object_uuid[:5]
138
139     # Create API clients for the source and destination instances
140     src_arv = api_for_instance(args.source_arvados, args.retries)
141     dst_arv = api_for_instance(args.destination_arvados, args.retries)
142
143     if not args.project_uuid:
144         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
145
146     # Identify the kind of object we have been given, and begin copying.
147     t = uuid_type(src_arv, args.object_uuid)
148
149     try:
150         if t == 'Collection':
151             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
152             result = copy_collection(args.object_uuid,
153                                      src_arv, dst_arv,
154                                      args)
155         elif t == 'Workflow':
156             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
157             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
158         elif t == 'Group':
159             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
160             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
161         elif t == 'httpURL':
162             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
163         else:
164             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
165     except Exception as e:
166         logger.error("%s", e, exc_info=args.verbose)
167         exit(1)
168
169     # Clean up any outstanding temp git repositories.
170     for d in listvalues(local_repo_dir):
171         shutil.rmtree(d, ignore_errors=True)
172
173     if not result:
174         exit(1)
175
176     # If no exception was thrown and the response does not have an
177     # error_token field, presume success
178     if result is None or 'error_token' in result or 'uuid' not in result:
179         if result:
180             logger.error("API server returned an error result: {}".format(result))
181         exit(1)
182
183     print(result['uuid'])
184
185     if result.get('partial_error'):
186         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
187         exit(1)
188
189     logger.info("Success: created copy with uuid {}".format(result['uuid']))
190     exit(0)
191
192 def set_src_owner_uuid(resource, uuid, args):
193     global src_owner_uuid
194     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
195     src_owner_uuid = c.get("owner_uuid")
196
197 # api_for_instance(instance_name)
198 #
199 #     Creates an API client for the Arvados instance identified by
200 #     instance_name.
201 #
202 #     If instance_name contains a slash, it is presumed to be a path
203 #     (either local or absolute) to a file with Arvados configuration
204 #     settings.
205 #
206 #     Otherwise, it is presumed to be the name of a file in
207 #     $HOME/.config/arvados/instance_name.conf
208 #
209 def api_for_instance(instance_name, num_retries):
210     if not instance_name:
211         # Use environment
212         return arvados.api('v1')
213
214     if '/' in instance_name:
215         config_file = instance_name
216     else:
217         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
218
219     try:
220         cfg = arvados.config.load(config_file)
221     except (IOError, OSError) as e:
222         abort(("Could not open config file {}: {}\n" +
223                "You must make sure that your configuration tokens\n" +
224                "for Arvados instance {} are in {} and that this\n" +
225                "file is readable.").format(
226                    config_file, e, instance_name, config_file))
227
228     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
229         api_is_insecure = (
230             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
231                 ['1', 't', 'true', 'y', 'yes']))
232         client = arvados.api('v1',
233                              host=cfg['ARVADOS_API_HOST'],
234                              token=cfg['ARVADOS_API_TOKEN'],
235                              insecure=api_is_insecure,
236                              num_retries=num_retries,
237                              )
238     else:
239         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
240     return client
241
242 # Check if git is available
243 def check_git_availability():
244     try:
245         subprocess.run(
246             ['git', '--version'],
247             check=True,
248             stdout=subprocess.DEVNULL,
249         )
250     except FileNotFoundError:
251         abort('git command is not available. Please ensure git is installed.')
252
253
254 def filter_iter(arg):
255     """Iterate a filter string-or-list.
256
257     Pass in a filter field that can either be a string or list.
258     This will iterate elements as if the field had been written as a list.
259     """
260     if isinstance(arg, basestring):
261         return iter((arg,))
262     else:
263         return iter(arg)
264
265 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
266     """Update a single repository filter in-place for the destination.
267
268     If the filter checks that the repository is src_repository, it is
269     updated to check that the repository is dst_repository.  If it does
270     anything else, this function raises ValueError.
271     """
272     if src_repository is None:
273         raise ValueError("component does not specify a source repository")
274     elif dst_repository is None:
275         raise ValueError("no destination repository specified to update repository filter")
276     elif repo_filter[1:] == ['=', src_repository]:
277         repo_filter[2] = dst_repository
278     elif repo_filter[1:] == ['in', [src_repository]]:
279         repo_filter[2] = [dst_repository]
280     else:
281         raise ValueError("repository filter is not a simple source match")
282
283 def migrate_script_version_filter(version_filter):
284     """Update a single script_version filter in-place for the destination.
285
286     Currently this function checks that all the filter operands are Git
287     commit hashes.  If they're not, it raises ValueError to indicate that
288     the filter is not portable.  It could be extended to make other
289     transformations in the future.
290     """
291     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
292         raise ValueError("script_version filter is not limited to commit hashes")
293
294 def attr_filtered(filter_, *attr_names):
295     """Return True if filter_ applies to any of attr_names, else False."""
296     return any((name == 'any') or (name in attr_names)
297                for name in filter_iter(filter_[0]))
298
299 @contextlib.contextmanager
300 def exception_handler(handler, *exc_types):
301     """If any exc_types are raised in the block, call handler on the exception."""
302     try:
303         yield
304     except exc_types as error:
305         handler(error)
306
307
308 # copy_workflow(wf_uuid, src, dst, args)
309 #
310 #    Copies a workflow identified by wf_uuid from src to dst.
311 #
312 #    If args.recursive is True, also copy any collections
313 #      referenced in the workflow definition yaml.
314 #
315 #    The owner_uuid of the new workflow is set to any given
316 #      project_uuid or the user who copied the template.
317 #
318 #    Returns the copied workflow object.
319 #
320 def copy_workflow(wf_uuid, src, dst, args):
321     # fetch the workflow from the source instance
322     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
323
324     if not wf["definition"]:
325         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
326
327     # copy collections and docker images
328     if args.recursive and wf["definition"]:
329         env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
330                "ARVADOS_API_TOKEN": src.api_token,
331                "PATH": os.environ["PATH"]}
332         try:
333             result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
334                                     capture_output=True, env=env)
335         except FileNotFoundError:
336             no_arv_copy = True
337         else:
338             no_arv_copy = result.returncode == 2
339
340         if no_arv_copy:
341             raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
342         elif result.returncode != 0:
343             raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
344
345         locations = json.loads(result.stdout)
346
347         if locations:
348             copy_collections(locations, src, dst, args)
349
350     # copy the workflow itself
351     del wf['uuid']
352     wf['owner_uuid'] = args.project_uuid
353
354     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
355                                              ["name", "=", wf["name"]]]).execute()
356     if len(existing["items"]) == 0:
357         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
358     else:
359         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
360
361
362 def workflow_collections(obj, locations, docker_images):
363     if isinstance(obj, dict):
364         loc = obj.get('location', None)
365         if loc is not None:
366             if loc.startswith("keep:"):
367                 locations.append(loc[5:])
368
369         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
370         if docker_image is not None:
371             ds = docker_image.split(":", 1)
372             tag = ds[1] if len(ds)==2 else 'latest'
373             docker_images[ds[0]] = tag
374
375         for x in obj:
376             workflow_collections(obj[x], locations, docker_images)
377     elif isinstance(obj, list):
378         for x in obj:
379             workflow_collections(x, locations, docker_images)
380
381 # copy_collections(obj, src, dst, args)
382 #
383 #    Recursively copies all collections referenced by 'obj' from src
384 #    to dst.  obj may be a dict or a list, in which case we run
385 #    copy_collections on every value it contains. If it is a string,
386 #    search it for any substring that matches a collection hash or uuid
387 #    (this will find hidden references to collections like
388 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
389 #
390 #    Returns a copy of obj with any old collection uuids replaced by
391 #    the new ones.
392 #
393 def copy_collections(obj, src, dst, args):
394
395     def copy_collection_fn(collection_match):
396         """Helper function for regex substitution: copies a single collection,
397         identified by the collection_match MatchObject, to the
398         destination.  Returns the destination collection uuid (or the
399         portable data hash if that's what src_id is).
400
401         """
402         src_id = collection_match.group(0)
403         if src_id not in collections_copied:
404             dst_col = copy_collection(src_id, src, dst, args)
405             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
406                 collections_copied[src_id] = src_id
407             else:
408                 collections_copied[src_id] = dst_col['uuid']
409         return collections_copied[src_id]
410
411     if isinstance(obj, basestring):
412         # Copy any collections identified in this string to dst, replacing
413         # them with the dst uuids as necessary.
414         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
415         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
416         return obj
417     elif isinstance(obj, dict):
418         return type(obj)((v, copy_collections(obj[v], src, dst, args))
419                          for v in obj)
420     elif isinstance(obj, list):
421         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
422     return obj
423
424
425 def total_collection_size(manifest_text):
426     """Return the total number of bytes in this collection (excluding
427     duplicate blocks)."""
428
429     total_bytes = 0
430     locators_seen = {}
431     for line in manifest_text.splitlines():
432         words = line.split()
433         for word in words[1:]:
434             try:
435                 loc = arvados.KeepLocator(word)
436             except ValueError:
437                 continue  # this word isn't a locator, skip it
438             if loc.md5sum not in locators_seen:
439                 locators_seen[loc.md5sum] = True
440                 total_bytes += loc.size
441
442     return total_bytes
443
444 def create_collection_from(c, src, dst, args):
445     """Create a new collection record on dst, and copy Docker metadata if
446     available."""
447
448     collection_uuid = c['uuid']
449     body = {}
450     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
451         body[d] = c[d]
452
453     if not body["name"]:
454         body['name'] = "copied from " + collection_uuid
455
456     if args.storage_classes:
457         body['storage_classes_desired'] = args.storage_classes
458
459     body['owner_uuid'] = args.project_uuid
460
461     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
462
463     # Create docker_image_repo+tag and docker_image_hash links
464     # at the destination.
465     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
466         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
467
468         for src_link in docker_links:
469             body = {key: src_link[key]
470                     for key in ['link_class', 'name', 'properties']}
471             body['head_uuid'] = dst_collection['uuid']
472             body['owner_uuid'] = args.project_uuid
473
474             lk = dst.links().create(body=body).execute(num_retries=args.retries)
475             logger.debug('created dst link {}'.format(lk))
476
477     return dst_collection
478
479 # copy_collection(obj_uuid, src, dst, args)
480 #
481 #    Copies the collection identified by obj_uuid from src to dst.
482 #    Returns the collection object created at dst.
483 #
484 #    If args.progress is True, produce a human-friendly progress
485 #    report.
486 #
487 #    If a collection with the desired portable_data_hash already
488 #    exists at dst, and args.force is False, copy_collection returns
489 #    the existing collection without copying any blocks.  Otherwise
490 #    (if no collection exists or if args.force is True)
491 #    copy_collection copies all of the collection data blocks from src
492 #    to dst.
493 #
494 #    For this application, it is critical to preserve the
495 #    collection's manifest hash, which is not guaranteed with the
496 #    arvados.CollectionReader and arvados.CollectionWriter classes.
497 #    Copying each block in the collection manually, followed by
498 #    the manifest block, ensures that the collection's manifest
499 #    hash will not change.
500 #
501 def copy_collection(obj_uuid, src, dst, args):
502     if arvados.util.keep_locator_pattern.match(obj_uuid):
503         # If the obj_uuid is a portable data hash, it might not be
504         # uniquely identified with a particular collection.  As a
505         # result, it is ambiguous as to what name to use for the copy.
506         # Apply some heuristics to pick which collection to get the
507         # name from.
508         srccol = src.collections().list(
509             filters=[['portable_data_hash', '=', obj_uuid]],
510             order="created_at asc"
511             ).execute(num_retries=args.retries)
512
513         items = srccol.get("items")
514
515         if not items:
516             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
517             return
518
519         c = None
520
521         if len(items) == 1:
522             # There's only one collection with the PDH, so use that.
523             c = items[0]
524         if not c:
525             # See if there is a collection that's in the same project
526             # as the root item (usually a workflow) being copied.
527             for i in items:
528                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
529                     c = i
530                     break
531         if not c:
532             # Didn't find any collections located in the same project, so
533             # pick the oldest collection that has a name assigned to it.
534             for i in items:
535                 if i.get("name"):
536                     c = i
537                     break
538         if not c:
539             # None of the collections have names (?!), so just pick the
540             # first one.
541             c = items[0]
542
543         # list() doesn't return manifest text (and we don't want it to,
544         # because we don't need the same maninfest text sent to us 50
545         # times) so go and retrieve the collection object directly
546         # which will include the manifest text.
547         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
548     else:
549         # Assume this is an actual collection uuid, so fetch it directly.
550         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
551
552     # If a collection with this hash already exists at the
553     # destination, and 'force' is not true, just return that
554     # collection.
555     if not args.force:
556         if 'portable_data_hash' in c:
557             colhash = c['portable_data_hash']
558         else:
559             colhash = c['uuid']
560         dstcol = dst.collections().list(
561             filters=[['portable_data_hash', '=', colhash]]
562         ).execute(num_retries=args.retries)
563         if dstcol['items_available'] > 0:
564             for d in dstcol['items']:
565                 if ((args.project_uuid == d['owner_uuid']) and
566                     (c.get('name') == d['name']) and
567                     (c['portable_data_hash'] == d['portable_data_hash'])):
568                     return d
569             c['manifest_text'] = dst.collections().get(
570                 uuid=dstcol['items'][0]['uuid']
571             ).execute(num_retries=args.retries)['manifest_text']
572             return create_collection_from(c, src, dst, args)
573
574     # Fetch the collection's manifest.
575     manifest = c['manifest_text']
576     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
577
578     # Copy each block from src_keep to dst_keep.
579     # Use the newly signed locators returned from dst_keep to build
580     # a new manifest as we go.
581     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
582     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
583     dst_manifest = io.StringIO()
584     dst_locators = {}
585     bytes_written = 0
586     bytes_expected = total_collection_size(manifest)
587     if args.progress:
588         progress_writer = ProgressWriter(human_progress)
589     else:
590         progress_writer = None
591
592     # go through the words
593     # put each block loc into 'get' queue
594     # 'get' threads get block and put it into 'put' queue
595     # 'put' threads put block and then update dst_locators
596     #
597     # after going through the whole manifest we go back through it
598     # again and build dst_manifest
599
600     lock = threading.Lock()
601
602     # the get queue should be unbounded because we'll add all the
603     # block hashes we want to get, but these are small
604     get_queue = queue.Queue()
605
606     threadcount = 4
607
608     # the put queue contains full data blocks
609     # and if 'get' is faster than 'put' we could end up consuming
610     # a great deal of RAM if it isn't bounded.
611     put_queue = queue.Queue(threadcount)
612     transfer_error = []
613
614     def get_thread():
615         while True:
616             word = get_queue.get()
617             if word is None:
618                 put_queue.put(None)
619                 get_queue.task_done()
620                 return
621
622             blockhash = arvados.KeepLocator(word).md5sum
623             with lock:
624                 if blockhash in dst_locators:
625                     # Already uploaded
626                     get_queue.task_done()
627                     continue
628
629             try:
630                 logger.debug("Getting block %s", word)
631                 data = src_keep.get(word)
632                 put_queue.put((word, data))
633             except e:
634                 logger.error("Error getting block %s: %s", word, e)
635                 transfer_error.append(e)
636                 try:
637                     # Drain the 'get' queue so we end early
638                     while True:
639                         get_queue.get(False)
640                         get_queue.task_done()
641                 except queue.Empty:
642                     pass
643             finally:
644                 get_queue.task_done()
645
646     def put_thread():
647         nonlocal bytes_written
648         while True:
649             item = put_queue.get()
650             if item is None:
651                 put_queue.task_done()
652                 return
653
654             word, data = item
655             loc = arvados.KeepLocator(word)
656             blockhash = loc.md5sum
657             with lock:
658                 if blockhash in dst_locators:
659                     # Already uploaded
660                     put_queue.task_done()
661                     continue
662
663             try:
664                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
665                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
666                 with lock:
667                     dst_locators[blockhash] = dst_locator
668                     bytes_written += loc.size
669                     if progress_writer:
670                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
671             except e:
672                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
673                 try:
674                     # Drain the 'get' queue so we end early
675                     while True:
676                         get_queue.get(False)
677                         get_queue.task_done()
678                 except queue.Empty:
679                     pass
680                 transfer_error.append(e)
681             finally:
682                 put_queue.task_done()
683
684     for line in manifest.splitlines():
685         words = line.split()
686         for word in words[1:]:
687             try:
688                 loc = arvados.KeepLocator(word)
689             except ValueError:
690                 # If 'word' can't be parsed as a locator,
691                 # presume it's a filename.
692                 continue
693
694             get_queue.put(word)
695
696     for i in range(0, threadcount):
697         get_queue.put(None)
698
699     for i in range(0, threadcount):
700         threading.Thread(target=get_thread, daemon=True).start()
701
702     for i in range(0, threadcount):
703         threading.Thread(target=put_thread, daemon=True).start()
704
705     get_queue.join()
706     put_queue.join()
707
708     if len(transfer_error) > 0:
709         return {"error_token": "Failed to transfer blocks"}
710
711     for line in manifest.splitlines():
712         words = line.split()
713         dst_manifest.write(words[0])
714         for word in words[1:]:
715             try:
716                 loc = arvados.KeepLocator(word)
717             except ValueError:
718                 # If 'word' can't be parsed as a locator,
719                 # presume it's a filename.
720                 dst_manifest.write(' ')
721                 dst_manifest.write(word)
722                 continue
723             blockhash = loc.md5sum
724             dst_manifest.write(' ')
725             dst_manifest.write(dst_locators[blockhash])
726         dst_manifest.write("\n")
727
728     if progress_writer:
729         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
730         progress_writer.finish()
731
732     # Copy the manifest and save the collection.
733     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
734
735     c['manifest_text'] = dst_manifest.getvalue()
736     return create_collection_from(c, src, dst, args)
737
738 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
739     r = api.repositories().list(
740         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
741     if r['items_available'] != 1:
742         raise Exception('cannot identify repo {}; {} repos found'
743                         .format(repo_name, r['items_available']))
744
745     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
746     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
747     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
748
749     priority = https_url + other_url + http_url
750
751     for url in priority:
752         if url.startswith("http"):
753             u = urllib.parse.urlsplit(url)
754             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
755             git_config = ["-c", "credential.%s/.username=none" % baseurl,
756                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
757         else:
758             git_config = []
759
760         try:
761             logger.debug("trying %s", url)
762             subprocess.run(
763                 ['git', *git_config, 'ls-remote', url],
764                 check=True,
765                 env={
766                     'ARVADOS_API_TOKEN': api.api_token,
767                     'GIT_ASKPASS': '/bin/false',
768                     'HOME': os.environ['HOME'],
769                 },
770                 stdout=subprocess.DEVNULL,
771             )
772         except subprocess.CalledProcessError:
773             pass
774         else:
775             git_url = url
776             break
777     else:
778         raise Exception('Cannot access git repository, tried {}'
779                         .format(priority))
780
781     if git_url.startswith("http:"):
782         if allow_insecure_http:
783             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
784         else:
785             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
786
787     return (git_url, git_config)
788
789
790 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
791     """Copy the docker image identified by docker_image and
792     docker_image_tag from src to dst. Create appropriate
793     docker_image_repo+tag and docker_image_hash links at dst.
794
795     """
796
797     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
798
799     # Find the link identifying this docker image.
800     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
801         src, args.retries, docker_image, docker_image_tag)
802     if docker_image_list:
803         image_uuid, image_info = docker_image_list[0]
804         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
805
806         # Copy the collection it refers to.
807         dst_image_col = copy_collection(image_uuid, src, dst, args)
808     elif arvados.util.keep_locator_pattern.match(docker_image):
809         dst_image_col = copy_collection(docker_image, src, dst, args)
810     else:
811         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
812
813 def copy_project(obj_uuid, src, dst, owner_uuid, args):
814
815     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
816
817     # Create/update the destination project
818     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
819                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
820     if len(existing["items"]) == 0:
821         project_record = dst.groups().create(body={"group": {"group_class": "project",
822                                                              "owner_uuid": owner_uuid,
823                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
824     else:
825         project_record = existing["items"][0]
826
827     dst.groups().update(uuid=project_record["uuid"],
828                         body={"group": {
829                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
830
831     args.project_uuid = project_record["uuid"]
832
833     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
834
835
836     partial_error = ""
837
838     # Copy collections
839     try:
840         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
841                          src, dst, args)
842     except Exception as e:
843         partial_error += "\n" + str(e)
844
845     # Copy workflows
846     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
847         try:
848             copy_workflow(w["uuid"], src, dst, args)
849         except Exception as e:
850             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
851
852     if args.recursive:
853         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
854             try:
855                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
856             except Exception as e:
857                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
858
859     project_record["partial_error"] = partial_error
860
861     return project_record
862
863 # git_rev_parse(rev, repo)
864 #
865 #    Returns the 40-character commit hash corresponding to 'rev' in
866 #    git repository 'repo' (which must be the path of a local git
867 #    repository)
868 #
869 def git_rev_parse(rev, repo):
870     proc = subprocess.run(
871         ['git', 'rev-parse', rev],
872         check=True,
873         cwd=repo,
874         stdout=subprocess.PIPE,
875         text=True,
876     )
877     return proc.stdout.read().strip()
878
879 # uuid_type(api, object_uuid)
880 #
881 #    Returns the name of the class that object_uuid belongs to, based on
882 #    the second field of the uuid.  This function consults the api's
883 #    schema to identify the object class.
884 #
885 #    It returns a string such as 'Collection', 'Workflow', etc.
886 #
887 #    Special case: if handed a Keep locator hash, return 'Collection'.
888 #
889 def uuid_type(api, object_uuid):
890     if re.match(arvados.util.keep_locator_pattern, object_uuid):
891         return 'Collection'
892
893     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
894         return 'httpURL'
895
896     p = object_uuid.split('-')
897     if len(p) == 3:
898         type_prefix = p[1]
899         for k in api._schema.schemas:
900             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
901             if type_prefix == obj_class:
902                 return k
903     return None
904
905
906 def copy_from_http(url, src, dst, args):
907
908     project_uuid = args.project_uuid
909     varying_url_params = args.varying_url_params
910     prefer_cached_downloads = args.prefer_cached_downloads
911
912     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
913                                                    varying_url_params=varying_url_params,
914                                                    prefer_cached_downloads=prefer_cached_downloads)
915     if cached[2] is not None:
916         return copy_collection(cached[2], src, dst, args)
917
918     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
919                                                varying_url_params=varying_url_params,
920                                                prefer_cached_downloads=prefer_cached_downloads)
921
922     if cached is not None:
923         return {"uuid": cached[2]}
924
925
926 def abort(msg, code=1):
927     logger.info("arv-copy: %s", msg)
928     exit(code)
929
930
931 # Code for reporting on the progress of a collection upload.
932 # Stolen from arvados.commands.put.ArvPutCollectionWriter
933 # TODO(twp): figure out how to refactor into a shared library
934 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
935 # code)
936
937 def machine_progress(obj_uuid, bytes_written, bytes_expected):
938     return "{} {}: {} {} written {} total\n".format(
939         sys.argv[0],
940         os.getpid(),
941         obj_uuid,
942         bytes_written,
943         -1 if (bytes_expected is None) else bytes_expected)
944
945 def human_progress(obj_uuid, bytes_written, bytes_expected):
946     if bytes_expected:
947         return "\r{}: {}M / {}M {:.1%} ".format(
948             obj_uuid,
949             bytes_written >> 20, bytes_expected >> 20,
950             float(bytes_written) / bytes_expected)
951     else:
952         return "\r{}: {} ".format(obj_uuid, bytes_written)
953
954 class ProgressWriter(object):
955     _progress_func = None
956     outfile = sys.stderr
957
958     def __init__(self, progress_func):
959         self._progress_func = progress_func
960
961     def report(self, obj_uuid, bytes_written, bytes_expected):
962         if self._progress_func is not None:
963             self.outfile.write(
964                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
965
966     def finish(self):
967         self.outfile.write("\n")
968
969 if __name__ == '__main__':
970     main()