total data size).
""")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
args = parser.parse_args(arguments)
if len(args.paths) == 0:
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+ @classmethod
+ def setup_user_cache(cls):
+ # Ensure the per-user cache directory (CACHE_DIR) exists.
+ # A concurrent or earlier creation is tolerated: EEXIST is not an
+ # error; any other OSError from makedirs propagates to the caller.
+ try:
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ # chmod runs only when makedirs succeeded, so the permissions of
+ # a pre-existing directory are left untouched.
+ os.chmod(cls.CACHE_DIR, 0o700)
+
def __init__(self, file_spec):
try:
self.cache_file = open(file_spec, 'a+')
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- def __init__(self, cache=None, reporter=None, bytes_expected=None):
- self.__init_locals__(cache, reporter, bytes_expected)
- super(ArvPutCollectionWriter, self).__init__()
+ STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+ ['bytes_written'])
- def __init_locals__(self, cache, reporter, bytes_expected):
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
+ # Initialize bytes_written before invoking the base constructor:
+ # it is listed in STATE_PROPS, so it must exist by the time the
+ # ResumableCollectionWriter machinery inspects instance state.
+ self.bytes_written = 0
self.cache = cache
self.report_func = reporter
- self.bytes_written = 0
self.bytes_expected = bytes_expected
+ # Base-class setup moved to the end, after all locals are in place.
+ super(ArvPutCollectionWriter, self).__init__()
@classmethod
def from_cache(cls, cache, reporter=None, bytes_expected=None):
try:
state = cache.load()
state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state)
+ # Pass cache/reporter/bytes_expected through from_state so they
+ # are set during construction, replacing the old post-construction
+ # __init_locals__ call removed below.
+ writer = cls.from_state(state, cache, reporter, bytes_expected)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
+ # Cached state was missing, malformed, or stale: fall back to a
+ # brand-new writer bound to the same cache.
return cls(cache, reporter, bytes_expected)
else:
- writer.__init_locals__(cache, reporter, bytes_expected)
return writer
def checkpoint_state(self):
return write_progress
def main(arguments=None):
+ ResumeCache.setup_user_cache()
args = parse_arguments(arguments)
if args.progress:
else:
reporter = None
- writer = ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
+ try:
+ resume_cache = ResumeCache(args)
+ if not args.resume:
+ resume_cache.restart()
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, expected_bytes_for(args.paths))
# Copy file data to Keep.
for path in args.paths:
# Print the locator (uuid) of the new collection.
print writer.finish()
+ resume_cache.destroy()
if __name__ == '__main__':
main()