3198: Support for truncating files. A few tests. Next step work on random access...
[arvados.git] / sdk / python / arvados / commands / put.py
index 6abaf770871431c0d9cf06f88e58da91a93f9b23..95ba17280142cac3b51db05c376bb04b62953d4d 100644 (file)
@@ -3,7 +3,6 @@
 # TODO:
 # --md5sum - display md5 of each file as read from disk
 
 # TODO:
 # --md5sum - display md5 of each file as read from disk
 
-import apiclient.errors
 import argparse
 import arvados
 import base64
 import argparse
 import arvados
 import base64
@@ -18,6 +17,7 @@ import signal
 import socket
 import sys
 import tempfile
 import socket
 import sys
 import tempfile
+from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
 
 
 import arvados.commands._util as arv_cmd
 
@@ -27,11 +27,13 @@ api_client = None
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
-                    help="""
+                         help="""
 Local file or directory. Default: read from standard input.
 """)
 
 Local file or directory. Default: read from standard input.
 """)
 
-upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
+_group = upload_opts.add_mutually_exclusive_group()
+
+_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
@@ -40,66 +42,62 @@ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
 stream per filesystem directory that contains files.
 """)
 
 stream per filesystem directory that contains files.
 """)
 
-upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
-Store the collection in the specified project, instead of your Home
-project.
-""")
-
-upload_opts.add_argument('--name', help="""
-Save the collection with the specified name, rather than the default
-generic name "Saved at {time} by {username}@{host}".
+_group.add_argument('--normalize', action='store_true',
+                    help="""
+Normalize the manifest by re-ordering files and streams after writing
+data.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
-                   help="""
+                    help="""
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
-                   help="""
+                    help="""
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
-                   help="""
+                    help="""
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
-                    dest='filename', help="""
+                         dest='filename', help="""
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
-                    help="""
+                         help="""
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
@@ -107,45 +105,55 @@ it is not a directory. Implies --manifest.
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
-                    help="""
+                         help="""
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
 
 run_opts = argparse.ArgumentParser(add_help=False)
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
 
 run_opts = argparse.ArgumentParser(add_help=False)
+
+run_opts.add_argument('--project-uuid', metavar='UUID', help="""
+Store the collection in the specified project, instead of your Home
+project.
+""")
+
+run_opts.add_argument('--name', help="""
+Save the collection with the specified name.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
-                   help="""
+                    help="""
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
-                   help="""
+                    help="""
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
-                   help="""
+                    help="""
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
-                   help="""
+                    help="""
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
-                   help="""
+                    help="""
 Do not continue interrupted uploads from cached state.
 """)
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
 Do not continue interrupted uploads from cached state.
 """)
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
-    parents=[upload_opts, run_opts])
+    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
@@ -246,23 +254,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                    ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None,
                    ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                 api_client=None):
+                 api_client=None, num_retries=0):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(api_client)
+        super(ArvPutCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
 
     @classmethod
 
     @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+    def from_cache(cls, cache, reporter=None, bytes_expected=None,
+                   num_retries=0):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected)
+            writer = cls.from_state(state, cache, reporter, bytes_expected,
+                                    num_retries=num_retries)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected)
+            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
         else:
             return writer
 
         else:
             return writer
 
@@ -284,12 +295,12 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
-            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
@@ -348,57 +359,48 @@ def progress_writer(progress_func, outfile=sys.stderr):
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
-def check_project_exists(project_uuid):
-    try:
-        api_client.groups().get(uuid=project_uuid).execute()
-    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
-        raise ValueError("Project {} not found ({})".format(project_uuid,
-                                                            error))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+    if not project_uuid:
+        query = api_client.users().current()
+    elif arvados.util.user_uuid_pattern.match(project_uuid):
+        query = api_client.users().get(uuid=project_uuid)
+    elif arvados.util.group_uuid_pattern.match(project_uuid):
+        query = api_client.groups().get(uuid=project_uuid)
     else:
     else:
-        return True
-
-def prep_project_link(args, stderr, project_exists=check_project_exists):
-    # Given the user's command line arguments, return a dictionary with data
-    # to create the desired project link for this Collection, or None.
-    # Raises ValueError if the arguments request something impossible.
-    making_collection = not (args.raw or args.stream)
-    if not making_collection:
-        if args.name or args.project_uuid:
-            raise ValueError("Requested a Link without creating a Collection")
-        return None
-    link = {'tail_uuid': args.project_uuid,
-            'link_class': 'name',
-            'name': args.name}
-    if not link['tail_uuid']:
-        link['tail_uuid'] = api_client.users().current().execute()['uuid']
-    elif not project_exists(link['tail_uuid']):
-        raise ValueError("Project {} not found".format(args.project_uuid))
-    if not link['name']:
-        link['name'] = "Saved at {} by {}@{}".format(
-            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
-            pwd.getpwuid(os.getuid()).pw_name,
-            socket.gethostname())
-        stderr.write(
-            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
-    link['owner_uuid'] = link['tail_uuid']
-    return link
-
-def create_project_link(locator, link):
-    link['head_uuid'] = locator
-    return api_client.links().create(body=link).execute()
+        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+    return query.execute(num_retries=num_retries)['uuid']
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
+
+    args = parse_arguments(arguments)
+    status = 0
     if api_client is None:
         api_client = arvados.api('v1')
     if api_client is None:
         api_client = arvados.api('v1')
-    status = 0
 
 
-    args = parse_arguments(arguments)
+    # Determine the name to use
+    if args.name:
+        if args.stream or args.raw:
+            print >>stderr, "Cannot use --name with --stream or --raw"
+            sys.exit(1)
+        collection_name = args.name
+    else:
+        collection_name = "Saved at {} by {}@{}".format(
+            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
+            pwd.getpwuid(os.getuid()).pw_name,
+            socket.gethostname())
+
+    if args.project_uuid and (args.stream or args.raw):
+        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        sys.exit(1)
+
+    # Determine the parent project
     try:
     try:
-        project_link = prep_project_link(args, stderr)
-    except ValueError as error:
-        print >>stderr, "arv-put: {}.".format(error)
-        sys.exit(2)
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
+    except (apiclient_errors.Error, ValueError) as error:
+        print >>stderr, error
+        sys.exit(1)
 
     if args.progress:
         reporter = progress_writer(human_progress)
 
     if args.progress:
         reporter = progress_writer(human_progress)
@@ -421,10 +423,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             sys.exit(1)
 
     if resume_cache is None:
             sys.exit(1)
 
     if resume_cache is None:
-        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
+                                        num_retries=args.retries)
     else:
         writer = ArvPutCollectionWriter.from_cache(
     else:
         writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected)
+            resume_cache, reporter, bytes_expected, num_retries=args.retries)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -452,35 +455,37 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if args.stream:
         output = writer.manifest_text()
 
     if args.stream:
         output = writer.manifest_text()
+        if args.normalize:
+            output = CollectionReader(output).manifest_text(normalize=True)
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
-        # Register the resulting collection in Arvados.
-        collection = api_client.collections().create(
-            body={
-                'manifest_text': writer.manifest_text(),
-                'owner_uuid': project_link['tail_uuid']
-                },
-            ).execute()
-
-        if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
-            output = collection['portable_data_hash']
-        else:
-            output = collection['uuid']
-
-        if project_link is not None:
-            # Update collection name
-            try:
-                if 'name' in collection:
-                    arvados.api().collections().update(uuid=collection['uuid'],
-                                                       body={"name": project_link["name"]}).execute()
-                else:
-                    create_project_link(output, project_link)
-            except apiclient.errors.Error as error:
-                print >>stderr, (
-                    "arv-put: Error adding Collection to project: {}.".format(
-                        error))
-                status = 1
+        try:
+            manifest_text = writer.manifest_text()
+            if args.normalize:
+                manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
+            # Register the resulting collection in Arvados.
+            collection = api_client.collections().create(
+                body={
+                    'owner_uuid': project_uuid,
+                    'name': collection_name,
+                    'manifest_text': manifest_text
+                    },
+                ensure_unique_name=True
+                ).execute(num_retries=args.retries)
+
+            print >>stderr, "Collection saved as '%s'" % collection['name']
+
+            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
+                output = collection['portable_data_hash']
+            else:
+                output = collection['uuid']
+
+        except apiclient_errors.Error as error:
+            print >>stderr, (
+                "arv-put: Error creating Collection on project: {}.".format(
+                    error))
+            status = 1
 
     # Print the locator (uuid) of the new collection.
     stdout.write(output)
 
     # Print the locator (uuid) of the new collection.
     stdout.write(output)