3198: Support for truncating files. A few tests. Next step work on random access...
[arvados.git] / sdk / python / arvados / commands / put.py
index 15551fac07db8d29b6aca0da8f40b5880edd0edd..95ba17280142cac3b51db05c376bb04b62953d4d 100644 (file)
@@ -3,7 +3,6 @@
 # TODO:
 # --md5sum - display md5 of each file as read from disk
 
 # TODO:
 # --md5sum - display md5 of each file as read from disk
 
-import apiclient.errors
 import argparse
 import arvados
 import base64
 import argparse
 import arvados
 import base64
@@ -18,6 +17,7 @@ import signal
 import socket
 import sys
 import tempfile
 import socket
 import sys
 import tempfile
+from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
 
 
 import arvados.commands._util as arv_cmd
 
@@ -27,11 +27,13 @@ api_client = None
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
-                    help="""
+                         help="""
 Local file or directory. Default: read from standard input.
 """)
 
 Local file or directory. Default: read from standard input.
 """)
 
-upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
+_group = upload_opts.add_mutually_exclusive_group()
+
+_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
@@ -40,56 +42,62 @@ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
 stream per filesystem directory that contains files.
 """)
 
 stream per filesystem directory that contains files.
 """)
 
+_group.add_argument('--normalize', action='store_true',
+                    help="""
+Normalize the manifest by re-ordering files and streams after writing
+data.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
-                   help="""
+                    help="""
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
-                   help="""
+                    help="""
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
-                   help="""
+                    help="""
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
-                    dest='filename', help="""
+                         dest='filename', help="""
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
-                    help="""
+                         help="""
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
@@ -97,7 +105,7 @@ it is not a directory. Implies --manifest.
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
-                    help="""
+                         help="""
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
@@ -115,37 +123,37 @@ Save the collection with the specified name.
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
-                   help="""
+                    help="""
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
-                   help="""
+                    help="""
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
-                   help="""
+                    help="""
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
-                   help="""
+                    help="""
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
-                   help="""
+                    help="""
 Do not continue interrupted uploads from cached state.
 """)
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
 Do not continue interrupted uploads from cached state.
 """)
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
-    parents=[upload_opts, run_opts])
+    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
@@ -246,23 +254,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                    ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None,
                    ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                 api_client=None):
+                 api_client=None, num_retries=0):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(api_client)
+        super(ArvPutCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
 
     @classmethod
 
     @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+    def from_cache(cls, cache, reporter=None, bytes_expected=None,
+                   num_retries=0):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected)
+            writer = cls.from_state(state, cache, reporter, bytes_expected,
+                                    num_retries=num_retries)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected)
+            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
         else:
             return writer
 
         else:
             return writer
 
@@ -284,12 +295,12 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
-            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
@@ -348,26 +359,24 @@ def progress_writer(progress_func, outfile=sys.stderr):
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
-def desired_project_uuid(api_client, project_uuid):
-    if project_uuid:
-        if arvados.util.user_uuid_pattern.match(project_uuid):
-            api_client.users().get(uuid=project_uuid).execute()
-            return project_uuid
-        elif arvados.util.group_uuid_pattern.match(project_uuid):
-            api_client.groups().get(uuid=project_uuid).execute()
-            return project_uuid
-        else:
-            raise ValueError("Not a valid project uuid: {}".format(project_uuid))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+    if not project_uuid:
+        query = api_client.users().current()
+    elif arvados.util.user_uuid_pattern.match(project_uuid):
+        query = api_client.users().get(uuid=project_uuid)
+    elif arvados.util.group_uuid_pattern.match(project_uuid):
+        query = api_client.groups().get(uuid=project_uuid)
     else:
     else:
-        return api_client.users().current().execute()['uuid']
+        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+    return query.execute(num_retries=num_retries)['uuid']
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
-    if api_client is None:
-        api_client = arvados.api('v1')
-    status = 0
 
     args = parse_arguments(arguments)
 
     args = parse_arguments(arguments)
+    status = 0
+    if api_client is None:
+        api_client = arvados.api('v1')
 
     # Determine the name to use
     if args.name:
 
     # Determine the name to use
     if args.name:
@@ -387,8 +396,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     # Determine the parent project
     try:
 
     # Determine the parent project
     try:
-        project_uuid = desired_project_uuid(api_client, args.project_uuid)
-    except (apiclient.errors.Error, ValueError) as error:
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
+    except (apiclient_errors.Error, ValueError) as error:
         print >>stderr, error
         sys.exit(1)
 
         print >>stderr, error
         sys.exit(1)
 
@@ -413,10 +423,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             sys.exit(1)
 
     if resume_cache is None:
             sys.exit(1)
 
     if resume_cache is None:
-        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
+                                        num_retries=args.retries)
     else:
         writer = ArvPutCollectionWriter.from_cache(
     else:
         writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected)
+            resume_cache, reporter, bytes_expected, num_retries=args.retries)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -444,19 +455,24 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if args.stream:
         output = writer.manifest_text()
 
     if args.stream:
         output = writer.manifest_text()
+        if args.normalize:
+            output = CollectionReader(output).manifest_text(normalize=True)
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
+            manifest_text = writer.manifest_text()
+            if args.normalize:
+                manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
             # Register the resulting collection in Arvados.
             collection = api_client.collections().create(
                 body={
                     'owner_uuid': project_uuid,
                     'name': collection_name,
             # Register the resulting collection in Arvados.
             collection = api_client.collections().create(
                 body={
                     'owner_uuid': project_uuid,
                     'name': collection_name,
-                    'manifest_text': writer.manifest_text()
+                    'manifest_text': manifest_text
                     },
                 ensure_unique_name=True
                     },
                 ensure_unique_name=True
-                ).execute()
+                ).execute(num_retries=args.retries)
 
             print >>stderr, "Collection saved as '%s'" % collection['name']
 
 
             print >>stderr, "Collection saved as '%s'" % collection['name']
 
@@ -465,7 +481,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             else:
                 output = collection['uuid']
 
             else:
                 output = collection['uuid']
 
-        except apiclient.errors.Error as error:
+        except apiclient_errors.Error as error:
             print >>stderr, (
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             print >>stderr, (
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))