Merge branch '8206-gce-retry-init' closes #8206
[arvados.git] / crunch_scripts / crunchutil / robust_put.py
1 import arvados
2 import arvados.commands.put as put
3 import os
4 import logging
5 import time
6
def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable progress line for an upload.

    bytes_written is the count written so far; bytes_expected is the
    anticipated total, reported as -1 when it is None.
    Returns the newline-terminated status string.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return "upload wrote {} total {}\n".format(bytes_written, total)
10
class Args(object):
    """Minimal argument holder handed to put.ResumeCache.make_path().

    Carries only three attributes; presumably these are the ones
    make_path() reads from a parsed arv-put argument namespace --
    TODO confirm against arvados.commands.put.
    """

    def __init__(self, fn):
        # Manifest depth is fixed at 0 for this holder.
        self.max_manifest_depth = 0
        # The single path being uploaded, wrapped in a list.
        self.paths = [fn]
        # No explicit output filename is set.
        self.filename = None
16
17 # Upload to Keep with error recovery.
18 # Return a uuid or raise an exception if there are too many failures.
def upload(source_dir, logger=None):
    """Upload a directory tree to Keep, retrying with exponential backoff.

    source_dir is the directory to upload; it is made absolute before use.
    logger is the logging.Logger to report to; defaults to the "arvados"
    logger when None.

    Returns the value of ArvPutCollectionWriter.finish() (the uuid of the
    uploaded collection, per the comment above this function).

    Raises KeyboardInterrupt immediately if interrupted.  Any other
    exception is retried; once the backoff exceeds 256 seconds the last
    exception is re-raised with its original traceback.
    """
    if logger is None:
        logger = logging.getLogger("arvados")

    source_dir = os.path.abspath(source_dir)
    # Inside a Crunch task (TASK_WORK set) keep the checkpoint in the task's
    # tmpdir; otherwise let ResumeCache derive a path from the source dir.
    if 'TASK_WORK' in os.environ:
        resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
    else:
        resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
    reporter = put.progress_writer(machine_progress)
    bytes_expected = put.expected_bytes_for([source_dir])
    backoff = 1
    while True:
        try:
            # Rebuild the writer from the cache on every attempt --
            # presumably this resumes from the last checkpoint; verify
            # against arvados.commands.put.
            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
            out.do_queued_work()
            out.write_directory_tree(source_dir, max_manifest_depth=0)
            return out.finish()
        except KeyboardInterrupt:
            logger.critical("caught interrupt signal 2")
            # Bare raise keeps the original traceback intact.
            raise
        except Exception:
            logger.exception("caught exception:")
            # Backoff doubles before the check, so sleeps run 2, 4, ... 256
            # seconds and the ninth consecutive failure gives up.
            backoff *= 2
            if backoff > 256:
                logger.critical("Too many upload failures, giving up")
                raise
            logger.warning("Sleeping for %s seconds before trying again", backoff)
            time.sleep(backoff)