import arvados
import arvados.collection
import base64
+import copy
import datetime
import errno
import fcntl
import hashlib
import json
+import logging
import os
import pwd
-import time
+import re
import signal
import socket
import sys
import tempfile
import threading
-import copy
-import logging
+import time
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
manifest.
""")
+upload_opts.add_argument('--update-collection', type=str, default=None,
+ dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. New local files are uploaded, and remote files are replaced when
+the local copy differs.
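+
+For example (substitute your own collection UUID):
+  arv-put --update-collection zzzzz-4zz18-0123456789abcde mydir/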
+""")
+
upload_opts.add_argument('--use-filename', type=str, default=None,
dest='filename', help="""
Synonym for --filename.
args.progress = True
if args.paths == ['-']:
+ if args.update_collection:
+ arg_parser.error("""
+ --update-collection cannot be used when reading from stdin.
+ """)
args.resume = False
if not args.filename:
args.filename = 'stdin'
return args
+
+class CollectionUpdateError(Exception):
+ pass
+
+
class ResumeCacheConflict(Exception):
pass
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
num_retries=None, replication_desired=None,
- filename=None, update_time=60.0):
+ filename=None, update_time=1.0, update_collection=None):
self.paths = paths
self.resume = resume
+ self.update = False
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
self._state = None # Previous run state (file list & manifest)
self._current_files = [] # Current run file list
self._cache_file = None
- self._collection = None
self._collection_lock = threading.Lock()
+ self._remote_collection = None # Collection being updated (if asked)
+ self._local_collection = None # Collection from previous run manifest
+ self._file_paths = [] # Files to be updated in remote collection
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # Seconds to wait between update runs
self.logger = logging.getLogger('arvados.arv_put')
+
# Load cached data if any and if needed
- self._setup_state()
+ self._setup_state(update_collection)
- def start(self):
+ def start(self, save_collection):
"""
Start supporting thread & file uploading
"""
if path == '-':
self._write_stdin(self.filename or 'stdin')
elif os.path.isdir(path):
- self._write_directory_tree(path)
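+            # Strip the parent directory prefix so the paths stored in the
+            # collection start at the named directory itself.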
+ if path == '.' or path == './' or os.path.dirname(path) == '':
+ dirname = ''
+ else:
+ dirname = os.path.dirname(path) + '/'
+ for root, dirs, files in os.walk(path):
+ # Make os.walk()'s dir traversing order deterministic
+ dirs.sort()
+ files.sort()
+ for f in files:
+ self._write_file(os.path.join(root, f),
+ os.path.join(root[len(dirname):], f))
else:
self._write_file(path, self.filename or os.path.basename(path))
finally:
self._checkpointer.join()
# Commit all & one last _update()
self.manifest_text()
+ if save_collection:
+ self.save_collection()
self._update()
- if self.resume:
- self._cache_file.close()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
def save_collection(self):
- with self._collection_lock:
- self._my_collection().save_new(
+ if self.update:
+ # Check if files should be updated on the remote collection.
+ for fp in self._file_paths:
+ remote_file = self._remote_collection.find(fp)
+ if not remote_file:
+                    # File doesn't exist on the remote collection, copy it.
+ self._remote_collection.copy(fp, fp, self._local_collection)
+ elif remote_file != self._local_collection.find(fp):
+                    # A different version exists on the remote collection, overwrite it.
+ self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+ else:
+                    # The file already exists on the remote collection, skip it.
+ pass
+ self._remote_collection.save(num_retries=self.num_retries)
+ else:
+ self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries)
Update cached manifest text and report progress.
"""
with self._collection_lock:
- self.bytes_written = self._collection_size(self._my_collection())
+ self.bytes_written = self._collection_size(self._local_collection)
        # Update the state cache
- if self.resume:
- with self._state_lock:
- # Get the manifest text without comitting pending blocks
- self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
- if self.resume:
+ with self._state_lock:
+            # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
self._save_state()
# Call the reporter, if any
self.report_progress()
if self.reporter is not None:
self.reporter(self.bytes_written, self.bytes_expected)
- def _write_directory_tree(self, path, stream_name="."):
- # TODO: Check what happens when multiple directories are passed as
- # arguments.
- # If the code below is uncommented, integration test
- # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
- # fails, I suppose it is because the manifest_uuid changes because
- # of the dir addition to stream_name.
-
- # if stream_name == '.':
- # stream_name = os.path.join('.', os.path.basename(path))
- for item in os.listdir(path):
- if os.path.isdir(os.path.join(path, item)):
- self._write_directory_tree(os.path.join(path, item),
- os.path.join(stream_name, item))
- else:
- self._write_file(os.path.join(path, item),
- os.path.join(stream_name, item))
-
def _write_stdin(self, filename):
- with self._collection_lock:
- output = self._my_collection().open(filename, 'w')
+ output = self._local_collection.open(filename, 'w')
self._write(sys.stdin, output)
output.close()
def _write_file(self, source, filename):
resume_offset = 0
- if self.resume:
- # Check if file was already uploaded (at least partially)
- with self._collection_lock:
- try:
- file_in_collection = self._my_collection().find(filename)
- except IOError:
- # Not found
- file_in_collection = None
+ should_upload = False
+ new_file_in_cache = False
+
+ # Record file path for updating the remote collection before exiting
+ self._file_paths.append(filename)
+
+ with self._state_lock:
# If no previous cached data on this file, store it for an eventual
# repeated run.
if source not in self._state['files']:
+ self._state['files'][source] = {
+ 'mtime': os.path.getmtime(source),
+ 'size' : os.path.getsize(source)
+ }
+ new_file_in_cache = True
+ cached_file_data = self._state['files'][source]
+
+ # Check if file was already uploaded (at least partially)
+ file_in_local_collection = self._local_collection.find(filename)
+
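+        # Decide whether (and from which offset) this file must be uploaded,
+        # comparing the cached mtime/size with any partial copy already in
+        # the local collection.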
+ # If not resuming, upload the full file.
+ if not self.resume:
+ should_upload = True
+        # New file detected since last run, upload it.
+ elif new_file_in_cache:
+ should_upload = True
+ # Local file didn't change from last run.
+ elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ if not file_in_local_collection:
+ # File not uploaded yet, upload it completely
+ should_upload = True
+ elif cached_file_data['size'] == file_in_local_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ elif cached_file_data['size'] > file_in_local_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_local_collection.size()
+ should_upload = True
+ else:
+ # Inconsistent cache, re-upload the file
+ should_upload = True
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ # Local file differs from cached data, re-upload it.
+ else:
+ should_upload = True
+
+ if should_upload:
+ with open(source, 'r') as source_fd:
with self._state_lock:
- self._state['files'][source] = {
- 'mtime': os.path.getmtime(source),
- 'size' : os.path.getsize(source)
- }
- with self._state_lock:
- cached_file_data = self._state['files'][source]
- # See if this file was already uploaded at least partially
- if file_in_collection:
- if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
- if cached_file_data['size'] == file_in_collection.size():
- # File already there, skip it.
- self.bytes_skipped += cached_file_data['size']
- return
- elif cached_file_data['size'] > file_in_collection.size():
- # File partially uploaded, resume!
- resume_offset = file_in_collection.size()
- else:
- # Inconsistent cache, re-upload the file
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
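+                # Refresh the cached stats so the next run compares against
+                # the version of the file actually being uploaded.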
+ self._state['files'][source]['mtime'] = os.path.getmtime(source)
+ self._state['files'][source]['size'] = os.path.getsize(source)
+ if resume_offset > 0:
+ # Start upload where we left off
+ output = self._local_collection.open(filename, 'a')
+ source_fd.seek(resume_offset)
+ self.bytes_skipped += resume_offset
else:
- # Local file differs from cached data, re-upload it
- pass
- with open(source, 'r') as source_fd:
- if resume_offset > 0:
- # Start upload where we left off
- with self._collection_lock:
- output = self._my_collection().open(filename, 'a')
- source_fd.seek(resume_offset)
- self.bytes_skipped += resume_offset
- else:
- # Start from scratch
- with self._collection_lock:
- output = self._my_collection().open(filename, 'w')
- self._write(source_fd, output)
- output.close()
+ # Start from scratch
+ output = self._local_collection.open(filename, 'w')
+ self._write(source_fd, output)
+ output.close(flush=False)
def _write(self, source_fd, output):
first_read = True
output.write(data)
def _my_collection(self):
- """
- Create a new collection if none cached. Load it from cache otherwise.
- """
- if self._collection is None:
- with self._state_lock:
- manifest = self._state['manifest']
- if self.resume and manifest is not None:
- # Create collection from saved state
- self._collection = arvados.collection.Collection(
- manifest,
- replication_desired=self.replication_desired)
- else:
- # Create new collection
- self._collection = arvados.collection.Collection(
- replication_desired=self.replication_desired)
- return self._collection
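+        # When updating, report state from the remote collection; otherwise
+        # use the collection built from the local cache.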
+ return self._remote_collection if self.update else self._local_collection
- def _setup_state(self):
+ def _setup_state(self, update_collection):
"""
Create a new cache file or load a previously existing one.
"""
- if self.resume:
- md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
- realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update('\0'.join(realpaths))
- if self.filename:
- md5.update(self.filename)
- cache_filename = md5.hexdigest()
- self._cache_file = open(os.path.join(
- arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
- cache_filename), 'a+')
- self._cache_filename = self._cache_file.name
- self._lock_file(self._cache_file)
- self._cache_file.seek(0)
- with self._state_lock:
- try:
- self._state = json.load(self._cache_file)
- if not set(['manifest', 'files']).issubset(set(self._state.keys())):
- # Cache at least partially incomplete, set up new cache
- self._state = copy.deepcopy(self.EMPTY_STATE)
- except ValueError:
- # Cache file empty, set up new cache
+ # Load an already existing collection for update
+ if update_collection and re.match(arvados.util.collection_uuid_pattern,
+ update_collection):
+ try:
+ self._remote_collection = arvados.collection.Collection(update_collection)
+ except arvados.errors.ApiError as error:
+ raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+ else:
+ self.update = True
+ elif update_collection:
+ # Collection locator provided, but unknown format
+ raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+ # Set up cache file name from input paths.
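+        # The name is an MD5 digest of the API host and the sorted, resolved
+        # input paths (plus --filename, if given), so identical invocations
+        # share the same cache.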
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update('\0'.join(realpaths))
+ if self.filename:
+ md5.update(self.filename)
+ cache_filename = md5.hexdigest()
+ self._cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
+ self._cache_file.seek(0)
+ with self._state_lock:
+ try:
+ self._state = json.load(self._cache_file)
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cache at least partially incomplete, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
- # Load how many bytes were uploaded on previous run
- with self._collection_lock:
- self.bytes_written = self._collection_size(self._my_collection())
- # No resume required
- else:
- with self._state_lock:
+ except ValueError:
+ # Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
+        # Load the previous run's manifest so partial uploads can be resumed
+        # and compared against the remote collection when updating.
+ self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+ # Load how many bytes were uploaded on previous run
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._local_collection)
+
def _lock_file(self, fileobj):
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
self._cache_file = new_cache
def collection_name(self):
- with self._collection_lock:
- name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
- return name
+ return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
def manifest_locator(self):
- with self._collection_lock:
- locator = self._my_collection().manifest_locator()
- return locator
+ return self._my_collection().manifest_locator()
def portable_data_hash(self):
- with self._collection_lock:
- datahash = self._my_collection().portable_data_hash()
- return datahash
+ return self._my_collection().portable_data_hash()
def manifest_text(self, stream_name=".", strip=False, normalize=False):
- with self._collection_lock:
- manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
- return manifest
+ return self._my_collection().manifest_text(stream_name, strip, normalize)
def _datablocks_on_item(self, item):
"""
reporter = None
bytes_expected = expected_bytes_for(args.paths)
+
try:
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
replication_desired = args.replication,
name = collection_name,
owner_uuid = project_uuid,
- ensure_unique_name = True)
+ ensure_unique_name = True,
+ update_collection = args.update_collection)
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
" Use --no-resume if this is really what you want."])
sys.exit(1)
+ except CollectionUpdateError as error:
+        print >>stderr, "arv-put: %s" % error
+ sys.exit(1)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if args.resume and writer.bytes_written > 0:
+ if not args.update_collection and args.resume and writer.bytes_written > 0:
print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
writer.report_progress()
output = None
- writer.start()
+ try:
+        writer.start(save_collection=not (args.stream or args.raw))
+ except arvados.errors.ApiError as error:
+        print >>stderr, "arv-put: %s" % error
+ sys.exit(1)
+
if args.progress: # Print newline to split stderr from stdout for humans.
print >>stderr
output = ','.join(writer.data_locators())
else:
try:
- writer.save_collection()
- print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+ if args.update_collection:
+ print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+ else:
+ print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
sys.exit(status)
# Success!
- writer.destroy_cache()
return output