2752: Add ResumableCollectionWriter serialization to arv-put.
[arvados.git] / sdk / python / arvados / commands / put.py
index a0dec2b05acd7aed316e3a6c1b9fe93a74f32ae8..d3da8cfc8c687819c4714aae5ff1c7f332396292 100644 (file)
@@ -5,6 +5,7 @@
 
 import argparse
 import arvados
+import base64
 import errno
 import fcntl
 import hashlib
@@ -198,6 +199,37 @@ class ResumeCache(object):
         self.close()
 
 
+class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
+    def __init__(self, cache=None):
+        self.cache = cache
+        super(ResumeCacheCollectionWriter, self).__init__()
+
+    @classmethod
+    def from_cache(cls, cache):
+        try:
+            state = cache.load()
+            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+            writer = cls.from_state(state)
+        except (TypeError, ValueError,
+                arvados.errors.StaleWriterStateError) as error:
+            return cls(cache)
+        else:
+            writer.cache = cache
+            return writer
+
+    def checkpoint_state(self):
+        if self.cache is None:
+            return
+        state = self.dump_state()
+        # Transform attributes for serialization.
+        for attr, value in state.items():
+            if attr == '_data_buffer':
+                state[attr] = base64.encodestring(''.join(value))
+            elif hasattr(value, 'popleft'):
+                state[attr] = list(value)
+        self.cache.save(state)
+
+
 class CollectionWriterWithProgress(arvados.CollectionWriter):
     def flush_data(self, *args, **kwargs):
         if not getattr(self, 'display_type', None):