-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
+from __future__ import division
+from future.utils import listitems, listvalues
+from builtins import str
+from builtins import object
import argparse
import arvados
import arvados.collection
if len(args.paths) == 0:
args.paths = ['-']
- args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
+ args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
if args.filename:
@classmethod
def make_path(cls, args):
md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
realpaths = sorted(os.path.realpath(path) for path in args.paths)
- md5.update('\0'.join(realpaths))
+ md5.update(b'\0'.join([p.encode() for p in realpaths]))
if any(os.path.isdir(path) for path in realpaths):
- md5.update("-1")
+ md5.update(b'-1')
elif args.filename:
- md5.update(args.filename)
+ md5.update(args.filename.encode())
return os.path.join(
arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
md5.hexdigest())
# we have a custom signal handler in place that raises SystemExit with
- # the catched signal's code.
+ # the caught signal's code.
if not isinstance(e, SystemExit) or e.code != -2:
- self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
+ self.logger.warning("Abnormal termination:\n{}".format(
+ traceback.format_exc()))
raise
finally:
if not self.dry_run:
Recursively get the total size of the collection
"""
size = 0
- for item in collection.values():
+ for item in listvalues(collection):
if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
size += self._collection_size(item)
else:
with self._state_lock:
self._state['manifest'] = manifest
if self.use_cache:
- self._save_state()
+ try:
+ self._save_state()
+ except Exception as e:
+ self.logger.error("Unexpected error trying to save cache file: {}".format(e))
else:
self.bytes_written = self.bytes_skipped
# Call the reporter, if any
self.reporter(self.bytes_written, self.bytes_expected)
def _write_stdin(self, filename):
+ """Write everything read from standard input to `filename` in the
+ local collection.
+
+ The output is opened in binary mode, so the data is read from the
+ binary layer of stdin when there is one (Python 3's `sys.stdin.buffer`).
+ Python 2's stdin has no `buffer` attribute and already yields bytes,
+ so it is used as-is.
+ """
- output = self._local_collection.open(filename, 'w')
- self._write(sys.stdin, output)
+ output = self._local_collection.open(filename, 'wb')
+ self._write(getattr(sys.stdin, 'buffer', sys.stdin), output)
output.close()
def _upload_files(self):
for source, resume_offset, filename in self._files_to_upload:
- with open(source, 'r') as source_fd:
+ with open(source, 'rb') as source_fd:
with self._state_lock:
self._state['files'][source]['mtime'] = os.path.getmtime(source)
self._state['files'][source]['size'] = os.path.getsize(source)
if resume_offset > 0:
# Start upload where we left off
- output = self._local_collection.open(filename, 'a')
+ output = self._local_collection.open(filename, 'ab')
source_fd.seek(resume_offset)
else:
# Start from scratch
- output = self._local_collection.open(filename, 'w')
+ output = self._local_collection.open(filename, 'wb')
self._write(source_fd, output)
output.close(flush=False)
if self.use_cache:
# Set up cache file name from input paths.
md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update('\0'.join(realpaths))
+ md5.update(b'\0'.join([p.encode() for p in realpaths]))
if self.filename:
- md5.update(self.filename)
+ md5.update(self.filename.encode())
cache_filename = md5.hexdigest()
cache_filepath = os.path.join(
arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
cache_filename)
- if self.resume:
+ if self.resume and os.path.exists(cache_filepath):
+ self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
- # --no-resume means start with a empty cache file.
+ # --no-resume, or no previous cache file to resume from: start with
+ # an empty cache file.
+ self.logger.info("Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
# Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
else:
+ self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
file_paths = []
- for name, item in col.items():
+ for name, item in listitems(col):
if isinstance(item, arvados.arvfile.ArvadosFile):
file_paths.append(os.path.join(path_prefix, name))
elif isinstance(item, arvados.collection.Subcollection):
"""
Atomically save current state into cache.
"""
+ with self._state_lock:
+ # We're not using copy.deepcopy() here because it's a lot slower
+ # than json.dumps(), and the state has to be serialized as JSON
+ # anyway before it is saved on disk.
+ state = json.dumps(self._state)
try:
- with self._state_lock:
- # We're not using copy.deepcopy() here because it's a lot slower
- # than json.dumps(), and we're already needing JSON format to be
- # saved on disk.
- state = json.dumps(self._state)
- new_cache_fd, new_cache_name = tempfile.mkstemp(
- dir=os.path.dirname(self._cache_filename))
- self._lock_file(new_cache_fd)
- new_cache = os.fdopen(new_cache_fd, 'r+')
+ new_cache = tempfile.NamedTemporaryFile(
+ mode='w+',
+ dir=os.path.dirname(self._cache_filename), delete=False)
+ self._lock_file(new_cache)
new_cache.write(state)
new_cache.flush()
os.fsync(new_cache)
- os.rename(new_cache_name, self._cache_filename)
+ os.rename(new_cache.name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
self.logger.error("There was a problem while saving the cache file: {}".format(error))
try:
def portable_data_hash(self):
pdh = self._my_collection().portable_data_hash()
- m = self._my_collection().stripped_manifest()
- local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
+ m = self._my_collection().stripped_manifest().encode()
+ local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
if pdh != local_pdh:
logger.warning("\n".join([
"arv-put: API server provided PDH differs from local manifest.",
locators.append(loc)
return locators
elif isinstance(item, arvados.collection.Collection):
- l = [self._datablocks_on_item(x) for x in item.values()]
+ l = [self._datablocks_on_item(x) for x in listvalues(item)]
# Fast list flattener method taken from:
# http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
return [loc for sublist in l for loc in sublist]
if not output.endswith('\n'):
stdout.write('\n')
- for sigcode, orig_handler in orig_signal_handlers.items():
+ for sigcode, orig_handler in listitems(orig_signal_handlers):
signal.signal(sigcode, orig_handler)
if status != 0: