3505: Move helper scripts into crunchutil module. In run-command, added
[arvados.git] / crunch_scripts / crunchutil / robust_put.py
diff --git a/crunch_scripts/crunchutil/robust_put.py b/crunch_scripts/crunchutil/robust_put.py
new file mode 100644 (file)
index 0000000..158ceb1
--- /dev/null
@@ -0,0 +1,48 @@
+import arvados
+import arvados.commands.put as put
+import os
+import logging
+
+def machine_progress(bytes_written, bytes_expected):
+    return "upload wrote {} total {}\n".format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+class Args(object):
+    def __init__(self, fn):
+        self.filename = None
+        self.paths = [fn]
+        self.max_manifest_depth = 0
+
+# Upload to Keep with error recovery.
+# Return a uuid or raise an exception if there are too many failures.
+def upload(source_dir):
+    source_dir = os.path.abspath(source_dir)
+    done = False
+    if 'TASK_WORK' in os.environ:
+        resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
+    else:
+        resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
+    reporter = put.progress_writer(machine_progress)
+    bytes_expected = put.expected_bytes_for([source_dir])
+    backoff = 1
+    outuuid = None
+    while not done:
+        try:
+            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
+            out.do_queued_work()
+            out.write_directory_tree(source_dir, max_manifest_depth=0)
+            outuuid = out.finish()
+            done = True
+        except KeyboardInterrupt as e:
+            logging.critical("caught interrupt signal 2")
+            raise e
+        except Exception as e:
+            logging.exception("caught exception:")
+            backoff *= 2
+            if backoff > 256:
+                logging.critical("Too many upload failures, giving up")
+                raise e
+            else:
+                logging.warning("Sleeping for %s seconds before trying again" % backoff)
+                time.sleep(backoff)
+    return outuuid