Merge branch 'zoe-translates/python-sdk-arv_copy-thread-exception'
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have configuration files {src}.conf and
17 # {dst}.conf in a standard configuration directory with valid login credentials
18 # for instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 import argparse
22 import contextlib
23 import getpass
24 import os
25 import re
26 import shutil
27 import subprocess
28 import sys
29 import logging
30 import tempfile
31 import urllib.parse
32 import io
33 import json
34 import queue
35 import threading
36
37 import arvados
38 import arvados.config
39 import arvados.keep
40 import arvados.util
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
43 import arvados.http_to_keep
44
45 from arvados._version import __version__
46
47 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
48
49 logger = logging.getLogger('arvados.arv-copy')
50
51 # local_repo_dir records which git repositories from the Arvados source
52 # instance have been checked out locally during this run, and to which
53 # directories.
54 # e.g. if repository 'twp' from src_arv has been cloned into
55 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
56 #
57 local_repo_dir = {}
58
59 # List of collections that have been copied in this session, and their
60 # destination collection UUIDs.
61 collections_copied = {}
62
63 # Set of (repository, script_version) two-tuples of commits copied in git.
64 scripts_copied = set()
65
66 # The owner_uuid of the object being copied
67 src_owner_uuid = None
68
69 def main():
70     copy_opts = argparse.ArgumentParser(add_help=False)
71
72     copy_opts.add_argument(
73         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
74         help='Print version and exit.')
75     copy_opts.add_argument(
76         '-v', '--verbose', dest='verbose', action='store_true',
77         help='Verbose output.')
78     copy_opts.add_argument(
79         '--progress', dest='progress', action='store_true',
80         help='Report progress on copying collections. (default)')
81     copy_opts.add_argument(
82         '--no-progress', dest='progress', action='store_false',
83         help='Do not report progress on copying collections.')
84     copy_opts.add_argument(
85         '-f', '--force', dest='force', action='store_true',
86         help='Perform copy even if the object appears to exist at the remote destination.')
87     copy_opts.add_argument(
88         '--src', dest='source_arvados',
89         help="""
90 Client configuration location for the source Arvados cluster.
91 May be either a configuration file path, or a plain identifier like `foo`
92 to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
93 If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
94 """,
95     )
96     copy_opts.add_argument(
97         '--dst', dest='destination_arvados',
98         help="""
99 Client configuration location for the destination Arvados cluster.
100 May be either a configuration file path, or a plain identifier like `foo`
101 to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
102 If not provided, will use the default client configuration from the environment or `settings.conf`.
103 """,
104     )
105     copy_opts.add_argument(
106         '--recursive', dest='recursive', action='store_true',
107         help='Recursively copy any dependencies for this object, and subprojects. (default)')
108     copy_opts.add_argument(
109         '--no-recursive', dest='recursive', action='store_false',
110         help='Do not copy any dependencies or subprojects.')
111     copy_opts.add_argument(
112         '--project-uuid', dest='project_uuid',
113         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
114     copy_opts.add_argument(
115         '--storage-classes', dest='storage_classes',
116         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
117     copy_opts.add_argument("--varying-url-params", type=str, default="",
118                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
119
120     copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
121                         help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
122
123     copy_opts.add_argument(
124         'object_uuid',
125         help='The UUID of the object to be copied.')
126     copy_opts.set_defaults(progress=True)
127     copy_opts.set_defaults(recursive=True)
128
129     parser = argparse.ArgumentParser(
130         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
131         parents=[copy_opts, arv_cmd.retry_opt])
132     args = parser.parse_args()
133
134     if args.storage_classes:
135         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
136
137     if args.verbose:
138         logger.setLevel(logging.DEBUG)
139     else:
140         logger.setLevel(logging.INFO)
141
142     if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
143         args.source_arvados = args.object_uuid[:5]
144
145     # Create API clients for the source and destination instances
146     src_arv = api_for_instance(args.source_arvados, args.retries)
147     dst_arv = api_for_instance(args.destination_arvados, args.retries)
148
149     if not args.project_uuid:
150         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
151
152     # Identify the kind of object we have been given, and begin copying.
153     t = uuid_type(src_arv, args.object_uuid)
154
155     try:
156         if t == 'Collection':
157             set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
158             result = copy_collection(args.object_uuid,
159                                      src_arv, dst_arv,
160                                      args)
161         elif t == 'Workflow':
162             set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
163             result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
164         elif t == 'Group':
165             set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
166             result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
167         elif t == 'httpURL':
168             result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
169         else:
170             abort("cannot copy object {} of type {}".format(args.object_uuid, t))
171     except Exception as e:
172         logger.error("%s", e, exc_info=args.verbose)
173         exit(1)
174
175     # Clean up any outstanding temp git repositories.
176     for d in local_repo_dir.values():
177         shutil.rmtree(d, ignore_errors=True)
178
179     if not result:
180         exit(1)
181
182     # If no exception was thrown and the response does not have an
183     # error_token field, presume success
184     if result is None or 'error_token' in result or 'uuid' not in result:
185         if result:
186             logger.error("API server returned an error result: {}".format(result))
187         exit(1)
188
189     print(result['uuid'])
190
191     if result.get('partial_error'):
192         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
193         exit(1)
194
195     logger.info("Success: created copy with uuid {}".format(result['uuid']))
196     exit(0)
197
198 def set_src_owner_uuid(resource, uuid, args):
199     global src_owner_uuid
200     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
201     src_owner_uuid = c.get("owner_uuid")
202
203 # api_for_instance(instance_name)
204 #
205 #     Creates an API client for the Arvados instance identified by
206 #     instance_name.
207 #
208 #     If instance_name contains a slash, it is presumed to be a path
209 #     (either local or absolute) to a file with Arvados configuration
210 #     settings.
211 #
212 #     Otherwise, it is presumed to be the name of a file in a standard
213 #     configuration directory.
214 #
215 def api_for_instance(instance_name, num_retries):
216     if not instance_name:
217         # Use environment
218         return arvados.api('v1')
219
220     if '/' in instance_name:
221         config_file = instance_name
222     else:
223         dirs = arvados.util._BaseDirectories('CONFIG')
224         config_file = next(dirs.search(f'{instance_name}.conf'), '')
225
226     try:
227         cfg = arvados.config.load(config_file)
228     except OSError as e:
229         if config_file:
230             verb = 'open'
231         else:
232             verb = 'find'
233             config_file = f'{instance_name}.conf'
234         abort(("Could not {} config file {}: {}\n" +
235                "You must make sure that your configuration tokens\n" +
236                "for Arvados instance {} are in {} and that this\n" +
237                "file is readable.").format(
238                    verb, config_file, e.strerror, instance_name, config_file))
239
240     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
241         api_is_insecure = (
242             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
243                 ['1', 't', 'true', 'y', 'yes']))
244         client = arvados.api('v1',
245                              host=cfg['ARVADOS_API_HOST'],
246                              token=cfg['ARVADOS_API_TOKEN'],
247                              insecure=api_is_insecure,
248                              num_retries=num_retries,
249                              )
250     else:
251         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
252     return client
253
254 # Check if git is available
255 def check_git_availability():
256     try:
257         subprocess.run(
258             ['git', '--version'],
259             check=True,
260             stdout=subprocess.DEVNULL,
261         )
262     except FileNotFoundError:
263         abort('git command is not available. Please ensure git is installed.')
264
265
266 def filter_iter(arg):
267     """Iterate a filter string-or-list.
268
269     Pass in a filter field that can either be a string or list.
270     This will iterate elements as if the field had been written as a list.
271     """
272     if isinstance(arg, str):
273         yield arg
274     else:
275         yield from arg
276
277 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
278     """Update a single repository filter in-place for the destination.
279
280     If the filter checks that the repository is src_repository, it is
281     updated to check that the repository is dst_repository.  If it does
282     anything else, this function raises ValueError.
283     """
284     if src_repository is None:
285         raise ValueError("component does not specify a source repository")
286     elif dst_repository is None:
287         raise ValueError("no destination repository specified to update repository filter")
288     elif repo_filter[1:] == ['=', src_repository]:
289         repo_filter[2] = dst_repository
290     elif repo_filter[1:] == ['in', [src_repository]]:
291         repo_filter[2] = [dst_repository]
292     else:
293         raise ValueError("repository filter is not a simple source match")
294
295 def migrate_script_version_filter(version_filter):
296     """Update a single script_version filter in-place for the destination.
297
298     Currently this function checks that all the filter operands are Git
299     commit hashes.  If they're not, it raises ValueError to indicate that
300     the filter is not portable.  It could be extended to make other
301     transformations in the future.
302     """
303     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
304         raise ValueError("script_version filter is not limited to commit hashes")
305
306 def attr_filtered(filter_, *attr_names):
307     """Return True if filter_ applies to any of attr_names, else False."""
308     return any((name == 'any') or (name in attr_names)
309                for name in filter_iter(filter_[0]))
310
311 @contextlib.contextmanager
312 def exception_handler(handler, *exc_types):
313     """If any exc_types are raised in the block, call handler on the exception."""
314     try:
315         yield
316     except exc_types as error:
317         handler(error)
318
319
320 # copy_workflow(wf_uuid, src, dst, args)
321 #
322 #    Copies a workflow identified by wf_uuid from src to dst.
323 #
324 #    If args.recursive is True, also copy any collections
325 #      referenced in the workflow definition yaml.
326 #
327 #    The owner_uuid of the new workflow is set to any given
328 #      project_uuid or the user who copied the template.
329 #
330 #    Returns the copied workflow object.
331 #
332 def copy_workflow(wf_uuid, src, dst, args):
333     # fetch the workflow from the source instance
334     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
335
336     if not wf["definition"]:
337         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
338
339     # copy collections and docker images
340     if args.recursive and wf["definition"]:
341         env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
342                "ARVADOS_API_TOKEN": src.api_token,
343                "PATH": os.environ["PATH"]}
344         try:
345             result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
346                                     capture_output=True, env=env)
347         except FileNotFoundError:
348             no_arv_copy = True
349         else:
350             no_arv_copy = result.returncode == 2
351
352         if no_arv_copy:
353             raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
354         elif result.returncode != 0:
355             raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
356
357         locations = json.loads(result.stdout)
358
359         if locations:
360             copy_collections(locations, src, dst, args)
361
362     # copy the workflow itself
363     del wf['uuid']
364     wf['owner_uuid'] = args.project_uuid
365
366     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
367                                              ["name", "=", wf["name"]]]).execute()
368     if len(existing["items"]) == 0:
369         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
370     else:
371         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
372
373
374 def workflow_collections(obj, locations, docker_images):
375     if isinstance(obj, dict):
376         loc = obj.get('location', None)
377         if loc is not None:
378             if loc.startswith("keep:"):
379                 locations.append(loc[5:])
380
381         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
382         if docker_image is not None:
383             ds = docker_image.split(":", 1)
384             tag = ds[1] if len(ds)==2 else 'latest'
385             docker_images[ds[0]] = tag
386
387         for x in obj:
388             workflow_collections(obj[x], locations, docker_images)
389     elif isinstance(obj, list):
390         for x in obj:
391             workflow_collections(x, locations, docker_images)
392
393 # copy_collections(obj, src, dst, args)
394 #
395 #    Recursively copies all collections referenced by 'obj' from src
396 #    to dst.  obj may be a dict or a list, in which case we run
397 #    copy_collections on every value it contains. If it is a string,
398 #    search it for any substring that matches a collection hash or uuid
399 #    (this will find hidden references to collections like
400 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
401 #
402 #    Returns a copy of obj with any old collection uuids replaced by
403 #    the new ones.
404 #
405 def copy_collections(obj, src, dst, args):
406
407     def copy_collection_fn(collection_match):
408         """Helper function for regex substitution: copies a single collection,
409         identified by the collection_match MatchObject, to the
410         destination.  Returns the destination collection uuid (or the
411         portable data hash if that's what src_id is).
412
413         """
414         src_id = collection_match.group(0)
415         if src_id not in collections_copied:
416             dst_col = copy_collection(src_id, src, dst, args)
417             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
418                 collections_copied[src_id] = src_id
419             else:
420                 collections_copied[src_id] = dst_col['uuid']
421         return collections_copied[src_id]
422
423     if isinstance(obj, str):
424         # Copy any collections identified in this string to dst, replacing
425         # them with the dst uuids as necessary.
426         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
427         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
428         return obj
429     elif isinstance(obj, dict):
430         return type(obj)((v, copy_collections(obj[v], src, dst, args))
431                          for v in obj)
432     elif isinstance(obj, list):
433         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
434     return obj
435
436
437 def total_collection_size(manifest_text):
438     """Return the total number of bytes in this collection (excluding
439     duplicate blocks)."""
440
441     total_bytes = 0
442     locators_seen = {}
443     for line in manifest_text.splitlines():
444         words = line.split()
445         for word in words[1:]:
446             try:
447                 loc = arvados.KeepLocator(word)
448             except ValueError:
449                 continue  # this word isn't a locator, skip it
450             if loc.md5sum not in locators_seen:
451                 locators_seen[loc.md5sum] = True
452                 total_bytes += loc.size
453
454     return total_bytes
455
456 def create_collection_from(c, src, dst, args):
457     """Create a new collection record on dst, and copy Docker metadata if
458     available."""
459
460     collection_uuid = c['uuid']
461     body = {}
462     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
463         body[d] = c[d]
464
465     if not body["name"]:
466         body['name'] = "copied from " + collection_uuid
467
468     if args.storage_classes:
469         body['storage_classes_desired'] = args.storage_classes
470
471     body['owner_uuid'] = args.project_uuid
472
473     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
474
475     # Create docker_image_repo+tag and docker_image_hash links
476     # at the destination.
477     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
478         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
479
480         for src_link in docker_links:
481             body = {key: src_link[key]
482                     for key in ['link_class', 'name', 'properties']}
483             body['head_uuid'] = dst_collection['uuid']
484             body['owner_uuid'] = args.project_uuid
485
486             lk = dst.links().create(body=body).execute(num_retries=args.retries)
487             logger.debug('created dst link {}'.format(lk))
488
489     return dst_collection
490
491 # copy_collection(obj_uuid, src, dst, args)
492 #
493 #    Copies the collection identified by obj_uuid from src to dst.
494 #    Returns the collection object created at dst.
495 #
496 #    If args.progress is True, produce a human-friendly progress
497 #    report.
498 #
499 #    If a collection with the desired portable_data_hash already
500 #    exists at dst, and args.force is False, copy_collection returns
501 #    the existing collection without copying any blocks.  Otherwise
502 #    (if no collection exists or if args.force is True)
503 #    copy_collection copies all of the collection data blocks from src
504 #    to dst.
505 #
506 #    For this application, it is critical to preserve the
507 #    collection's manifest hash, which is not guaranteed with the
508 #    arvados.CollectionReader and arvados.CollectionWriter classes.
509 #    Copying each block in the collection manually, followed by
510 #    the manifest block, ensures that the collection's manifest
511 #    hash will not change.
512 #
513 def copy_collection(obj_uuid, src, dst, args):
514     if arvados.util.keep_locator_pattern.match(obj_uuid):
515         # If the obj_uuid is a portable data hash, it might not be
516         # uniquely identified with a particular collection.  As a
517         # result, it is ambiguous as to what name to use for the copy.
518         # Apply some heuristics to pick which collection to get the
519         # name from.
520         srccol = src.collections().list(
521             filters=[['portable_data_hash', '=', obj_uuid]],
522             order="created_at asc"
523             ).execute(num_retries=args.retries)
524
525         items = srccol.get("items")
526
527         if not items:
528             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
529             return
530
531         c = None
532
533         if len(items) == 1:
534             # There's only one collection with the PDH, so use that.
535             c = items[0]
536         if not c:
537             # See if there is a collection that's in the same project
538             # as the root item (usually a workflow) being copied.
539             for i in items:
540                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
541                     c = i
542                     break
543         if not c:
544             # Didn't find any collections located in the same project, so
545             # pick the oldest collection that has a name assigned to it.
546             for i in items:
547                 if i.get("name"):
548                     c = i
549                     break
550         if not c:
551             # None of the collections have names (?!), so just pick the
552             # first one.
553             c = items[0]
554
555         # list() doesn't return manifest text (and we don't want it to,
556         # because we don't need the same maninfest text sent to us 50
557         # times) so go and retrieve the collection object directly
558         # which will include the manifest text.
559         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
560     else:
561         # Assume this is an actual collection uuid, so fetch it directly.
562         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
563
564     # If a collection with this hash already exists at the
565     # destination, and 'force' is not true, just return that
566     # collection.
567     if not args.force:
568         if 'portable_data_hash' in c:
569             colhash = c['portable_data_hash']
570         else:
571             colhash = c['uuid']
572         dstcol = dst.collections().list(
573             filters=[['portable_data_hash', '=', colhash]]
574         ).execute(num_retries=args.retries)
575         if dstcol['items_available'] > 0:
576             for d in dstcol['items']:
577                 if ((args.project_uuid == d['owner_uuid']) and
578                     (c.get('name') == d['name']) and
579                     (c['portable_data_hash'] == d['portable_data_hash'])):
580                     return d
581             c['manifest_text'] = dst.collections().get(
582                 uuid=dstcol['items'][0]['uuid']
583             ).execute(num_retries=args.retries)['manifest_text']
584             return create_collection_from(c, src, dst, args)
585
586     # Fetch the collection's manifest.
587     manifest = c['manifest_text']
588     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
589
590     # Copy each block from src_keep to dst_keep.
591     # Use the newly signed locators returned from dst_keep to build
592     # a new manifest as we go.
593     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
594     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
595     dst_manifest = io.StringIO()
596     dst_locators = {}
597     bytes_written = 0
598     bytes_expected = total_collection_size(manifest)
599     if args.progress:
600         progress_writer = ProgressWriter(human_progress)
601     else:
602         progress_writer = None
603
604     # go through the words
605     # put each block loc into 'get' queue
606     # 'get' threads get block and put it into 'put' queue
607     # 'put' threads put block and then update dst_locators
608     #
609     # after going through the whole manifest we go back through it
610     # again and build dst_manifest
611
612     lock = threading.Lock()
613
614     # the get queue should be unbounded because we'll add all the
615     # block hashes we want to get, but these are small
616     get_queue = queue.Queue()
617
618     threadcount = 4
619
620     # the put queue contains full data blocks
621     # and if 'get' is faster than 'put' we could end up consuming
622     # a great deal of RAM if it isn't bounded.
623     put_queue = queue.Queue(threadcount)
624     transfer_error = []
625
626     def get_thread():
627         while True:
628             word = get_queue.get()
629             if word is None:
630                 put_queue.put(None)
631                 get_queue.task_done()
632                 return
633
634             blockhash = arvados.KeepLocator(word).md5sum
635             with lock:
636                 if blockhash in dst_locators:
637                     # Already uploaded
638                     get_queue.task_done()
639                     continue
640
641             try:
642                 logger.debug("Getting block %s", word)
643                 data = src_keep.get(word)
644                 put_queue.put((word, data))
645             except Exception as e:
646                 logger.error("Error getting block %s: %s", word, e)
647                 transfer_error.append(e)
648                 try:
649                     # Drain the 'get' queue so we end early
650                     while True:
651                         get_queue.get(False)
652                         get_queue.task_done()
653                 except queue.Empty:
654                     pass
655             finally:
656                 get_queue.task_done()
657
658     def put_thread():
659         nonlocal bytes_written
660         while True:
661             item = put_queue.get()
662             if item is None:
663                 put_queue.task_done()
664                 return
665
666             word, data = item
667             loc = arvados.KeepLocator(word)
668             blockhash = loc.md5sum
669             with lock:
670                 if blockhash in dst_locators:
671                     # Already uploaded
672                     put_queue.task_done()
673                     continue
674
675             try:
676                 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
677                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
678                 with lock:
679                     dst_locators[blockhash] = dst_locator
680                     bytes_written += loc.size
681                     if progress_writer:
682                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
683             except Exception as e:
684                 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
685                 try:
686                     # Drain the 'get' queue so we end early
687                     while True:
688                         get_queue.get(False)
689                         get_queue.task_done()
690                 except queue.Empty:
691                     pass
692                 transfer_error.append(e)
693             finally:
694                 put_queue.task_done()
695
696     for line in manifest.splitlines():
697         words = line.split()
698         for word in words[1:]:
699             try:
700                 loc = arvados.KeepLocator(word)
701             except ValueError:
702                 # If 'word' can't be parsed as a locator,
703                 # presume it's a filename.
704                 continue
705
706             get_queue.put(word)
707
708     for i in range(0, threadcount):
709         get_queue.put(None)
710
711     for i in range(0, threadcount):
712         threading.Thread(target=get_thread, daemon=True).start()
713
714     for i in range(0, threadcount):
715         threading.Thread(target=put_thread, daemon=True).start()
716
717     get_queue.join()
718     put_queue.join()
719
720     if len(transfer_error) > 0:
721         return {"error_token": "Failed to transfer blocks"}
722
723     for line in manifest.splitlines():
724         words = line.split()
725         dst_manifest.write(words[0])
726         for word in words[1:]:
727             try:
728                 loc = arvados.KeepLocator(word)
729             except ValueError:
730                 # If 'word' can't be parsed as a locator,
731                 # presume it's a filename.
732                 dst_manifest.write(' ')
733                 dst_manifest.write(word)
734                 continue
735             blockhash = loc.md5sum
736             dst_manifest.write(' ')
737             dst_manifest.write(dst_locators[blockhash])
738         dst_manifest.write("\n")
739
740     if progress_writer:
741         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
742         progress_writer.finish()
743
744     # Copy the manifest and save the collection.
745     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
746
747     c['manifest_text'] = dst_manifest.getvalue()
748     return create_collection_from(c, src, dst, args)
749
750 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
751     """Copy the docker image identified by docker_image and
752     docker_image_tag from src to dst. Create appropriate
753     docker_image_repo+tag and docker_image_hash links at dst.
754
755     """
756
757     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
758
759     # Find the link identifying this docker image.
760     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
761         src, args.retries, docker_image, docker_image_tag)
762     if docker_image_list:
763         image_uuid, image_info = docker_image_list[0]
764         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
765
766         # Copy the collection it refers to.
767         dst_image_col = copy_collection(image_uuid, src, dst, args)
768     elif arvados.util.keep_locator_pattern.match(docker_image):
769         dst_image_col = copy_collection(docker_image, src, dst, args)
770     else:
771         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
772
773 def copy_project(obj_uuid, src, dst, owner_uuid, args):
774
775     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
776
777     # Create/update the destination project
778     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
779                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
780     if len(existing["items"]) == 0:
781         project_record = dst.groups().create(body={"group": {"group_class": "project",
782                                                              "owner_uuid": owner_uuid,
783                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
784     else:
785         project_record = existing["items"][0]
786
787     dst.groups().update(uuid=project_record["uuid"],
788                         body={"group": {
789                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
790
791     args.project_uuid = project_record["uuid"]
792
793     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
794
795
796     partial_error = ""
797
798     # Copy collections
799     try:
800         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
801                          src, dst, args)
802     except Exception as e:
803         partial_error += "\n" + str(e)
804
805     # Copy workflows
806     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
807         try:
808             copy_workflow(w["uuid"], src, dst, args)
809         except Exception as e:
810             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
811
812     if args.recursive:
813         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
814             try:
815                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
816             except Exception as e:
817                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
818
819     project_record["partial_error"] = partial_error
820
821     return project_record
822
823 # git_rev_parse(rev, repo)
824 #
825 #    Returns the 40-character commit hash corresponding to 'rev' in
826 #    git repository 'repo' (which must be the path of a local git
827 #    repository)
828 #
829 def git_rev_parse(rev, repo):
830     proc = subprocess.run(
831         ['git', 'rev-parse', rev],
832         check=True,
833         cwd=repo,
834         stdout=subprocess.PIPE,
835         text=True,
836     )
837     return proc.stdout.read().strip()
838
839 # uuid_type(api, object_uuid)
840 #
841 #    Returns the name of the class that object_uuid belongs to, based on
842 #    the second field of the uuid.  This function consults the api's
843 #    schema to identify the object class.
844 #
845 #    It returns a string such as 'Collection', 'Workflow', etc.
846 #
847 #    Special case: if handed a Keep locator hash, return 'Collection'.
848 #
849 def uuid_type(api, object_uuid):
850     if re.match(arvados.util.keep_locator_pattern, object_uuid):
851         return 'Collection'
852
853     if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
854         return 'httpURL'
855
856     p = object_uuid.split('-')
857     if len(p) == 3:
858         type_prefix = p[1]
859         for k in api._schema.schemas:
860             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
861             if type_prefix == obj_class:
862                 return k
863     return None
864
865
866 def copy_from_http(url, src, dst, args):
867
868     project_uuid = args.project_uuid
869     varying_url_params = args.varying_url_params
870     prefer_cached_downloads = args.prefer_cached_downloads
871
872     cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
873                                                    varying_url_params=varying_url_params,
874                                                    prefer_cached_downloads=prefer_cached_downloads)
875     if cached[2] is not None:
876         return copy_collection(cached[2], src, dst, args)
877
878     cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
879                                                varying_url_params=varying_url_params,
880                                                prefer_cached_downloads=prefer_cached_downloads)
881
882     if cached is not None:
883         return {"uuid": cached[2]}
884
885
886 def abort(msg, code=1):
887     logger.info("arv-copy: %s", msg)
888     exit(code)
889
890
891 # Code for reporting on the progress of a collection upload.
892 # Stolen from arvados.commands.put.ArvPutCollectionWriter
893 # TODO(twp): figure out how to refactor into a shared library
894 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
895 # code)
896
897 def machine_progress(obj_uuid, bytes_written, bytes_expected):
898     return "{} {}: {} {} written {} total\n".format(
899         sys.argv[0],
900         os.getpid(),
901         obj_uuid,
902         bytes_written,
903         -1 if (bytes_expected is None) else bytes_expected)
904
905 def human_progress(obj_uuid, bytes_written, bytes_expected):
906     if bytes_expected:
907         return "\r{}: {}M / {}M {:.1%} ".format(
908             obj_uuid,
909             bytes_written >> 20, bytes_expected >> 20,
910             float(bytes_written) / bytes_expected)
911     else:
912         return "\r{}: {} ".format(obj_uuid, bytes_written)
913
914 class ProgressWriter(object):
915     _progress_func = None
916     outfile = sys.stderr
917
918     def __init__(self, progress_func):
919         self._progress_func = progress_func
920
921     def report(self, obj_uuid, bytes_written, bytes_expected):
922         if self._progress_func is not None:
923             self.outfile.write(
924                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
925
926     def finish(self):
927         self.outfile.write("\n")
928
929 if __name__ == '__main__':
930     main()