21935: Rename safeapi.ThreadSafeApiCache to api.ThreadSafeAPIClient
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have configuration files {src}.conf and
17 # {dst}.conf in a standard configuration directory with valid login credentials
18 # for instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 import argparse
22 import contextlib
23 import getpass
24 import os
25 import re
26 import shutil
27 import subprocess
28 import sys
29 import logging
30 import tempfile
31 import urllib.parse
32 import io
33 import json
34 import queue
35 import threading
36
37 import arvados
38 import arvados.config
39 import arvados.keep
40 import arvados.util
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
43 import arvados.http_to_keep
44
45 from arvados._version import __version__
46
47 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
48
49 logger = logging.getLogger('arvados.arv-copy')
50
51 # local_repo_dir records which git repositories from the Arvados source
52 # instance have been checked out locally during this run, and to which
53 # directories.
54 # e.g. if repository 'twp' from src_arv has been cloned into
55 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
56 #
57 local_repo_dir = {}
58
59 # List of collections that have been copied in this session, and their
60 # destination collection UUIDs.
61 collections_copied = {}
62
63 # Set of (repository, script_version) two-tuples of commits copied in git.
64 scripts_copied = set()
65
66 # The owner_uuid of the object being copied
67 src_owner_uuid = None
68
69 def main():
70     copy_opts = argparse.ArgumentParser(add_help=False)
71
72     copy_opts.add_argument(
73         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
74         help='Print version and exit.')
75     copy_opts.add_argument(
76         '-v', '--verbose', dest='verbose', action='store_true',
77         help='Verbose output.')
78     copy_opts.add_argument(
79         '--progress', dest='progress', action='store_true',
80         help='Report progress on copying collections. (default)')
81     copy_opts.add_argument(
82         '--no-progress', dest='progress', action='store_false',
83         help='Do not report progress on copying collections.')
84     copy_opts.add_argument(
85         '-f', '--force', dest='force', action='store_true',
86         help='Perform copy even if the object appears to exist at the remote destination.')
87     copy_opts.add_argument(
88         '--src', dest='source_arvados',
89         help="""
90 Client configuration location for the source Arvados cluster.
91 May be either a configuration file path, or a plain identifier like `foo`
92 to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
93 If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
94 """,
95     )
96     copy_opts.add_argument(
97         '--dst', dest='destination_arvados',
98         help="""
99 Client configuration location for the destination Arvados cluster.
100 May be either a configuration file path, or a plain identifier like `foo`
101 to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
102 If not provided, will use the default client configuration from the environment or `settings.conf`.
103 """,
104     )
105     copy_opts.add_argument(
106         '--recursive', dest='recursive', action='store_true',
107         help='Recursively copy any dependencies for this object, and subprojects. (default)')
108     copy_opts.add_argument(
109         '--no-recursive', dest='recursive', action='store_false',
110         help='Do not copy any dependencies or subprojects.')
111     copy_opts.add_argument(
112         '--project-uuid', dest='project_uuid',
113         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
114     copy_opts.add_argument(
115         '--storage-classes', dest='storage_classes',
116         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
117     copy_opts.add_argument("--varying-url-params", type=str, default="",
118                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
119
120     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
121                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
122
123     copy_opts.add_argument(
124         'object_uuid',
125         help='The UUID of the object to be copied.')
126     copy_opts.set_defaults(progress=True)
127     copy_opts.set_defaults(recursive=True)
128
129     parser = argparse.ArgumentParser(
130         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
131         parents=[copy_opts, arv_cmd.retry_opt])
132     args = parser.parse_args()
133
134     if args.storage_classes:
135         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
136
137     if args.verbose:
138         logger.setLevel(logging.DEBUG)
139     else:
140         logger.setLevel(logging.INFO)
141
142     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
143         args.source_arvados = args.object_uuid[:5]
144
145     # Create API clients for the source and destination instances
146     src_arv = api_for_instance(args.source_arvados, args.retries)
147     dst_arv = api_for_instance(args.destination_arvados, args.retries)
148
149     if not args.project_uuid:
150         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
151
152     # Identify the kind of object we have been given, and begin copying.
153     t = uuid_type(src_arv, args.object_uuid)
154
155     try:
156         if t == 'Collection':
157             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
158             result = copy_collection(args.object_uuid,
159                                      src_arv, dst_arv,
160                                      args)
161         elif t == 'Workflow':
162             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
163             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
164         elif t == 'Group':
165             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
166             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
167         elif t == 'httpURL':
168             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
169         else:
170             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
171     except Exception as e:
172         logger.error("%s", e, exc_info=args.verbose)
173         exit(1)
174
175     # Clean up any outstanding temp git repositories.
176     for d in local_repo_dir.values():
177         shutil.rmtree(d, ignore_errors=True)
178
179     if not result:
180         exit(1)
181
182     # If no exception was thrown and the response does not have an
183     # error_token field, presume success
184     if result is None or 'error_token' in result or 'uuid' not in result:
185         if result:
186             logger.error("API server returned an error result: {}".format(result))
187         exit(1)
188
189     print(result['uuid'])
190
191     if result.get('partial_error'):
192         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
193         exit(1)
194
195     logger.info("Success: created copy with uuid {}".format(result['uuid']))
196     exit(0)
197
198 def set_src_owner_uuid(resource, uuid, args):
199     global src_owner_uuid
200     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
201     src_owner_uuid = c.get("owner_uuid")
202
203 # api_for_instance(instance_name)
204 #
205 #     Creates an API client for the Arvados instance identified by
206 #     instance_name.
207 #
208 #     If instance_name contains a slash, it is presumed to be a path
209 #     (either local or absolute) to a file with Arvados configuration
210 #     settings.
211 #
212 #     Otherwise, it is presumed to be the name of a file in a standard
213 #     configuration directory.
214 #
215 def api_for_instance(instance_name, num_retries):
216     if not instance_name:
217         # Use environment
218         return arvados.api('v1')
219
220     if '/' in instance_name:
221         config_file = instance_name
222     else:
223         dirs = arvados.util._BaseDirectories('CONFIG')
224         config_file = next(dirs.search(f'{instance_name}.conf'), '')
225
226     try:
227         cfg = arvados.config.load(config_file)
228     except OSError as e:
229         if config_file:
230             verb = 'open'
231         else:
232             verb = 'find'
233             config_file = f'{instance_name}.conf'
234         abort(("Could not {} config file {}: {}\n" +
235                "You must make sure that your configuration tokens\n" +
236                "for Arvados instance {} are in {} and that this\n" +
237                "file is readable.").format(
238                    verb, config_file, e.strerror, instance_name, config_file))
239
240     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
241         api_is_insecure = (
242             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
243                 ['1', 't', 'true', 'y', 'yes']))
244         client = arvados.api('v1',
245                              host=cfg['ARVADOS_API_HOST'],
246                              token=cfg['ARVADOS_API_TOKEN'],
247                              insecure=api_is_insecure,
248                              num_retries=num_retries,
249                              )
250     else:
251         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
252     return client
253
254 # Check if git is available
255 def check_git_availability():
256     try:
257         subprocess.run(
258             ['git', '--version'],
259             check=True,
260             stdout=subprocess.DEVNULL,
261         )
262     except FileNotFoundError:
263         abort('git command is not available. Please ensure git is installed.')
264
265
266 def filter_iter(arg):
267     """Iterate a filter string-or-list.
268
269     Pass in a filter field that can either be a string or list.
270     This will iterate elements as if the field had been written as a list.
271     """
272     if isinstance(arg, str):
273         yield arg
274     else:
275         yield from arg
276
277 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
278     """Update a single repository filter in-place for the destination.
279
280     If the filter checks that the repository is src_repository, it is
281     updated to check that the repository is dst_repository.  If it does
282     anything else, this function raises ValueError.
283     """
284     if src_repository is None:
285         raise ValueError("component does not specify a source repository")
286     elif dst_repository is None:
287         raise ValueError("no destination repository specified to update repository filter")
288     elif repo_filter[1:] == ['=', src_repository]:
289         repo_filter[2] = dst_repository
290     elif repo_filter[1:] == ['in', [src_repository]]:
291         repo_filter[2] = [dst_repository]
292     else:
293         raise ValueError("repository filter is not a simple source match")
294
295 def migrate_script_version_filter(version_filter):
296     """Update a single script_version filter in-place for the destination.
297
298     Currently this function checks that all the filter operands are Git
299     commit hashes.  If they're not, it raises ValueError to indicate that
300     the filter is not portable.  It could be extended to make other
301     transformations in the future.
302     """
303     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
304         raise ValueError("script_version filter is not limited to commit hashes")
305
306 def attr_filtered(filter_, *attr_names):
307     """Return True if filter_ applies to any of attr_names, else False."""
308     return any((name == 'any') or (name in attr_names)
309                for name in filter_iter(filter_[0]))
310
311 @contextlib.contextmanager
312 def exception_handler(handler, *exc_types):
313     """If any exc_types are raised in the block, call handler on the exception."""
314     try:
315         yield
316     except exc_types as error:
317         handler(error)
318
319
320 # copy_workflow(wf_uuid, src, dst, args)
321 #
322 #    Copies a workflow identified by wf_uuid from src to dst.
323 #
324 #    If args.recursive is True, also copy any collections
325 #      referenced in the workflow definition yaml.
326 #
327 #    The owner_uuid of the new workflow is set to any given
328 #      project_uuid or the user who copied the template.
329 #
330 #    Returns the copied workflow object.
331 #
332 def copy_workflow(wf_uuid, src, dst, args):
333     # fetch the workflow from the source instance
334     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
335
336     if not wf["definition"]:
337         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
338
339     # copy collections and docker images
340     if args.recursive and wf["definition"]:
341         env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
342                "ARVADOS_API_TOKEN": src.api_token,
343                "PATH": os.environ["PATH"]}
344         try:
345             result = subprocess.run(
346                 ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
347                 env=env,
348                 stdout=subprocess.PIPE,
349                 universal_newlines=True,
350             )
351         except FileNotFoundError:
352             no_arv_copy = True
353         else:
354             no_arv_copy = result.returncode == 2
355
356         if no_arv_copy:
357             raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
358         elif result.returncode != 0:
359             raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
360
361         locations = json.loads(result.stdout)
362
363         if locations:
364             copy_collections(locations, src, dst, args)
365
366     # copy the workflow itself
367     del wf['uuid']
368     wf['owner_uuid'] = args.project_uuid
369
370     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
371                                              ["name", "=", wf["name"]]]).execute()
372     if len(existing["items"]) == 0:
373         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
374     else:
375         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
376
377
378 def workflow_collections(obj, locations, docker_images):
379     if isinstance(obj, dict):
380         loc = obj.get('location', None)
381         if loc is not None:
382             if loc.startswith("keep:"):
383                 locations.append(loc[5:])
384
385         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
386         if docker_image is not None:
387             ds = docker_image.split(":", 1)
388             tag = ds[1] if len(ds)==2 else 'latest'
389             docker_images[ds[0]] = tag
390
391         for x in obj:
392             workflow_collections(obj[x], locations, docker_images)
393     elif isinstance(obj, list):
394         for x in obj:
395             workflow_collections(x, locations, docker_images)
396
397 # copy_collections(obj, src, dst, args)
398 #
399 #    Recursively copies all collections referenced by 'obj' from src
400 #    to dst.  obj may be a dict or a list, in which case we run
401 #    copy_collections on every value it contains. If it is a string,
402 #    search it for any substring that matches a collection hash or uuid
403 #    (this will find hidden references to collections like
404 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
405 #
406 #    Returns a copy of obj with any old collection uuids replaced by
407 #    the new ones.
408 #
409 def copy_collections(obj, src, dst, args):
410
411     def copy_collection_fn(collection_match):
412         """Helper function for regex substitution: copies a single collection,
413         identified by the collection_match MatchObject, to the
414         destination.  Returns the destination collection uuid (or the
415         portable data hash if that's what src_id is).
416
417         """
418         src_id = collection_match.group(0)
419         if src_id not in collections_copied:
420             dst_col = copy_collection(src_id, src, dst, args)
421             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
422                 collections_copied[src_id] = src_id
423             else:
424                 collections_copied[src_id] = dst_col['uuid']
425         return collections_copied[src_id]
426
427     if isinstance(obj, str):
428         # Copy any collections identified in this string to dst, replacing
429         # them with the dst uuids as necessary.
430         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
431         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
432         return obj
433     elif isinstance(obj, dict):
434         return type(obj)((v, copy_collections(obj[v], src, dst, args))
435                          for v in obj)
436     elif isinstance(obj, list):
437         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
438     return obj
439
440
441 def total_collection_size(manifest_text):
442     """Return the total number of bytes in this collection (excluding
443     duplicate blocks)."""
444
445     total_bytes = 0
446     locators_seen = {}
447     for line in manifest_text.splitlines():
448         words = line.split()
449         for word in words[1:]:
450             try:
451                 loc = arvados.KeepLocator(word)
452             except ValueError:
453                 continue  # this word isn't a locator, skip it
454             if loc.md5sum not in locators_seen:
455                 locators_seen[loc.md5sum] = True
456                 total_bytes += loc.size
457
458     return total_bytes
459
460 def create_collection_from(c, src, dst, args):
461     """Create a new collection record on dst, and copy Docker metadata if
462     available."""
463
464     collection_uuid = c['uuid']
465     body = {}
466     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
467         body[d] = c[d]
468
469     if not body["name"]:
470         body['name'] = "copied from " + collection_uuid
471
472     if args.storage_classes:
473         body['storage_classes_desired'] = args.storage_classes
474
475     body['owner_uuid'] = args.project_uuid
476
477     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
478
479     # Create docker_image_repo+tag and docker_image_hash links
480     # at the destination.
481     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
482         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
483
484         for src_link in docker_links:
485             body = {key: src_link[key]
486                     for key in ['link_class', 'name', 'properties']}
487             body['head_uuid'] = dst_collection['uuid']
488             body['owner_uuid'] = args.project_uuid
489
490             lk = dst.links().create(body=body).execute(num_retries=args.retries)
491             logger.debug('created dst link {}'.format(lk))
492
493     return dst_collection
494
495 # copy_collection(obj_uuid, src, dst, args)
496 #
497 #    Copies the collection identified by obj_uuid from src to dst.
498 #    Returns the collection object created at dst.
499 #
500 #    If args.progress is True, produce a human-friendly progress
501 #    report.
502 #
503 #    If a collection with the desired portable_data_hash already
504 #    exists at dst, and args.force is False, copy_collection returns
505 #    the existing collection without copying any blocks.  Otherwise
506 #    (if no collection exists or if args.force is True)
507 #    copy_collection copies all of the collection data blocks from src
508 #    to dst.
509 #
510 #    For this application, it is critical to preserve the
511 #    collection's manifest hash, which is not guaranteed with the
512 #    arvados.CollectionReader and arvados.CollectionWriter classes.
513 #    Copying each block in the collection manually, followed by
514 #    the manifest block, ensures that the collection's manifest
515 #    hash will not change.
516 #
517 def copy_collection(obj_uuid, src, dst, args):
518     if arvados.util.keep_locator_pattern.match(obj_uuid):
519         # If the obj_uuid is a portable data hash, it might not be
520         # uniquely identified with a particular collection.  As a
521         # result, it is ambiguous as to what name to use for the copy.
522         # Apply some heuristics to pick which collection to get the
523         # name from.
524         srccol = src.collections().list(
525             filters=[['portable_data_hash', '=', obj_uuid]],
526             order="created_at asc"
527             ).execute(num_retries=args.retries)
528
529         items = srccol.get("items")
530
531         if not items:
532             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
533             return
534
535         c = None
536
537         if len(items) == 1:
538             # There's only one collection with the PDH, so use that.
539             c = items[0]
540         if not c:
541             # See if there is a collection that's in the same project
542             # as the root item (usually a workflow) being copied.
543             for i in items:
544                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
545                     c = i
546                     break
547         if not c:
548             # Didn't find any collections located in the same project, so
549             # pick the oldest collection that has a name assigned to it.
550             for i in items:
551                 if i.get("name"):
552                     c = i
553                     break
554         if not c:
555             # None of the collections have names (?!), so just pick the
556             # first one.
557             c = items[0]
558
559         # list() doesn't return manifest text (and we don't want it to,
560         # because we don't need the same maninfest text sent to us 50
561         # times) so go and retrieve the collection object directly
562         # which will include the manifest text.
563         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
564     else:
565         # Assume this is an actual collection uuid, so fetch it directly.
566         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
567
568     # If a collection with this hash already exists at the
569     # destination, and 'force' is not true, just return that
570     # collection.
571     if not args.force:
572         if 'portable_data_hash' in c:
573             colhash = c['portable_data_hash']
574         else:
575             colhash = c['uuid']
576         dstcol = dst.collections().list(
577             filters=[['portable_data_hash', '=', colhash]]
578         ).execute(num_retries=args.retries)
579         if dstcol['items_available'] > 0:
580             for d in dstcol['items']:
581                 if ((args.project_uuid == d['owner_uuid']) and
582                     (c.get('name') == d['name']) and
583                     (c['portable_data_hash'] == d['portable_data_hash'])):
584                     return d
585             c['manifest_text'] = dst.collections().get(
586                 uuid=dstcol['items'][0]['uuid']
587             ).execute(num_retries=args.retries)['manifest_text']
588             return create_collection_from(c, src, dst, args)
589
590     # Fetch the collection's manifest.
591     manifest = c['manifest_text']
592     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
593
594     # Copy each block from src_keep to dst_keep.
595     # Use the newly signed locators returned from dst_keep to build
596     # a new manifest as we go.
597     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
598     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
599     dst_manifest = io.StringIO()
600     dst_locators = {}
601     bytes_written = 0
602     bytes_expected = total_collection_size(manifest)
603     if args.progress:
604         progress_writer = ProgressWriter(human_progress)
605     else:
606         progress_writer = None
607
608     # go through the words
609     # put each block loc into 'get' queue
610     # 'get' threads get block and put it into 'put' queue
611     # 'put' threads put block and then update dst_locators
612     #
613     # after going through the whole manifest we go back through it
614     # again and build dst_manifest
615
616     lock = threading.Lock()
617
618     # the get queue should be unbounded because we'll add all the
619     # block hashes we want to get, but these are small
620     get_queue = queue.Queue()
621
622     threadcount = 4
623
624     # the put queue contains full data blocks
625     # and if 'get' is faster than 'put' we could end up consuming
626     # a great deal of RAM if it isn't bounded.
627     put_queue = queue.Queue(threadcount)
628     transfer_error = []
629
630     def get_thread():
631         while True:
632             word = get_queue.get()
633             if word is None:
634                 put_queue.put(None)
635                 get_queue.task_done()
636                 return
637
638             blockhash = arvados.KeepLocator(word).md5sum
639             with lock:
640                 if blockhash in dst_locators:
641                     # Already uploaded
642                     get_queue.task_done()
643                     continue
644
645             try:
646                 logger.debug("Getting block %s", word)
647                 data = src_keep.get(word)
648                 put_queue.put((word, data))
649             except Exception as e:
650                 logger.error("Error getting block %s: %s", word, e)
651                 transfer_error.append(e)
652                 try:
653                     # Drain the 'get' queue so we end early
654                     while True:
655                         get_queue.get(False)
656                         get_queue.task_done()
657                 except queue.Empty:
658                     pass
659             finally:
660                 get_queue.task_done()
661
662     def put_thread():
663         nonlocal bytes_written
664         while True:
665             item = put_queue.get()
666             if item is None:
667                 put_queue.task_done()
668                 return
669
670             word, data = item
671             loc = arvados.KeepLocator(word)
672             blockhash = loc.md5sum
673             with lock:
674                 if blockhash in dst_locators:
675                     # Already uploaded
676                     put_queue.task_done()
677                     continue
678
679             try:
680                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
681                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
682                 with lock:
683                     dst_locators[blockhash] = dst_locator
684                     bytes_written += loc.size
685                     if progress_writer:
686                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
687             except Exception as e:
688                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
689                 try:
690                     # Drain the 'get' queue so we end early
691                     while True:
692                         get_queue.get(False)
693                         get_queue.task_done()
694                 except queue.Empty:
695                     pass
696                 transfer_error.append(e)
697             finally:
698                 put_queue.task_done()
699
700     for line in manifest.splitlines():
701         words = line.split()
702         for word in words[1:]:
703             try:
704                 loc = arvados.KeepLocator(word)
705             except ValueError:
706                 # If 'word' can't be parsed as a locator,
707                 # presume it's a filename.
708                 continue
709
710             get_queue.put(word)
711
712     for i in range(0, threadcount):
713         get_queue.put(None)
714
715     for i in range(0, threadcount):
716         threading.Thread(target=get_thread, daemon=True).start()
717
718     for i in range(0, threadcount):
719         threading.Thread(target=put_thread, daemon=True).start()
720
721     get_queue.join()
722     put_queue.join()
723
724     if len(transfer_error) > 0:
725         return {"error_token": "Failed to transfer blocks"}
726
727     for line in manifest.splitlines():
728         words = line.split()
729         dst_manifest.write(words[0])
730         for word in words[1:]:
731             try:
732                 loc = arvados.KeepLocator(word)
733             except ValueError:
734                 # If 'word' can't be parsed as a locator,
735                 # presume it's a filename.
736                 dst_manifest.write(' ')
737                 dst_manifest.write(word)
738                 continue
739             blockhash = loc.md5sum
740             dst_manifest.write(' ')
741             dst_manifest.write(dst_locators[blockhash])
742         dst_manifest.write("\n")
743
744     if progress_writer:
745         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
746         progress_writer.finish()
747
748     # Copy the manifest and save the collection.
749     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
750
751     c['manifest_text'] = dst_manifest.getvalue()
752     return create_collection_from(c, src, dst, args)
753
754 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
755     """Copy the docker image identified by docker_image and
756     docker_image_tag from src to dst. Create appropriate
757     docker_image_repo+tag and docker_image_hash links at dst.
758
759     """
760
761     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
762
763     # Find the link identifying this docker image.
764     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
765         src, args.retries, docker_image, docker_image_tag)
766     if docker_image_list:
767         image_uuid, image_info = docker_image_list[0]
768         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
769
770         # Copy the collection it refers to.
771         dst_image_col = copy_collection(image_uuid, src, dst, args)
772     elif arvados.util.keep_locator_pattern.match(docker_image):
773         dst_image_col = copy_collection(docker_image, src, dst, args)
774     else:
775         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
776
777 def copy_project(obj_uuid, src, dst, owner_uuid, args):
778
779     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
780
781     # Create/update the destination project
782     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
783                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
784     if len(existing["items"]) == 0:
785         project_record = dst.groups().create(body={"group": {"group_class": "project",
786                                                              "owner_uuid": owner_uuid,
787                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
788     else:
789         project_record = existing["items"][0]
790
791     dst.groups().update(uuid=project_record["uuid"],
792                         body={"group": {
793                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
794
795     args.project_uuid = project_record["uuid"]
796
797     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
798
799
800     partial_error = ""
801
802     # Copy collections
803     try:
804         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
805                          src, dst, args)
806     except Exception as e:
807         partial_error += "\n" + str(e)
808
809     # Copy workflows
810     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
811         try:
812             copy_workflow(w["uuid"], src, dst, args)
813         except Exception as e:
814             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
815
816     if args.recursive:
817         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
818             try:
819                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
820             except Exception as e:
821                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
822
823     project_record["partial_error"] = partial_error
824
825     return project_record
826
827 # git_rev_parse(rev, repo)
828 #
829 #    Returns the 40-character commit hash corresponding to 'rev' in
830 #    git repository 'repo' (which must be the path of a local git
831 #    repository)
832 #
833 def git_rev_parse(rev, repo):
834     proc = subprocess.run(
835         ['git', 'rev-parse', rev],
836         check=True,
837         cwd=repo,
838         stdout=subprocess.PIPE,
839         text=True,
840     )
841     return proc.stdout.read().strip()
842
843 # uuid_type(api, object_uuid)
844 #
845 #    Returns the name of the class that object_uuid belongs to, based on
846 #    the second field of the uuid.  This function consults the api's
847 #    schema to identify the object class.
848 #
849 #    It returns a string such as 'Collection', 'Workflow', etc.
850 #
851 #    Special case: if handed a Keep locator hash, return 'Collection'.
852 #
853 def uuid_type(api, object_uuid):
854     if re.match(arvados.util.keep_locator_pattern, object_uuid):
855         return 'Collection'
856
857     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
858         return 'httpURL'
859
860     p = object_uuid.split('-')
861     if len(p) == 3:
862         type_prefix = p[1]
863         for k in api._schema.schemas:
864             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
865             if type_prefix == obj_class:
866                 return k
867     return None
868
869
870 def copy_from_http(url, src, dst, args):
871
872     project_uuid = args.project_uuid
873     varying_url_params = args.varying_url_params
874     prefer_cached_downloads = args.prefer_cached_downloads
875
876     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
877                                                    varying_url_params=varying_url_params,
878                                                    prefer_cached_downloads=prefer_cached_downloads)
879     if cached[2] is not None:
880         return copy_collection(cached[2], src, dst, args)
881
882     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
883                                                varying_url_params=varying_url_params,
884                                                prefer_cached_downloads=prefer_cached_downloads)
885
886     if cached is not None:
887         return {"uuid": cached[2]}
888
889
890 def abort(msg, code=1):
891     logger.info("arv-copy: %s", msg)
892     exit(code)
893
894
895 # Code for reporting on the progress of a collection upload.
896 # Stolen from arvados.commands.put.ArvPutCollectionWriter
897 # TODO(twp): figure out how to refactor into a shared library
898 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
899 # code)
900
901 def machine_progress(obj_uuid, bytes_written, bytes_expected):
902     return "{} {}: {} {} written {} total\n".format(
903         sys.argv[0],
904         os.getpid(),
905         obj_uuid,
906         bytes_written,
907         -1 if (bytes_expected is None) else bytes_expected)
908
909 def human_progress(obj_uuid, bytes_written, bytes_expected):
910     if bytes_expected:
911         return "\r{}: {}M / {}M {:.1%} ".format(
912             obj_uuid,
913             bytes_written >> 20, bytes_expected >> 20,
914             float(bytes_written) / bytes_expected)
915     else:
916         return "\r{}: {} ".format(obj_uuid, bytes_written)
917
918 class ProgressWriter(object):
919     _progress_func = None
920     outfile = sys.stderr
921
922     def __init__(self, progress_func):
923         self._progress_func = progress_func
924
925     def report(self, obj_uuid, bytes_written, bytes_expected):
926         if self._progress_func is not None:
927             self.outfile.write(
928                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
929
930     def finish(self):
931         self.outfile.write("\n")
932
933 if __name__ == '__main__':
934     main()