Merge branch 'master' into 1646-arv-put
[arvados.git] / sdk / python / arvados.py
index 7af06096702ea41a3a0109325e1ab968b9bc9271..8208e836672461aeadfc31d7788ab2e735c8a2c0 100644 (file)
@@ -24,6 +24,20 @@ from apiclient.discovery import build
 if 'ARVADOS_DEBUG' in os.environ:
     logging.basicConfig(level=logging.DEBUG)
 
+class errors:
+    class SyntaxError(Exception):
+        pass
+    class AssertionError(Exception):
+        pass
+    class NotFoundError(Exception):
+        pass
+    class CommandFailedError(Exception):
+        pass
+    class KeepWriteError(Exception):
+        pass
+    class NotImplementedError(Exception):
+        pass
+
 class CredentialsFromEnv(object):
     @staticmethod
     def http_request(self, uri, **kwargs):
@@ -177,8 +191,9 @@ class util:
         p = subprocess.Popen(execargs, **kwargs)
         stdoutdata, stderrdata = p.communicate(None)
         if p.returncode != 0:
-            raise Exception("run_command %s exit %d:\n%s" %
-                            (execargs, p.returncode, stderrdata))
+            raise errors.CommandFailedError(
+                "run_command %s exit %d:\n%s" %
+                (execargs, p.returncode, stderrdata))
         return stdoutdata, stderrdata
 
     @staticmethod
@@ -244,8 +259,8 @@ class util:
                 elif re.search('\.tar$', f.name()):
                     p = util.tar_extractor(path, '')
                 else:
-                    raise Exception("tarball_extract cannot handle filename %s"
-                                    % f.name())
+                    raise errors.AssertionError(
+                        "tarball_extract cannot handle filename %s" % f.name())
                 while True:
                     buf = f.read(2**20)
                     if len(buf) == 0:
@@ -255,7 +270,8 @@ class util:
                 p.wait()
                 if p.returncode != 0:
                     lockfile.close()
-                    raise Exception("tar exited %d" % p.returncode)
+                    raise errors.CommandFailedError(
+                        "tar exited %d" % p.returncode)
             os.symlink(tarball, os.path.join(path, '.locator'))
         tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
         lockfile.close()
@@ -299,8 +315,8 @@ class util:
 
             for f in CollectionReader(zipball).all_files():
                 if not re.search('\.zip$', f.name()):
-                    raise Exception("zipball_extract cannot handle filename %s"
-                                    % f.name())
+                    raise errors.NotImplementedError(
+                        "zipball_extract cannot handle filename %s" % f.name())
                 zip_filename = os.path.join(path, os.path.basename(f.name()))
                 zip_file = open(zip_filename, 'wb')
                 while True:
@@ -320,7 +336,8 @@ class util:
                 p.wait()
                 if p.returncode != 0:
                     lockfile.close()
-                    raise Exception("unzip exited %d" % p.returncode)
+                    raise errors.CommandFailedError(
+                        "unzip exited %d" % p.returncode)
                 os.unlink(zip_filename)
             os.symlink(zipball, os.path.join(path, '.locator'))
         tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
@@ -384,7 +401,10 @@ class util:
                         outfile.write(buf)
                     outfile.close()
         if len(files_got) < len(files):
-            raise Exception("Wanted files %s but only got %s from %s" % (files, files_got, map(lambda z: z.name(), list(CollectionReader(collection).all_files()))))
+            raise errors.AssertionError(
+                "Wanted files %s but only got %s from %s" %
+                (files, files_got,
+                 [z.name() for z in CollectionReader(collection).all_files()]))
         os.symlink(collection_hash, os.path.join(path, '.locator'))
 
         lockfile.close()
@@ -435,9 +455,9 @@ class util:
                     outfile.write(buf)
                 outfile.close()
         if len(files_got) < len(files):
-            raise Exception("Wanted files %s but only got %s from %s" %
-                            (files, files_got, map(lambda z: z.name(),
-                                                   list(stream.all_files()))))
+            raise errors.AssertionError(
+                "Wanted files %s but only got %s from %s" %
+                (files, files_got, [z.name() for z in stream.all_files()]))
         lockfile.close()
         return path
 
@@ -546,7 +566,7 @@ class StreamReader(object):
                 pos, size, name = tok.split(':',2)
                 self.files += [[int(pos), int(size), name]]
             else:
-                raise Exception("Invalid manifest format")
+                raise errors.SyntaxError("Invalid manifest format")
 
     def tokens(self):
         return self._tokens
@@ -737,7 +757,9 @@ class CollectionWriter(object):
     def set_current_file_name(self, newfilename):
         newfilename = re.sub(r' ', '\\\\040', newfilename)
         if re.search(r'[ \t\n]', newfilename):
-            raise AssertionError("Manifest filenames cannot contain whitespace")
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain whitespace: %s" %
+                newfilename)
         self._current_file_name = newfilename
     def current_file_name(self):
         return self._current_file_name
@@ -745,7 +767,12 @@ class CollectionWriter(object):
         if self._current_file_name == None:
             if self._current_file_pos == self._current_stream_length:
                 return
-            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
+            raise errors.AssertionError(
+                "Cannot finish an unnamed file " +
+                "(%d bytes at offset %d in '%s' stream)" %
+                (self._current_stream_length - self._current_file_pos,
+                 self._current_file_pos,
+                 self._current_stream_name))
         self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
@@ -755,7 +782,8 @@ class CollectionWriter(object):
         self.set_current_stream_name(newstreamname)
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[ \t\n]', newstreamname):
-            raise AssertionError("Manifest stream names cannot contain whitespace")
+            raise errors.AssertionError(
+                "Manifest stream names cannot contain whitespace")
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
     def current_stream_name(self):
         return self._current_stream_name
@@ -765,7 +793,9 @@ class CollectionWriter(object):
         if len(self._current_stream_files) == 0:
             pass
         elif self._current_stream_name == None:
-            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
+            raise errors.AssertionError(
+                "Cannot finish an unnamed stream (%d bytes in %d files)" %
+                (self._current_stream_length, len(self._current_stream_files)))
         else:
             self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
@@ -794,6 +824,11 @@ class CollectionWriter(object):
                 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
             manifest += "\n"
         return manifest
+    def data_locators(self):
+        ret = []
+        for name, locators, files in self._finished_streams:
+            ret += locators
+        return ret
 
 global_client_object = None
 
@@ -957,7 +992,7 @@ class KeepClient(object):
             except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
                 logging.info("Request fail: GET %s => %s: %s" %
                              (url, type(e), str(e)))
-        raise Exception("Not found: %s" % expect_hash)
+        raise errors.NotFoundError("Block not found: %s" % expect_hash)
 
     def put(self, data, **kwargs):
         if 'KEEP_LOCAL_STORE' in os.environ:
@@ -983,8 +1018,9 @@ class KeepClient(object):
         have_copies = thread_limiter.done()
         if have_copies == want_copies:
             return (data_hash + '+' + str(len(data)))
-        raise Exception("Write fail for %s: wanted %d but wrote %d" %
-                        (data_hash, want_copies, have_copies))
+        raise errors.KeepWriteError(
+            "Write fail for %s: wanted %d but wrote %d" %
+            (data_hash, want_copies, have_copies))
 
     @staticmethod
     def sign_for_old_server(data_hash, data):
@@ -1006,7 +1042,8 @@ class KeepClient(object):
     def local_store_get(locator):
         r = re.search('^([0-9a-f]{32,})', locator)
         if not r:
-            raise Exception("Keep.get: invalid data locator '%s'" % locator)
+            raise errors.NotFoundError(
+                "Invalid data locator: '%s'" % locator)
         if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
             return ''
         with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: