import argparse
import arvados
+import base64
import errno
import fcntl
import hashlib
self.close()
+class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
+ def __init__(self, cache=None):
+ self.cache = cache
+ super(ResumeCacheCollectionWriter, self).__init__()
+
+ @classmethod
+ def from_cache(cls, cache):
+ try:
+ state = cache.load()
+ state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+ writer = cls.from_state(state)
+ except (TypeError, ValueError,
+ arvados.errors.StaleWriterStateError) as error:
+ return cls(cache)
+ else:
+ writer.cache = cache
+ return writer
+
+ def checkpoint_state(self):
+ if self.cache is None:
+ return
+ state = self.dump_state()
+ # Transform attributes for serialization.
+ for attr, value in state.items():
+ if attr == '_data_buffer':
+ state[attr] = base64.encodestring(''.join(value))
+ elif hasattr(value, 'popleft'):
+ state[attr] = list(value)
+ self.cache.save(state)
+
+
class CollectionWriterWithProgress(arvados.CollectionWriter):
def flush_data(self, *args, **kwargs):
if not getattr(self, 'display_type', None):