import sys
import tempfile
import threading
+import copy
+import logging
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
pass
-class FileUploadError(Exception):
- pass
-
-
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
class ArvPutUploadJob(object):
+ CACHE_DIR = '.cache/arvados/arv-put'
+ EMPTY_STATE = {
+ 'manifest': None, # Last saved manifest checkpoint
+ 'files': {} # Previous run file list: {path: {size, mtime}}
+ }
+
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
- name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, write_copies=None, replication=None,
- filename=None):
+ name=None, owner_uuid=None, ensure_unique_name=False,
+ num_retries=None, replication_desired=None,
+ filename=None, update_time=60.0):
self.paths = paths
self.resume = resume
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
+ self.bytes_skipped = 0
self.name = name
self.owner_uuid = owner_uuid
self.ensure_unique_name = ensure_unique_name
self.num_retries = num_retries
- self.write_copies = write_copies
- self.replication = replication
+ self.replication_desired = replication_desired
self.filename = filename
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
self._current_files = [] # Current run file list
- self._cache_hash = None # MD5 digest based on paths & filename
self._cache_file = None
self._collection = None
self._collection_lock = threading.Lock()
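+ # Checkpointer thread: wakes up every _update_task_time seconds to
+ # save the current state & manifest so an interrupted upload can be
+ # resumed later.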
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
- self._update_task_time = 60.0 # How many seconds wait between update runs
+ self._update_task_time = update_time # Seconds to wait between update runs
+ self.logger = logging.getLogger('arvados.arv_put')
# Load cached data if any and if needed
self._setup_state()
"""
Start the checkpointer thread and upload the given paths.
"""
+ self._checkpointer.daemon = True
self._checkpointer.start()
try:
for path in self.paths:
if path == '-':
    self._write_stdin(self.filename or 'stdin')
elif os.path.isdir(path):
self._write_directory_tree(path)
- else: #if os.path.isfile(path):
+ else:
self._write_file(path, self.filename or os.path.basename(path))
- # else:
- # raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
finally:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._checkpointer.join()
- # Successful upload, one last _update()
- self._update()
+ # Commit all pending blocks & run one final _update()
+ self.manifest_text()
+ self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
def save_collection(self):
with self._collection_lock:
self._my_collection().save_new(
- name=self.name, owner_uuid=self.owner_uuid,
- ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries,
- replication_desired=self.replication)
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
+
+ def destroy_cache(self):
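+ # Remove the resume cache; intended to be called only after the
+ # collection has been saved successfully (see the "Success!" step below).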
if self.resume:
- # Delete cache file upon successful collection saving
try:
- os.unlink(self._cache_file.name)
+ os.unlink(self._cache_filename)
except OSError as error:
- if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
raise
self._cache_file.close()
"""
size = 0
for item in collection.values():
- if isinstance(item, arvados.collection.Collection):
+ if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
size += self._collection_size(item)
else:
size += item.size()
# Update cache, if resume enabled
if self.resume:
with self._state_lock:
- self._state['manifest'] = self._my_collection().manifest_text()
+ # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
if self.resume:
self._save_state()
# Call the reporter, if any
if os.path.isdir(os.path.join(path, item)):
self._write_directory_tree(os.path.join(path, item),
os.path.join(stream_name, item))
- elif os.path.isfile(os.path.join(path, item)):
+ else:
self._write_file(os.path.join(path, item),
os.path.join(stream_name, item))
- else:
- raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
def _write_stdin(self, filename):
with self._collection_lock:
def _write_file(self, source, filename):
resume_offset = 0
- resume_upload = False
if self.resume:
# Check if file was already uploaded (at least partially)
with self._collection_lock:
file_in_collection = None
# If there's no cached data for this file yet, store it so a
# future resumed run can find it.
- if source not in self._state['files'].keys():
+ if source not in self._state['files']:
with self._state_lock:
self._state['files'][source] = {
- 'mtime' : os.path.getmtime(source),
+ 'mtime': os.path.getmtime(source),
'size': os.path.getsize(source)
}
- cached_file_data = self._state['files'][source]
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
# See if this file was already uploaded at least partially
if file_in_collection:
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
- if os.path.getsize(source) == file_in_collection.size():
+ if cached_file_data['size'] == file_in_collection.size():
# File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
return
- elif os.path.getsize(source) > file_in_collection.size():
+ elif cached_file_data['size'] > file_in_collection.size():
# File partially uploaded, resume!
- resume_upload = True
resume_offset = file_in_collection.size()
else:
# Inconsistent cache, re-upload the file
- pass
+ self.logger.warning("Uploaded version of file '{}' is bigger than the local version; re-uploading it from scratch.".format(source))
else:
# Local file differs from cached data, re-upload it
pass
with open(source, 'r') as source_fd:
- if self.resume and resume_upload:
+ if resume_offset > 0:
+ # Start upload where we left off
with self._collection_lock:
- # Open for appending
output = self._my_collection().open(filename, 'a')
source_fd.seek(resume_offset)
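+ # Count the resumed portion as skipped so the final bytes_written
+ # figure only includes data written during this run.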
+ self.bytes_skipped += resume_offset
else:
+ # Start from scratch
with self._collection_lock:
output = self._my_collection().open(filename, 'w')
self._write(source_fd, output)
output.close()
def _write(self, source_fd, output):
+ first_read = True
while True:
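+ # Read the input in Keep block sized chunks.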
data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
- if not data:
+ # Allow an empty file to be written
+ if not data and not first_read:
break
+ first_read = False
output.write(data)
def _my_collection(self):
if self.resume and manifest is not None:
# Create collection from saved state
self._collection = arvados.collection.Collection(
- manifest,
- num_write_copies=self.write_copies)
+ manifest,
+ replication_desired=self.replication_desired)
else:
# Create new collection
self._collection = arvados.collection.Collection(
- num_write_copies=self.write_copies)
+ replication_desired=self.replication_desired)
return self._collection
def _setup_state(self):
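+ # The cache file name is derived from an MD5 digest of the API host,
+ # the upload paths and the target filename, so each distinct upload
+ # gets its own resume cache.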
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in self.paths)
md5.update('\0'.join(realpaths))
- self._cache_hash = md5.hexdigest()
if self.filename:
md5.update(self.filename)
+ cache_filename = md5.hexdigest()
self._cache_file = open(os.path.join(
- arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
- self._cache_hash), 'a+')
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
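+ # Lock the cache file so two arv-put processes can't upload the same
+ # data at once; a held lock surfaces as ResumeCacheConflict.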
+ self._lock_file(self._cache_file)
self._cache_file.seek(0)
with self._state_lock:
try:
self._state = json.load(self._cache_file)
- if not 'manifest' in self._state.keys():
- self._state['manifest'] = ""
- if not 'files' in self._state.keys():
- self._state['files'] = {}
+ if not set(['manifest', 'files']).issubset(self._state.keys()):
+ # Cache is missing required keys, set up a fresh one
+ self._state = copy.deepcopy(self.EMPTY_STATE)
except ValueError:
- # File empty, set up new cache
- self._state = {
- 'manifest' : None,
- # Previous run file list: {path : {size, mtime}}
- 'files' : {}
- }
+ # Cache file empty, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
# Load how many bytes were uploaded on previous run
with self._collection_lock:
self.bytes_written = self._collection_size(self._my_collection())
# No resume required
else:
with self._state_lock:
- self._state = {
- 'manifest' : None,
- 'files' : {} # Previous run file list: {path : {size, mtime}}
- }
+ self._state = copy.deepcopy(self.EMPTY_STATE)
def _lock_file(self, fileobj):
try:
with self._state_lock:
state = self._state
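+ # Dump the state to a temp file in the cache directory, then atomically
+ # rename() it over the old cache so a crash can't leave a half-written
+ # state file behind.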
new_cache_fd, new_cache_name = tempfile.mkstemp(
- dir=os.path.dirname(self._cache_file.name))
+ dir=os.path.dirname(self._cache_filename))
self._lock_file(new_cache_fd)
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(state, new_cache)
- # new_cache.flush()
- # os.fsync(new_cache)
- os.rename(new_cache_name, self._cache_file.name)
+ new_cache.flush()
+ os.fsync(new_cache)
+ os.rename(new_cache_name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
+ self.logger.error("There was a problem while saving the cache file: {}".format(error))
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
through subcollections
"""
if isinstance(item, arvados.arvfile.ArvadosFile):
- locators = []
- for segment in item.segments():
- loc = segment.locator
- locators.append(loc)
- return locators
+ if item.size() == 0:
+ # Return the canonical empty-block locator (MD5 of zero bytes)
+ return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+ else:
+ return [segment.locator for segment in item.segments()]
elif isinstance(item, arvados.collection.Collection):
l = [self._datablocks_on_item(x) for x in item.values()]
# Fast list flattener method taken from:
return datablocks
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
- ['bytes_written', '_seen_inputs'])
-
- def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
- self.bytes_written = 0
- self._seen_inputs = []
- self.cache = cache
- self.reporter = reporter
- self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
- @classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None,
- num_retries=0, replication=0):
- try:
- state = cache.load()
- state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- except (TypeError, ValueError,
- arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- else:
- return writer
-
- def cache_state(self):
- if self.cache is None:
- return
- state = self.dump_state()
- # Transform attributes for serialization.
- for attr, value in state.items():
- if attr == '_data_buffer':
- state[attr] = base64.encodestring(''.join(value))
- elif hasattr(value, 'popleft'):
- state[attr] = list(value)
- self.cache.save(state)
-
- def report_progress(self):
- if self.reporter is not None:
- self.reporter(self.bytes_written, self.bytes_expected)
-
- def flush_data(self):
- start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
- super(ArvPutCollectionWriter, self).flush_data()
- if self._data_buffer_len < start_buffer_len: # We actually PUT data.
- self.bytes_written += (start_buffer_len - self._data_buffer_len)
- self.report_progress()
- if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
- self.cache_state()
-
- def _record_new_input(self, input_type, source_name, dest_name):
- # The key needs to be a list because that's what we'll get back
- # from JSON deserialization.
- key = [input_type, source_name, dest_name]
- if key in self._seen_inputs:
- return False
- self._seen_inputs.append(key)
- return True
-
- def write_file(self, source, filename=None):
- if self._record_new_input('file', source, filename):
- super(ArvPutCollectionWriter, self).write_file(source, filename)
-
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- if self._record_new_input('directory', path, stream_name):
- super(ArvPutCollectionWriter, self).write_directory_tree(
- path, stream_name, max_manifest_depth)
-
-
def expected_bytes_for(pathlist):
# Walk the given directory trees and stat files, adding up file sizes,
# so we can display progress as percent
print >>stderr, error
sys.exit(1)
- # write_copies diverges from args.replication here.
- # args.replication is how many copies we will instruct Arvados to
- # maintain (by passing it in collections().create()) after all
- # data is written -- and if None was given, we'll use None there.
- # Meanwhile, write_copies is how many copies of each data block we
- # write to Keep, which has to be a number.
- #
- # If we simply changed args.replication from None to a default
- # here, we'd end up erroneously passing the default replication
- # level (instead of None) to collections().create().
- write_copies = (args.replication or
- api_client._rootDesc.get('defaultCollectionReplication', 2))
-
if args.progress:
reporter = progress_writer(human_progress)
elif args.batch_progress:
bytes_expected = expected_bytes_for(args.paths)
try:
writer = ArvPutUploadJob(paths = args.paths,
- resume = args.resume,
- reporter = reporter,
- bytes_expected = bytes_expected,
- num_retries = args.retries,
- write_copies = write_copies,
- replication = args.replication,
- name = collection_name,
- owner_uuid = project_uuid,
- ensure_unique_name = True)
+ resume = args.resume,
+ filename = args.filename,
+ reporter = reporter,
+ bytes_expected = bytes_expected,
+ num_retries = args.retries,
+ replication_desired = args.replication,
+ name = collection_name,
+ owner_uuid = project_uuid,
+ ensure_unique_name = True)
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
if status != 0:
sys.exit(status)
+ # Success!
+ writer.destroy_cache()
return output