Merge branch '2800-python-global-state' into 2800-pgs
[arvados.git] / crunch_scripts / crunchutil / robust_put.py
import logging
import os
import time

import arvados
import arvados.commands.put as put
5
def machine_progress(bytes_written, bytes_expected):
    """Format one machine-readable progress line for an upload.

    The line reports the bytes written so far and the expected total.
    When bytes_expected is None, the total is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return "upload wrote {} total {}\n".format(bytes_written, total)
9
class Args(object):
    """Minimal attribute holder describing a single upload path.

    Instances expose only the attributes ``filename``, ``paths`` and
    ``max_manifest_depth``.
    """

    def __init__(self, fn):
        """Record *fn* as the only entry in ``paths``."""
        # No explicit output filename is set.
        self.filename = None
        # Exactly one path: the one given by the caller.
        self.paths = [fn]
        self.max_manifest_depth = 0
15
def upload(source_dir):
    """Upload a directory tree to Keep with error recovery.

    Each attempt rebuilds the collection writer from the resume cache,
    so work checkpointed by an earlier failed attempt can be picked up
    again.  After a failure the function sleeps and retries; the delay
    doubles on every failure (2, 4, ... 256 seconds).

    Args:
        source_dir: Directory to upload; converted to an absolute path.

    Returns:
        The value returned by the collection writer's ``finish()``
        (a uuid, per the original comment on this function).

    Raises:
        KeyboardInterrupt: re-raised immediately, without retrying.
        Exception: the last upload error, once the backoff would
            exceed 256 seconds.
    """
    source_dir = os.path.abspath(source_dir)
    if 'TASK_WORK' in os.environ:
        # TASK_WORK is set: keep the checkpoint in the current task's tmpdir.
        cache_path = os.path.join(arvados.current_task().tmpdir,
                                  "upload-output-checkpoint")
    else:
        cache_path = put.ResumeCache.make_path(Args(source_dir))
    resume_cache = put.ResumeCache(cache_path)
    reporter = put.progress_writer(machine_progress)
    bytes_expected = put.expected_bytes_for([source_dir])
    backoff = 1
    while True:
        try:
            out = put.ArvPutCollectionWriter.from_cache(
                resume_cache, reporter, bytes_expected)
            out.do_queued_work()
            out.write_directory_tree(source_dir, max_manifest_depth=0)
            return out.finish()
        except KeyboardInterrupt:
            logging.critical("caught interrupt signal 2")
            # Bare raise keeps the original traceback intact.
            raise
        except Exception:
            logging.exception("caught exception:")
            backoff *= 2
            if backoff > 256:
                logging.critical("Too many upload failures, giving up")
                raise
            logging.warning("Sleeping for %s seconds before trying again",
                            backoff)
            # Needs the `time` module; it was missing from the file's
            # imports, so the first failure used to raise NameError here.
            time.sleep(backoff)