8937: invalidate cache and create new one if there are errors on head request during...
[arvados.git] / sdk / python / arvados / commands / put.py
index cc4fd53293af59e8c1fe8bef799c1b41694e6cb5..8fa1c8f66b9dbd456d3c661884a4052ca4b239c1 100644 (file)
@@ -5,6 +5,7 @@
 
 import argparse
 import arvados
+import arvados.collection
 import base64
 import datetime
 import errno
@@ -27,11 +28,13 @@ api_client = None
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
-                    help="""
+                         help="""
 Local file or directory. Default: read from standard input.
 """)
 
-upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
+_group = upload_opts.add_mutually_exclusive_group()
+
+_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
@@ -40,56 +43,62 @@ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
 stream per filesystem directory that contains files.
 """)
 
+_group.add_argument('--normalize', action='store_true',
+                    help="""
+Normalize the manifest by re-ordering files and streams after writing
+data.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
-                   help="""
+                    help="""
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
-                   help="""
+                    help="""
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
-                   help="""
+                    help="""
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
-                    dest='filename', help="""
+                         dest='filename', help="""
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
-                    help="""
+                         help="""
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
@@ -97,15 +106,16 @@ it is not a directory. Implies --manifest.
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
-                    help="""
+                         help="""
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
 
-upload_opts.add_argument('--normalize', action='store_true',
-                    help="""
-Normalize the manifest by re-ordering files and streams after writing
-data. This makes the --max-manifest-depth option ineffective.
+upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
+                         help="""
+Set the replication level for the new collection: how many different
+physical storage devices (e.g., disks) should have a copy of each data
+block. Default is to use the server-provided default (if any) or 2.
 """)
 
 run_opts = argparse.ArgumentParser(add_help=False)
@@ -121,31 +131,31 @@ Save the collection with the specified name.
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
-                   help="""
+                    help="""
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
-                   help="""
+                    help="""
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
-                   help="""
+                    help="""
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
-                   help="""
+                    help="""
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
-                   help="""
+                    help="""
 Do not continue interrupted uploads from cached state.
 """)
 
@@ -157,7 +167,9 @@ def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
 
     if len(args.paths) == 0:
-        args.paths += ['/dev/stdin']
+        args.paths = ['-']
+
+    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
@@ -172,9 +184,9 @@ def parse_arguments(arguments):
         args.progress = True
 
     if args.paths == ['-']:
-        args.paths = ['/dev/stdin']
+        args.resume = False
         if not args.filename:
-            args.filename = '-'
+            args.filename = 'stdin'
 
     return args
 
@@ -185,10 +197,25 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
-    def __init__(self, file_spec):
+    def __init__(self, file_spec, api_client=None, num_retries=0):
         self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
+        try:
+            state = self.load()
+            locator = None
+            try:
+                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
+                    locator = state["_finished_streams"][0][1][0]
+                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
+                    locator = state["_current_stream_locators"][0]
+                if locator is not None:
+                    kc = arvados.keep.KeepClient(api_client=api_client)
+                    kc.head(locator, num_retries=num_retries)
+            except Exception as e:
+                raise arvados.errors.KeepRequestError("Head request error for {}: {}".format(locator, e))
+        except (ValueError):
+            pass
 
     @classmethod
     def make_path(cls, args):
@@ -251,27 +278,28 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                    ['bytes_written', '_seen_inputs'])
 
-    def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                 api_client=None, num_retries=0):
+    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(
-            api_client, num_retries=num_retries)
+        super(ArvPutCollectionWriter, self).__init__(**kwargs)
 
     @classmethod
     def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                   num_retries=0):
+                   num_retries=0, replication=0):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
             writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                    num_retries=num_retries)
+                                    num_retries=num_retries,
+                                    replication=replication)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
+            return cls(cache, reporter, bytes_expected,
+                       num_retries=num_retries,
+                       replication=replication)
         else:
             return writer
 
@@ -293,12 +321,12 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
-            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
@@ -400,6 +428,19 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
+    # write_copies diverges from args.replication here.
+    # args.replication is how many copies we will instruct Arvados to
+    # maintain (by passing it in collections().create()) after all
+    # data is written -- and if None was given, we'll use None there.
+    # Meanwhile, write_copies is how many copies of each data block we
+    # write to Keep, which has to be a number.
+    #
+    # If we simply changed args.replication from None to a default
+    # here, we'd end up erroneously passing the default replication
+    # level (instead of None) to collections().create().
+    write_copies = (args.replication or
+                    api_client._rootDesc.get('defaultCollectionReplication', 2))
+
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -411,9 +452,14 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     resume_cache = None
     if args.resume:
         try:
-            resume_cache = ResumeCache(ResumeCache.make_path(args))
+            cachepath = ResumeCache.make_path(args)
+            resume_cache = ResumeCache(cachepath, api_client=api_client, num_retries=args.retries)
         except (IOError, OSError, ValueError):
             pass  # Couldn't open cache directory/file.  Continue without it.
+        except arvados.errors.KeepRequestError:
+            # delete the cache and create a new one
+            shutil.rmtree(cachepath)
+            resume_cache = ResumeCache(cachepath)
         except ResumeCacheConflict:
             print >>stderr, "\n".join([
                 "arv-put: Another process is already uploading this data.",
@@ -421,11 +467,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             sys.exit(1)
 
     if resume_cache is None:
-        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
-                                        num_retries=args.retries)
+        writer = ArvPutCollectionWriter(
+            resume_cache, reporter, bytes_expected,
+            num_retries=args.retries,
+            replication=write_copies)
     else:
         writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected, num_retries=args.retries)
+            resume_cache, reporter, bytes_expected,
+            num_retries=args.retries,
+            replication=write_copies)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -440,7 +490,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     writer.report_progress()
     writer.do_queued_work()  # Do work resumed from cache.
     for path in args.paths:  # Copy file data to Keep.
-        if os.path.isdir(path):
+        if path == '-':
+            writer.start_new_stream()
+            writer.start_new_file(args.filename)
+            r = sys.stdin.read(64*1024)
+            while r:
+                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
+                # CollectionWriter.write().
+                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
+                r = sys.stdin.read(64*1024)
+        elif os.path.isdir(path):
             writer.write_directory_tree(
                 path, max_manifest_depth=args.max_manifest_depth)
         else:
@@ -454,20 +513,25 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if args.stream:
         output = writer.manifest_text()
         if args.normalize:
-            output = CollectionReader(output).manifest_text(normalize=True)
+            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
             manifest_text = writer.manifest_text()
             if args.normalize:
-                manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
+                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
+            replication_attr = 'replication_desired'
+            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+                # API called it 'redundancy' before #3410.
+                replication_attr = 'redundancy'
             # Register the resulting collection in Arvados.
             collection = api_client.collections().create(
                 body={
                     'owner_uuid': project_uuid,
                     'name': collection_name,
-                    'manifest_text': manifest_text
+                    'manifest_text': manifest_text,
+                    replication_attr: args.replication,
                     },
                 ensure_unique_name=True
                 ).execute(num_retries=args.retries)