10315: Bring back arv-put command from 9701's branch.
[arvados.git] / sdk / python / arvados / commands / put.py
index 31cb0cbae4c9e7566ed7993ec42acbfd7cca472f..89753a22863808c3fb89686cbf9948dae8760171 100644 (file)
@@ -20,6 +20,8 @@ import socket
 import sys
 import tempfile
 import threading
+import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -280,10 +282,14 @@ class ResumeCache(object):
 
 class ArvPutUploadJob(object):
     CACHE_DIR = '.cache/arvados/arv-put'
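+    # Template for the resume cache state; always deep-copied before use so this class attribute is never mutated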
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, write_copies=None, replication=None,
+                 num_retries=None, replication_desired=None,
                  filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
@@ -295,8 +301,7 @@ class ArvPutUploadJob(object):
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -307,6 +312,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds to wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
         # Load cached data if any and if needed
         self._setup_state()
 
@@ -314,6 +320,7 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
+        self._checkpointer.daemon = True
         self._checkpointer.start()
         try:
             for path in self.paths:
@@ -328,27 +335,28 @@ class ArvPutUploadJob(object):
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-        # Successful upload, one last _update()
-        self._update()
-        if self.resume:
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Commit all pending blocks & do one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
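+            # replication_desired is already set on the Collection object (see _my_collection)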
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # ENOENT means the cache file is already gone, which is what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
@@ -381,7 +389,8 @@ class ArvPutUploadJob(object):
             # Update cache, if resume enabled
             if self.resume:
                 with self._state_lock:
-                    self._state['manifest'] = self._my_collection().manifest_text()
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
         if self.resume:
             self._save_state()
         # Call the reporter, if any
@@ -417,7 +426,6 @@ class ArvPutUploadJob(object):
 
     def _write_file(self, source, filename):
         resume_offset = 0
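+        # resume_offset > 0 means part of this file is already in the collection and can be appended to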
-        resume_upload = False
         if self.resume:
             # Check if file was already uploaded (at least partially)
             with self._collection_lock:
@@ -428,48 +436,53 @@ class ArvPutUploadJob(object):
                     file_in_collection = None
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
-            if source not in self._state['files'].keys():
+            if source not in self._state['files']:
                 with self._state_lock:
                     self._state['files'][source] = {
-                        'mtime' : os.path.getmtime(source),
+                        'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
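+            # Read the cached file data while holding the state lock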
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if os.path.getsize(source) == file_in_collection.size():
+                    if cached_file_data['size'] == file_in_collection.size():
                         # File already there, skip it.
-                        self.bytes_skipped += os.path.getsize(source)
+                        self.bytes_skipped += cached_file_data['size']
                         return
-                    elif os.path.getsize(source) > file_in_collection.size():
+                    elif cached_file_data['size'] > file_in_collection.size():
                         # File partially uploaded, resume!
-                        resume_upload = True
                         resume_offset = file_in_collection.size()
                     else:
                         # Inconsistent cache, re-upload the file
-                        pass
+                        self.logger.warning("Uploaded version of file '{}' is bigger than the local version, will re-upload it from scratch.".format(source))
                 else:
                     # Local file differs from cached data, re-upload it
                     pass
         with open(source, 'r') as source_fd:
-            if self.resume and resume_upload:
+            if resume_offset > 0:
+                # Start upload where we left off
                 with self._collection_lock:
-                    # Open for appending
                     output = self._my_collection().open(filename, 'a')
                 source_fd.seek(resume_offset)
                 self.bytes_skipped += resume_offset
             else:
+                # Start from scratch
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)
             output.close()
 
     def _write(self, source_fd, output):
+        first_read = True
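+        # Track whether anything has been read yet, so a zero-length file still gets one write()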
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            if not data:
+            # Allow an empty file to be written
+            if not data and not first_read:
                 break
+            if first_read:
+                first_read = False
             output.write(data)
 
     def _my_collection(self):
@@ -482,12 +495,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
@@ -511,27 +524,19 @@ class ArvPutUploadJob(object):
             with self._state_lock:
                 try:
                     self._state = json.load(self._cache_file)
-                    if not 'manifest' in self._state.keys():
-                        self._state['manifest'] = ""
-                    if not 'files' in self._state.keys():
-                        self._state['files'] = {}
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache is missing required keys, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
-                    # File empty, set up new cache
-                    self._state = {
-                        'manifest' : None,
-                        # Previous run file list: {path : {size, mtime}}
-                        'files' : {}
-                    }
+                    # Cache file empty, set up new cache
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load how many bytes were uploaded on previous run
             with self._collection_lock:
                 self.bytes_written = self._collection_size(self._my_collection())
         # No resume required
         else:
             with self._state_lock:
-                self._state = {
-                    'manifest' : None,
-                    'files' : {} # Previous run file list: {path : {size, mtime}}
-                }
+                self._state = copy.deepcopy(self.EMPTY_STATE)
 
     def _lock_file(self, fileobj):
         try:
@@ -555,6 +560,7 @@ class ArvPutUploadJob(object):
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -589,11 +595,15 @@ class ArvPutUploadJob(object):
         through subcollections
         """
         if isinstance(item, arvados.arvfile.ArvadosFile):
-            locators = []
-            for segment in item.segments():
-                loc = segment.locator
-                locators.append(loc)
-            return locators
+            if item.size() == 0:
+                # Empty file locator
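+                # ("d41d8cd98f00b204e9800998ecf8427e" is the MD5 of the empty string)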
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
         elif isinstance(item, arvados.collection.Collection):
             l = [self._datablocks_on_item(x) for x in item.values()]
             # Fast list flattener method taken from:
@@ -689,19 +699,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -712,15 +709,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     bytes_expected = expected_bytes_for(args.paths)
     try:
         writer = ArvPutUploadJob(paths = args.paths,
-                                resume = args.resume,
-                                reporter = reporter,
-                                bytes_expected = bytes_expected,
-                                num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
-                                name = collection_name,
-                                owner_uuid = project_uuid,
-                                ensure_unique_name = True)
+                                 resume = args.resume,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -777,9 +774,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
-    else:
-        writer.destroy_cache()
 
+    # Success!
+    writer.destroy_cache()
     return output