import hashlib
import json
import os
+import signal
import sys
import tempfile
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
def parse_arguments(arguments):
parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.')
total data size).
""")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
args = parser.parse_args(arguments)
if len(args.paths) == 0:
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
- def __init__(self, file_spec):
+ @classmethod
+ def setup_user_cache(cls):
try:
- self.cache_file = open(file_spec, 'a+')
- except TypeError:
- file_spec = self.make_path(file_spec)
- self.cache_file = open(file_spec, 'a+')
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ os.chmod(cls.CACHE_DIR, 0o700)
+
+ def __init__(self, file_spec):
+ self.cache_file = open(file_spec, 'a+')
self._lock_file(self.cache_file)
self.filename = self.cache_file.name
md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in args.paths)
- md5.update(''.join(realpaths))
+ md5.update('\0'.join(realpaths))
if any(os.path.isdir(path) for path in realpaths):
md5.update(str(max(args.max_manifest_depth, -1)))
elif args.filename:
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- def __init__(self, cache=None, reporter=None, bytes_expected=None):
- self.__init_locals__(cache, reporter, bytes_expected)
- super(ArvPutCollectionWriter, self).__init__()
+ STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+ ['bytes_written', '_seen_inputs'])
- def __init_locals__(self, cache, reporter, bytes_expected):
- self.cache = cache
- self.report_func = reporter
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
self.bytes_written = 0
+ self._seen_inputs = []
+ self.cache = cache
+ self.reporter = reporter
self.bytes_expected = bytes_expected
+ super(ArvPutCollectionWriter, self).__init__()
@classmethod
def from_cache(cls, cache, reporter=None, bytes_expected=None):
try:
state = cache.load()
state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state)
+ writer = cls.from_state(state, cache, reporter, bytes_expected)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
return cls(cache, reporter, bytes_expected)
else:
- writer.__init_locals__(cache, reporter, bytes_expected)
return writer
- def checkpoint_state(self):
+ def cache_state(self):
if self.cache is None:
return
state = self.dump_state()
state[attr] = list(value)
self.cache.save(state)
+ def report_progress(self):
+ if self.reporter is not None:
+ self.reporter(self.bytes_written, self.bytes_expected)
+
def flush_data(self):
- bytes_buffered = self._data_buffer_len
+ start_buffer_len = self._data_buffer_len
+ start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
super(ArvPutCollectionWriter, self).flush_data()
- self.bytes_written += (bytes_buffered - self._data_buffer_len)
- if self.report_func is not None:
- self.report_func(self.bytes_written, self.bytes_expected)
+ if self._data_buffer_len < start_buffer_len: # We actually PUT data.
+ self.bytes_written += (start_buffer_len - self._data_buffer_len)
+ self.report_progress()
+ if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+ self.cache_state()
+
+ def _record_new_input(self, input_type, source_name, dest_name):
+ # The key needs to be a list because that's what we'll get back
+ # from JSON deserialization.
+ key = [input_type, source_name, dest_name]
+ if key in self._seen_inputs:
+ return False
+ self._seen_inputs.append(key)
+ return True
+
+ def write_file(self, source, filename=None):
+ if self._record_new_input('file', source, filename):
+ super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ if self._record_new_input('directory', path, stream_name):
+ super(ArvPutCollectionWriter, self).write_directory_tree(
+ path, stream_name, max_manifest_depth)
def expected_bytes_for(pathlist):
def human_progress(bytes_written, bytes_expected):
if bytes_expected:
- return "\r{}M / {}M {:.1f}% ".format(
+ return "\r{}M / {}M {:.1%} ".format(
bytes_written >> 20, bytes_expected >> 20,
- bytes_written / bytes_expected)
+ float(bytes_written) / bytes_expected)
else:
return "\r{} ".format(bytes_written)
outfile.write(progress_func(bytes_written, bytes_expected))
return write_progress
+def exit_signal_handler(sigcode, frame):
+ sys.exit(-sigcode)
+
def main(arguments=None):
args = parse_arguments(arguments)
reporter = progress_writer(machine_progress)
else:
reporter = None
-
- writer = ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
-
- # Copy file data to Keep.
- for path in args.paths:
+ bytes_expected = expected_bytes_for(args.paths)
+
+ try:
+ ResumeCache.setup_user_cache()
+ resume_cache = ResumeCache(ResumeCache.make_path(args))
+ except (IOError, OSError):
+ # Couldn't open cache directory/file. Continue without it.
+ resume_cache = None
+ writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+ else:
+ if not args.resume:
+ resume_cache.restart()
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, bytes_expected)
+
+ # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+ # the originals.
+ orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+ for sigcode in CAUGHT_SIGNALS}
+
+ if writer.bytes_written > 0: # We're resuming a previous upload.
+ print >>sys.stderr, "\n".join([
+ "arv-put: Resuming previous upload from last checkpoint.",
+ " Use the --no-resume option to start over."])
+ writer.report_progress()
+
+ writer.do_queued_work() # Do work resumed from cache.
+ for path in args.paths: # Copy file data to Keep.
if os.path.isdir(path):
writer.write_directory_tree(
path, max_manifest_depth=args.max_manifest_depth)
else:
writer.start_new_stream()
writer.write_file(path, args.filename or os.path.basename(path))
+ writer.finish_current_stream()
+
+ if args.progress: # Print newline to split stderr from stdout for humans.
+ print >>sys.stderr
if args.stream:
print writer.manifest_text(),
elif args.raw:
- writer.finish_current_stream()
print ','.join(writer.data_locators())
else:
# Register the resulting collection in Arvados.
- arvados.api().collections().create(
+ collection = arvados.api().collections().create(
body={
'uuid': writer.finish(),
'manifest_text': writer.manifest_text(),
).execute()
# Print the locator (uuid) of the new collection.
- print writer.finish()
+ print collection['uuid']
+
+ for sigcode, orig_handler in orig_signal_handlers.items():
+ signal.signal(sigcode, orig_handler)
+
+ if resume_cache is not None:
+ resume_cache.destroy()
if __name__ == '__main__':
main()