Merge branch '8784-dir-listings'
[arvados.git] / crunch_scripts / crunchutil / robust_put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import arvados.commands.put as put
7 import os
8 import logging
9 import time
10
def machine_progress(bytes_written, bytes_expected):
    """Format one machine-readable upload progress line.

    bytes_written is reported as-is.  When bytes_expected is None (the
    total is unknown) the total is reported as -1; any other value,
    including 0, is passed through unchanged.  The returned string ends
    with a newline.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return "upload wrote {} total {}\n".format(bytes_written, total)
14
class Args(object):
    """Bare attribute holder describing a single-path upload.

    In this file an instance is handed to put.ResumeCache.make_path() to
    derive a resume-cache location for a source directory.
    NOTE(review): presumably these three attributes mirror what arv-put's
    parsed command-line arguments provide -- confirm against make_path().
    """

    def __init__(self, fn):
        # Exactly one path to upload: the one supplied by the caller.
        self.paths = [fn]
        # No output filename is set.
        self.filename = None
        self.max_manifest_depth = 0
20
21 # Upload to Keep with error recovery.
22 # Return a uuid or raise an exception if there are too many failures.
def upload(source_dir, logger=None):
    """Upload a directory tree to Keep, retrying with exponential backoff.

    Arguments:
      source_dir -- directory to upload; converted to an absolute path.
      logger -- logger used for progress and error messages.  Defaults
        to the "arvados" logger.

    Returns the value of the collection writer's finish() call.
    NOTE(review): presumably the identifier of the saved collection --
    confirm against ArvPutCollectionWriter.finish().

    Raises:
      KeyboardInterrupt -- re-raised immediately, after logging.
      Exception -- any other failure is retried after sleeping 2, 4, ...,
        256 seconds; the ninth failure is re-raised to the caller.

    Each attempt is rebuilt from the same resume cache, so work that was
    checkpointed by an earlier attempt can be picked up again.
    """
    if logger is None:
        logger = logging.getLogger("arvados")

    source_dir = os.path.abspath(source_dir)
    if 'TASK_WORK' in os.environ:
        # Inside a task: keep the checkpoint in the task's temp directory.
        checkpoint_path = os.path.join(
            arvados.current_task().tmpdir, "upload-output-checkpoint")
        resume_cache = put.ResumeCache(checkpoint_path)
    else:
        resume_cache = put.ResumeCache(
            put.ResumeCache.make_path(Args(source_dir)))
    reporter = put.progress_writer(machine_progress)
    bytes_expected = put.expected_bytes_for([source_dir])
    backoff = 1
    while True:
        try:
            out = put.ArvPutCollectionWriter.from_cache(
                resume_cache, reporter, bytes_expected)
            out.do_queued_work()
            out.write_directory_tree(source_dir, max_manifest_depth=0)
            return out.finish()
        except KeyboardInterrupt:
            logger.critical("caught interrupt signal 2")
            # Bare raise keeps the original traceback intact.
            raise
        except Exception:
            logger.exception("caught exception:")
            backoff *= 2
            if backoff > 256:
                logger.critical("Too many upload failures, giving up")
                raise
            # Lazy logging args: same rendered message, formatted only if
            # the record is actually emitted.
            logger.warning(
                "Sleeping for %s seconds before trying again", backoff)
            time.sleep(backoff)