import argparse
import arvados
+import base64
+import errno
+import fcntl
import hashlib
+import json
import os
import sys
+import tempfile
def parse_arguments(arguments):
parser = argparse.ArgumentParser(
total data size).
""")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
args = parser.parse_args(arguments)
if len(args.paths) == 0:
return args
+class ResumeCacheConflict(Exception):
+ pass
+
+
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+ @classmethod
+ def setup_user_cache(cls):
+ try:
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ os.chmod(cls.CACHE_DIR, 0o700)
+
+ def __init__(self, file_spec):
+ try:
+ self.cache_file = open(file_spec, 'a+')
+ except TypeError:
+ file_spec = self.make_path(file_spec)
+ self.cache_file = open(file_spec, 'a+')
+ self._lock_file(self.cache_file)
+ self.filename = self.cache_file.name
+
@classmethod
def make_path(cls, args):
md5 = hashlib.md5()
md5.update(args.filename)
return os.path.join(cls.CACHE_DIR, md5.hexdigest())
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
- def flush_data(self, *args, **kwargs):
- if not getattr(self, 'display_type', None):
- return
- if not hasattr(self, 'bytes_flushed'):
- self.bytes_flushed = 0
- self.bytes_flushed += self._data_buffer_len
- super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
- self.bytes_flushed -= self._data_buffer_len
- if self.display_type == 'machine':
- sys.stderr.write('%s %d: %d written %d total\n' %
- (sys.argv[0],
- os.getpid(),
- self.bytes_flushed,
- getattr(self, 'bytes_expected', -1)))
- elif getattr(self, 'bytes_expected', 0) > 0:
- pct = 100.0 * self.bytes_flushed / self.bytes_expected
- sys.stderr.write('\r%dM / %dM %.1f%% ' %
- (self.bytes_flushed >> 20,
- self.bytes_expected >> 20, pct))
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+ def load(self):
+ self.cache_file.seek(0)
+ return json.load(self.cache_file)
+
+ def save(self, data):
+ try:
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self.filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(data, new_cache)
+ os.rename(new_cache_name, self.filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
else:
- sys.stderr.write('\r%d ' % self.bytes_flushed)
+ self.cache_file.close()
+ self.cache_file = new_cache
- def manifest_text(self, *args, **kwargs):
- manifest_text = (super(CollectionWriterWithProgress, self)
- .manifest_text(*args, **kwargs))
- if getattr(self, 'display_type', None):
- if self.display_type == 'human':
- sys.stderr.write('\n')
- self.display_type = None
- return manifest_text
+ def close(self):
+ self.cache_file.close()
+ def destroy(self):
+ try:
+ os.unlink(self.filename)
+ except OSError as error:
+ if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ raise
+ self.close()
-def main(arguments=None):
- args = parse_arguments(arguments)
+ def restart(self):
+ self.destroy()
+ self.__init__(self.filename)
- if args.progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'human'
- elif args.batch_progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'machine'
- else:
- writer = arvados.CollectionWriter()
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
+ self.__init_locals__(cache, reporter, bytes_expected)
+ super(ArvPutCollectionWriter, self).__init__()
+
+ def __init_locals__(self, cache, reporter, bytes_expected):
+ self.cache = cache
+ self.report_func = reporter
+ self.bytes_written = 0
+ self.bytes_expected = bytes_expected
+
+ @classmethod
+ def from_cache(cls, cache, reporter=None, bytes_expected=None):
+ try:
+ state = cache.load()
+ state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+ writer = cls.from_state(state)
+ except (TypeError, ValueError,
+ arvados.errors.StaleWriterStateError) as error:
+ return cls(cache, reporter, bytes_expected)
+ else:
+ writer.__init_locals__(cache, reporter, bytes_expected)
+ return writer
+
+ def checkpoint_state(self):
+ if self.cache is None:
+ return
+ state = self.dump_state()
+ # Transform attributes for serialization.
+ for attr, value in state.items():
+ if attr == '_data_buffer':
+ state[attr] = base64.encodestring(''.join(value))
+ elif hasattr(value, 'popleft'):
+ state[attr] = list(value)
+ self.cache.save(state)
+
+ def flush_data(self):
+ bytes_buffered = self._data_buffer_len
+ super(ArvPutCollectionWriter, self).flush_data()
+ self.bytes_written += (bytes_buffered - self._data_buffer_len)
+ if self.report_func is not None:
+ self.report_func(self.bytes_written, self.bytes_expected)
+
+
+def expected_bytes_for(pathlist):
# Walk the given directory trees and stat files, adding up file sizes,
# so we can display progress as percent
- writer.bytes_expected = 0
- for path in args.paths:
+ bytesum = 0
+ for path in pathlist:
if os.path.isdir(path):
for filename in arvados.util.listdir_recursive(path):
- writer.bytes_expected += os.path.getsize(
- os.path.join(path, filename))
+ bytesum += os.path.getsize(os.path.join(path, filename))
elif not os.path.isfile(path):
- del writer.bytes_expected
- break
+ return None
else:
- writer.bytes_expected += os.path.getsize(path)
+ bytesum += os.path.getsize(path)
+ return bytesum
+
+_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
+ os.getpid())
+def machine_progress(bytes_written, bytes_expected):
+ return _machine_format.format(
+ bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(bytes_written, bytes_expected):
+ if bytes_expected:
+ return "\r{}M / {}M {:.1f}% ".format(
+ bytes_written >> 20, bytes_expected >> 20,
+ bytes_written / bytes_expected)
+ else:
+ return "\r{} ".format(bytes_written)
+
+def progress_writer(progress_func, outfile=sys.stderr):
+ def write_progress(bytes_written, bytes_expected):
+ outfile.write(progress_func(bytes_written, bytes_expected))
+ return write_progress
+
+def main(arguments=None):
+ ResumeCache.setup_user_cache()
+ args = parse_arguments(arguments)
+
+ if args.progress:
+ reporter = progress_writer(human_progress)
+ elif args.batch_progress:
+ reporter = progress_writer(machine_progress)
+ else:
+ reporter = None
+
+ try:
+ resume_cache = ResumeCache(args)
+ if not args.resume:
+ resume_cache.restart()
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, expected_bytes_for(args.paths))
# Copy file data to Keep.
for path in args.paths:
# Print the locator (uuid) of the new collection.
print writer.finish()
+ resume_cache.destroy()
if __name__ == '__main__':
main()