10315: Bring back arv-put command from 9701's branch.
[arvados.git] / sdk / python / arvados / commands / put.py
old mode 100755 (executable)
new mode 100644 (file)
index d2c5214..89753a2
@@ -20,6 +20,8 @@ import socket
 import sys
 import tempfile
 import threading
+import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -196,10 +198,6 @@ class ResumeCacheConflict(Exception):
     pass
 
 
-class FileUploadError(Exception):
-    pass
-
-
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -283,32 +281,38 @@ class ResumeCache(object):
 
 
 class ArvPutUploadJob(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
+
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                name=None, owner_uuid=None, ensure_unique_name=False,
-                num_retries=None, write_copies=None, replication=None,
-                filename=None):
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, replication_desired=None,
+                 filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
+        self.bytes_skipped = 0
         self.name = name
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
-        self._cache_hash = None # MD5 digest based on paths & filename
         self._cache_file = None
         self._collection = None
         self._collection_lock = threading.Lock()
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
-        self._update_task_time = 60.0 # How many seconds wait between update runs
+        self._update_task_time = update_time  # How many seconds wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
         # Load cached data if any and if needed
         self._setup_state()
 
@@ -316,6 +320,7 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
+        self._checkpointer.daemon = True
         self._checkpointer.start()
         try:
             for path in self.paths:
@@ -324,30 +329,34 @@ class ArvPutUploadJob(object):
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
                     self._write_directory_tree(path)
-                else: #if os.path.isfile(path):
+                else:
                     self._write_file(path, self.filename or os.path.basename(path))
-                # else:
-                #     raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-        # Successful upload, one last _update()
-        self._update()
+            # Commit all & one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
+
+    def destroy_cache(self):
         if self.resume:
-            # Delete cache file upon successful collection saving
             try:
-                os.unlink(self._cache_file.name)
+                os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
@@ -357,7 +366,7 @@ class ArvPutUploadJob(object):
         """
         size = 0
         for item in collection.values():
-            if isinstance(item, arvados.collection.Collection):
+            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
                 size += item.size()
@@ -380,7 +389,8 @@ class ArvPutUploadJob(object):
             # Update cache, if resume enabled
             if self.resume:
                 with self._state_lock:
-                    self._state['manifest'] = self._my_collection().manifest_text()
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
         if self.resume:
             self._save_state()
         # Call the reporter, if any
@@ -404,11 +414,9 @@ class ArvPutUploadJob(object):
             if os.path.isdir(os.path.join(path, item)):
                 self._write_directory_tree(os.path.join(path, item),
                                 os.path.join(stream_name, item))
-            elif os.path.isfile(os.path.join(path, item)):
+            else:
                 self._write_file(os.path.join(path, item),
                                 os.path.join(stream_name, item))
-            else:
-                raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
 
     def _write_stdin(self, filename):
         with self._collection_lock:
@@ -418,7 +426,6 @@ class ArvPutUploadJob(object):
 
     def _write_file(self, source, filename):
         resume_offset = 0
-        resume_upload = False
         if self.resume:
             # Check if file was already uploaded (at least partially)
             with self._collection_lock:
@@ -429,46 +436,53 @@ class ArvPutUploadJob(object):
                     file_in_collection = None
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
-            if source not in self._state['files'].keys():
+            if source not in self._state['files']:
                 with self._state_lock:
                     self._state['files'][source] = {
-                        'mtime' : os.path.getmtime(source),
+                        'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if os.path.getsize(source) == file_in_collection.size():
+                    if cached_file_data['size'] == file_in_collection.size():
                         # File already there, skip it.
+                        self.bytes_skipped += cached_file_data['size']
                         return
-                    elif os.path.getsize(source) > file_in_collection.size():
+                    elif cached_file_data['size'] > file_in_collection.size():
                         # File partially uploaded, resume!
-                        resume_upload = True
                         resume_offset = file_in_collection.size()
                     else:
                         # Inconsistent cache, re-upload the file
-                        pass
+                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
                 else:
                     # Local file differs from cached data, re-upload it
                     pass
         with open(source, 'r') as source_fd:
-            if self.resume and resume_upload:
+            if resume_offset > 0:
+                # Start upload where we left off
                 with self._collection_lock:
-                    # Open for appending
                     output = self._my_collection().open(filename, 'a')
                 source_fd.seek(resume_offset)
+                self.bytes_skipped += resume_offset
             else:
+                # Start from scratch
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)
             output.close()
 
     def _write(self, source_fd, output):
+        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            if not data:
+            # Allow an empty file to be written
+            if not data and not first_read:
                 break
+            if first_read:
+                first_read = False
             output.write(data)
 
     def _my_collection(self):
@@ -481,12 +495,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
@@ -498,37 +512,31 @@ class ArvPutUploadJob(object):
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             md5.update('\0'.join(realpaths))
-            self._cache_hash = md5.hexdigest()
             if self.filename:
                 md5.update(self.filename)
+            cache_filename = md5.hexdigest()
             self._cache_file = open(os.path.join(
-                arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
-                self._cache_hash), 'a+')
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
             self._cache_file.seek(0)
             with self._state_lock:
                 try:
                     self._state = json.load(self._cache_file)
-                    if not 'manifest' in self._state.keys():
-                        self._state['manifest'] = ""
-                    if not 'files' in self._state.keys():
-                        self._state['files'] = {}
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
-                    # File empty, set up new cache
-                    self._state = {
-                        'manifest' : None,
-                        # Previous run file list: {path : {size, mtime}}
-                        'files' : {}
-                    }
+                    # Cache file empty or not valid JSON, set up new cache
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load how many bytes were uploaded on previous run
             with self._collection_lock:
                 self.bytes_written = self._collection_size(self._my_collection())
         # No resume required
         else:
             with self._state_lock:
-                self._state = {
-                    'manifest' : None,
-                    'files' : {} # Previous run file list: {path : {size, mtime}}
-                }
+                self._state = copy.deepcopy(self.EMPTY_STATE)
 
     def _lock_file(self, fileobj):
         try:
@@ -544,14 +552,15 @@ class ArvPutUploadJob(object):
             with self._state_lock:
                 state = self._state
             new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_file.name))
+                dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(state, new_cache)
-            new_cache.flush()
-            os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_file.name)
+            new_cache.flush()
+            os.fsync(new_cache)
+            os.rename(new_cache_name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -586,11 +595,15 @@ class ArvPutUploadJob(object):
         through subcollections
         """
         if isinstance(item, arvados.arvfile.ArvadosFile):
-            locators = []
-            for segment in item.segments():
-                loc = segment.locator
-                locators.append(loc)
-            return locators
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
         elif isinstance(item, arvados.collection.Collection):
             l = [self._datablocks_on_item(x) for x in item.values()]
             # Fast list flattener method taken from:
@@ -607,81 +620,6 @@ class ArvPutUploadJob(object):
         return datablocks
 
 
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written', '_seen_inputs'])
-
-    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
-        self.bytes_written = 0
-        self._seen_inputs = []
-        self.cache = cache
-        self.reporter = reporter
-        self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
-    @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                   num_retries=0, replication=0):
-        try:
-            state = cache.load()
-            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                    num_retries=num_retries,
-                                    replication=replication)
-        except (TypeError, ValueError,
-                arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected,
-                       num_retries=num_retries,
-                       replication=replication)
-        else:
-            return writer
-
-    def cache_state(self):
-        if self.cache is None:
-            return
-        state = self.dump_state()
-        # Transform attributes for serialization.
-        for attr, value in state.items():
-            if attr == '_data_buffer':
-                state[attr] = base64.encodestring(''.join(value))
-            elif hasattr(value, 'popleft'):
-                state[attr] = list(value)
-        self.cache.save(state)
-
-    def report_progress(self):
-        if self.reporter is not None:
-            self.reporter(self.bytes_written, self.bytes_expected)
-
-    def flush_data(self):
-        start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
-        super(ArvPutCollectionWriter, self).flush_data()
-        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
-            self.bytes_written += (start_buffer_len - self._data_buffer_len)
-            self.report_progress()
-            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
-                self.cache_state()
-
-    def _record_new_input(self, input_type, source_name, dest_name):
-        # The key needs to be a list because that's what we'll get back
-        # from JSON deserialization.
-        key = [input_type, source_name, dest_name]
-        if key in self._seen_inputs:
-            return False
-        self._seen_inputs.append(key)
-        return True
-
-    def write_file(self, source, filename=None):
-        if self._record_new_input('file', source, filename):
-            super(ArvPutCollectionWriter, self).write_file(source, filename)
-
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        if self._record_new_input('directory', path, stream_name):
-            super(ArvPutCollectionWriter, self).write_directory_tree(
-                path, stream_name, max_manifest_depth)
-
-
 def expected_bytes_for(pathlist):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
@@ -761,19 +699,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -784,15 +709,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     bytes_expected = expected_bytes_for(args.paths)
     try:
         writer = ArvPutUploadJob(paths = args.paths,
-                                resume = args.resume,
-                                reporter = reporter,
-                                bytes_expected = bytes_expected,
-                                num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
-                                name = collection_name,
-                                owner_uuid = project_uuid,
-                                ensure_unique_name = True)
+                                 resume = args.resume,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -850,6 +775,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if status != 0:
         sys.exit(status)
 
+    # Success!
+    writer.destroy_cache()
     return output