Merge branch 'master' of git.curoverse.com:arvados refs #2751
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 27 May 2014 19:50:04 +0000 (15:50 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 27 May 2014 19:50:04 +0000 (15:50 -0400)
sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
sdk/go/src/arvados.org/keepclient/support.go
sdk/python/arvados/api.py
sdk/python/arvados/errors.py
sdk/python/arvados/keep.py
sdk/python/run_test_server.py
sdk/python/test_keep_client.py
services/keep/src/arvados.org/keepproxy/keepproxy.go
services/keep/src/arvados.org/keepproxy/keepproxy_test.go
services/keep/src/keep/keep.go

index 8d26b32790dc4f5f164d526332163bce80162b38..ee91d6f1a178705223439417391188d13b97d76a 100644 (file)
@@ -9,9 +9,12 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "net/http"
        "os"
+       "regexp"
        "sort"
+       "strings"
        "sync"
        "sync/atomic"
        "unsafe"
@@ -77,13 +80,13 @@ func MakeKeepClient() (kc KeepClient, err error) {
 // KeepClient.Want_replicas.  Returns the number of replicas that were written
 // and if there was an error.  Note this will return InsufficientReplicasError
 // whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
+func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
 
        // Buffer for reads from 'r'
        var bufsize int
        if expectedLength > 0 {
                if expectedLength > BLOCKSIZE {
-                       return 0, OversizeBlockError
+                       return "", 0, OversizeBlockError
                }
                bufsize = int(expectedLength)
        } else {
@@ -100,7 +103,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 // replicas is given in KeepClient.Want_replicas.  Returns the number of
 // replicas that were written and if there was an error.  Note this will return
 // InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
+func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
        t := streamer.AsyncStreamFromSlice(buf)
        defer t.Close()
 
@@ -111,19 +114,18 @@ func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error)
 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
 // replicas that were written and if there was an error.  Note this will return
 // InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
-       hash = fmt.Sprintf("%x", md5.Sum(buffer))
-       replicas, err = this.PutHB(hash, buffer)
-       return hash, replicas, err
+func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
+       hash := fmt.Sprintf("%x", md5.Sum(buffer))
+       return this.PutHB(hash, buffer)
 }
 
 // Put a block, given a Reader.  This will read the entire reader into a buffer
-// to computed the hash.  The desired number of replicas is given in
+// to compute the hash.  The desired number of replicas is given in
 // KeepClient.Want_replicas.  Returns the number of replicas that were written
 // and if there was an error.  Note this will return InsufficientReplicasError
 // whenever 0 <= replicas < this.Want_replicas.  Also note that if the block
 // hash and data size are available, PutHR() is more efficient.
-func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
+func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
        if buffer, err := ioutil.ReadAll(r); err != nil {
                return "", 0, err
        } else {
@@ -243,3 +245,49 @@ func (this *KeepClient) SetServiceRoots(svc []string) {
        atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
                unsafe.Pointer(&roots))
 }
+
+type Locator struct {
+       Hash      string
+       Size      int
+       Signature string
+       Timestamp string
+}
+
+func MakeLocator2(hash string, hints string) (locator Locator) {
+       locator.Hash = hash
+       if hints != "" {
+               signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
+               for _, hint := range strings.Split(hints, "+") {
+                       if hint != "" {
+                               if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
+                                       fmt.Sscanf(hint, "%d", &locator.Size)
+                               } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
+                                       locator.Signature = m[1]
+                                       locator.Timestamp = m[2]
+                               } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
+                                       // Any unknown hint that starts with an uppercase letter is
+                                       // presumed to be valid and ignored, to permit forward compatibility.
+                               } else {
+                                       // Unknown format; not a valid locator.
+                                       return Locator{"", 0, "", ""}
+                               }
+                       }
+               }
+       }
+       return locator
+}
+
+func MakeLocator(path string) Locator {
+       pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
+       if err != nil {
+               log.Print("Don't like regexp", err)
+       }
+
+       sm := pathpattern.FindStringSubmatch(path)
+       if sm == nil {
+               log.Print("Failed match ", path)
+               return Locator{"", 0, "", ""}
+       }
+
+       return MakeLocator2(sm[1], sm[2])
+}
index 291d8f896c2aa5e19c5f3651c3c478ebe7d68152..8eedadd64b8a87d79a1a7f9dfe0fc9eb478138e1 100644 (file)
@@ -163,7 +163,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 
                        <-st.handled
                        status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
                })
 
        log.Printf("TestUploadToStubKeepServer done")
@@ -196,7 +196,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                        <-st.handled
 
                        status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
                })
 
        log.Printf("TestUploadToStubKeepServerBufferReader done")
@@ -386,7 +386,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
        <-fh.handled
 
        c.Check(err, Equals, nil)
-       c.Check(phash, Equals, hash)
+       c.Check(phash, Equals, "")
        c.Check(replicas, Equals, 2)
        c.Check(<-st.handled, Equals, shuff[1])
        c.Check(<-st.handled, Equals, shuff[2])
@@ -598,7 +598,7 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
        }
        {
                hash2, replicas, err := kc.PutB([]byte("foo"))
-               c.Check(hash2, Equals, hash)
+               c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
                c.Check(replicas, Equals, 2)
                c.Check(err, Equals, nil)
        }
@@ -687,3 +687,12 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 
        log.Printf("TestPutProxy done")
 }
+
+func (s *StandaloneSuite) TestMakeLocator(c *C) {
+       l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
+
+       c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
+       c.Check(l.Size, Equals, 3)
+       c.Check(l.Signature, Equals, "abcde")
+       c.Check(l.Timestamp, Equals, "12345678")
+}
index 913f7c78ffbeb2b9d37e446f0e6aca8ba3a3d8ed..b1d59f06d35db5c0ccb2ca36e8d586d1fd7326cd 100644 (file)
@@ -7,10 +7,12 @@ import (
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "log"
        "net/http"
        "os"
        "strconv"
+       "strings"
 )
 
 type keepDisk struct {
@@ -154,6 +156,7 @@ type uploadStatus struct {
        url             string
        statusCode      int
        replicas_stored int
+       response        string
 }
 
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
@@ -165,7 +168,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        var err error
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
-               upload_status <- uploadStatus{err, url, 0, 0}
+               upload_status <- uploadStatus{err, url, 0, 0, ""}
                body.Close()
                return
        }
@@ -185,7 +188,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
        var resp *http.Response
        if resp, err = this.Client.Do(req); err != nil {
-               upload_status <- uploadStatus{err, url, 0, 0}
+               upload_status <- uploadStatus{err, url, 0, 0, ""}
                body.Close()
                return
        }
@@ -195,17 +198,25 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
                fmt.Sscanf(xr, "%d", &rep)
        }
 
+       respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+       if err2 != nil && err2 != io.EOF {
+               upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
+               return
+       }
+
+       locator := strings.TrimSpace(string(respbody))
+
        if resp.StatusCode == http.StatusOK {
-               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
+               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
        } else {
-               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
+               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
        }
 }
 
 func (this KeepClient) putReplicas(
        hash string,
        tr *streamer.AsyncStream,
-       expectedLength int64) (replicas int, err error) {
+       expectedLength int64) (locator string, replicas int, err error) {
 
        // Calculate the ordering for uploading to servers
        sv := this.shuffledServiceRoots(hash)
@@ -233,7 +244,7 @@ func (this KeepClient) putReplicas(
                                active += 1
                        } else {
                                if active == 0 {
-                                       return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                                       return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
                                } else {
                                        break
                                }
@@ -245,6 +256,7 @@ func (this KeepClient) putReplicas(
                if status.statusCode == 200 {
                        // good news!
                        remaining_replicas -= status.replicas_stored
+                       locator = status.response
                } else {
                        // writing to keep server failed for some reason
                        log.Printf("Keep server put to %v failed with '%v'",
@@ -254,5 +266,5 @@ func (this KeepClient) putReplicas(
                log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
        }
 
-       return this.Want_replicas, nil
+       return locator, this.Want_replicas, nil
 }
index 30acdc4f5d4d1f04afe31d80b21169831b668b75..699c319651fed54318fc7957410d1e78ed1eafa3 100644 (file)
@@ -19,6 +19,10 @@ class CredentialsFromEnv(object):
         from httplib import BadStatusLine
         if 'headers' not in kwargs:
             kwargs['headers'] = {}
+
+        if config.get("ARVADOS_EXTERNAL_CLIENT", "") == "true":
+            kwargs['headers']['X-External-Client'] = '1'
+
         kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
         try:
             return self.orig_http_request(uri, **kwargs)
@@ -89,4 +93,3 @@ def api(version=None, cache=True):
             'arvados', apiVersion, http=http, discoveryServiceUrl=url)
         http.cache = None
     return services[version]
-
index e4c69a3c83dff24ebc70f1ce0212931b4b77ba06..b4afb21f1131f16199e0011952d418f30218765d 100644 (file)
@@ -14,3 +14,5 @@ class KeepWriteError(Exception):
     pass
 class NotImplementedError(Exception):
     pass
+class NoKeepServersError(Exception):
+    pass
index 686b940978b23c1907d19b7403481dad6e3964fb..a93c602f46221e919d805501796e5ba8327baf41 100644 (file)
@@ -74,7 +74,7 @@ class KeepClient(object):
             with self._done_lock:
                 return (self._done < self._todo)
 
-        def save_response(self, response_body):
+        def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
             the Keep server.  It is not necessary to save more than
@@ -82,7 +82,7 @@ class KeepClient(object):
             in response to a successful request is valid.
             """
             with self._done_lock:
-                self._done += 1
+                self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
@@ -123,7 +123,14 @@ class KeepClient(object):
                 url = self.args['service_root'] + self.args['data_hash']
                 api_token = config.get('ARVADOS_API_TOKEN')
                 headers = {'Authorization': "OAuth2 %s" % api_token}
+
+                if self.args['using_proxy']:
+                    # We're using a proxy, so tell the proxy how many copies we
+                    # want it to store
+                    headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
                 try:
+                    logging.debug("Uploading to {}".format(url))
                     resp, content = h.request(url.encode('utf-8'), 'PUT',
                                               headers=headers,
                                               body=self.args['data'])
@@ -141,7 +148,18 @@ class KeepClient(object):
                                       (str(threading.current_thread()),
                                        self.args['data_hash'],
                                        self.args['service_root']))
-                        return limiter.save_response(content.strip())
+                        replicas_stored = 1
+                        if 'x-keep-replicas-stored' in resp:
+                            # Tick the 'done' counter for the number of replicas
+                            # reported stored by the server, for the case that
+                            # we're talking to a proxy or other backend that
+                            # stores multiple copies for us.
+                            try:
+                                replicas_stored = int(resp['x-keep-replicas-stored'])
+                            except ValueError:
+                                pass
+                        return limiter.save_response(content.strip(), replicas_stored)
+
                     logging.warning("Request fail: PUT %s => %s %s" %
                                     (url, resp['status'], content))
                 except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
@@ -155,21 +173,43 @@ class KeepClient(object):
         self._cache = []
         # default 256 megabyte cache
         self.cache_max = 256 * 1024 * 1024
+        self.using_proxy = False
 
     def shuffled_service_roots(self, hash):
         if self.service_roots == None:
             self.lock.acquire()
-            try:
-                keep_disks = arvados.api().keep_disks().list().execute()['items']
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_disks)
-                self.service_roots = sorted(set(roots))
-                logging.debug(str(self.service_roots))
-            finally:
-                self.lock.release()
+
+            # Override normal keep disk lookup with an explicit proxy
+            # configuration.
+            keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+            if keep_proxy_env != None and len(keep_proxy_env) > 0:
+
+                if keep_proxy_env[-1:] != '/':
+                    keep_proxy_env += "/"
+                self.service_roots = [keep_proxy_env]
+                self.using_proxy = True
+            else:
+                try:
+                    try:
+                        keep_services = arvados.api().keep_services().accessible().execute()['items']
+                    except Exception:
+                        keep_services = arvados.api().keep_disks().list().execute()['items']
+
+                    if len(keep_services) == 0:
+                        raise arvados.errors.NoKeepServersError()
+
+                    if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
+                        self.using_proxy = True
+
+                    roots = (("http%s://%s:%d/" %
+                              ('s' if f['service_ssl_flag'] else '',
+                               f['service_host'],
+                               f['service_port']))
+                             for f in keep_services)
+                    self.service_roots = sorted(set(roots))
+                    logging.debug(str(self.service_roots))
+                finally:
+                    self.lock.release()
 
         # Build an ordering with which to query the Keep servers based on the
         # contents of the hash.
@@ -352,14 +392,16 @@ class KeepClient(object):
             t = KeepClient.KeepWriterThread(data=data,
                                             data_hash=data_hash,
                                             service_root=service_root,
-                                            thread_limiter=thread_limiter)
+                                            thread_limiter=thread_limiter,
+                                            using_proxy=self.using_proxy,
+                                            want_copies=(want_copies if self.using_proxy else 1))
             t.start()
             threads += [t]
         for t in threads:
             t.join()
         have_copies = thread_limiter.done()
         # If we're done, return the response from Keep
-        if have_copies == want_copies:
+        if have_copies >= want_copies:
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
index 4b65d72cd02ce8cd5446921223079a686dc23253..bdfdea9734c9960526f6edc9f0aeb7e8cd26f5e2 100644 (file)
@@ -127,7 +127,8 @@ def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     keep_cmd = ["bin/keep",
                 "-volumes={}".format(keep0),
-                "-listen=:{}".format(25107+n)]
+                "-listen=:{}".format(25107+n),
+                "-pid={}".format("tmp/keep{}.pid".format(n))]
 
     for arg, val in keep_args.iteritems():
         keep_cmd.append("{}={}".format(arg, val))
@@ -135,6 +136,7 @@ def _start_keep(n, keep_args):
     kp0 = subprocess.Popen(keep_cmd)
     with open("tmp/keep{}.pid".format(n), 'w') as f:
         f.write(str(kp0.pid))
+
     with open("tmp/keep{}.volume".format(n), 'w') as f:
         f.write(keep0)
 
@@ -148,7 +150,7 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
     else:
         os.environ["GOPATH"] = os.getcwd() + ":" + os.environ["GOPATH"]
 
-    subprocess.call(["go", "install", "keep"])
+    subprocess.call(["./go.sh", "install", "keep"])
 
     if not os.path.exists("tmp"):
         os.mkdir("tmp")
@@ -164,7 +166,6 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
     _start_keep(0, keep_args)
     _start_keep(1, keep_args)
 
-
     os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
     os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
 
@@ -187,6 +188,7 @@ def _stop_keep(n):
     if os.path.exists("tmp/keep{}.volume".format(n)):
         with open("tmp/keep{}.volume".format(n), 'r') as r:
             shutil.rmtree(r.read(), True)
+        os.unlink("tmp/keep{}.volume".format(n))
     if os.path.exists("tmp/keep.blob_signing_key"):
         os.remove("tmp/keep.blob_signing_key")
 
@@ -197,8 +199,41 @@ def stop_keep():
     _stop_keep(0)
     _stop_keep(1)
 
-    shutil.rmtree("tmp", True)
+    os.chdir(cwd)
+
+def run_keep_proxy(auth):
+    stop_keep_proxy()
+
+    cwd = os.getcwd()
+    os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+    if os.environ.get('GOPATH') == None:
+        os.environ["GOPATH"] = os.getcwd()
+    else:
+        os.environ["GOPATH"] = os.getcwd() + ":" + os.environ["GOPATH"]
+
+    subprocess.call(["./go.sh", "install", "arvados.org/keepproxy"])
 
+    if not os.path.exists("tmp"):
+        os.mkdir("tmp")
+
+    os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+    os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
+    os.environ["ARVADOS_API_TOKEN"] = fixture("api_client_authorizations")[auth]["api_token"]
+
+    kp0 = subprocess.Popen(["bin/keepproxy", "-pid=tmp/keepproxy.pid", "-listen=:{}".format(25101)])
+
+    authorize_with("admin")
+    api = arvados.api('v1', cache=False)
+    api.keep_services().create(body={"keep_service": {"service_host": "localhost",  "service_port": 25101, "service_type": "proxy"} }).execute()
+
+    arvados.config.settings()["ARVADOS_KEEP_PROXY"] = "http://localhost:25101"
+
+    os.chdir(cwd)
+
+def stop_keep_proxy():
+    cwd = os.getcwd()
+    os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+    kill_server_pid("tmp/keepproxy.pid", 0)
     os.chdir(cwd)
 
 def fixture(fix):
@@ -234,5 +269,9 @@ if __name__ == "__main__":
         run_keep()
     elif args.action == 'stop_keep':
         stop_keep()
+    elif args.action == 'start_keep_proxy':
+        run_keep_proxy("admin")
+    elif args.action == 'stop_keep_proxy':
+        stop_keep_proxy()
     else:
         print('Unrecognized action "{}", actions are "start", "stop", "start_keep", "stop_keep"'.format(args.action))
index f863ad3c01e03707ffd17b11f903bcbf7f15b40d..6d0470ad41237c935a8978322840722cdef5eda7 100644 (file)
@@ -10,15 +10,26 @@ import run_test_server
 class KeepTestCase(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
+        super(KeepTestCase, cls).setUpClass()
         try:
             del os.environ['KEEP_LOCAL_STORE']
         except KeyError:
             pass
+
+        # Make sure these are clear, we want to talk to the Keep servers
+        # directly.
+        os.environ["ARVADOS_KEEP_PROXY"] = ""
+        os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
+
         run_test_server.run()
         run_test_server.run_keep()
+        arvados.keep.global_client_object = None
+        arvados.config._settings = None
+        run_test_server.authorize_with("admin")
 
     @classmethod
     def tearDownClass(cls):
+        super(KeepTestCase, cls).tearDownClass()
         run_test_server.stop()
         run_test_server.stop_keep()
 
@@ -73,6 +84,7 @@ class KeepPermissionTestCase(unittest.TestCase):
             del os.environ['KEEP_LOCAL_STORE']
         except KeyError:
             pass
+
         run_test_server.run()
         run_test_server.run_keep(blob_signing_key='abcdefghijk0123456789',
                                  enforce_permissions=True)
@@ -186,3 +198,72 @@ class KeepOptionalPermission(unittest.TestCase):
         self.assertEqual(arvados.Keep.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                          'foo',
                          'wrong content from Keep.get(md5("foo"))')
+
+
+class KeepProxyTestCase(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super(KeepProxyTestCase, cls).setUpClass()
+
+        try:
+            del os.environ['KEEP_LOCAL_STORE']
+        except KeyError:
+            pass
+
+        os.environ["ARVADOS_KEEP_PROXY"] = ""
+        os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
+
+        run_test_server.run()
+        run_test_server.run_keep()
+        arvados.keep.global_client_object = None
+        arvados.config._settings = None
+        run_test_server.run_keep_proxy("admin")
+        KeepProxyTestCase.arvados_keep_proxy = arvados.config.get("ARVADOS_KEEP_PROXY")
+
+    @classmethod
+    def tearDownClass(cls):
+        super(KeepProxyTestCase, cls).tearDownClass()
+        run_test_server.stop()
+        run_test_server.stop_keep()
+        run_test_server.stop_keep_proxy()
+
+    def test_KeepProxyTest1(self):
+        # Will use ARVADOS_KEEP_PROXY environment variable that is set by
+        # run_keep_proxy() in setUpClass()
+
+        os.environ["ARVADOS_KEEP_PROXY"] = KeepProxyTestCase.arvados_keep_proxy
+        os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
+        arvados.keep.global_client_object = None
+        arvados.config._settings = None
+
+        baz_locator = arvados.Keep.put('baz')
+        self.assertEqual(baz_locator,
+                         '73feffa4b7f6bb68e44cf984c85f6e88+3',
+                         'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
+        self.assertEqual(arvados.Keep.get(baz_locator),
+                         'baz',
+                         'wrong content from Keep.get(md5("baz"))')
+
+        self.assertEqual(True, arvados.Keep.global_client_object().using_proxy)
+
+    def test_KeepProxyTest2(self):
+        # We don't want to use ARVADOS_KEEP_PROXY from run_keep_proxy() in
+        # setUpClass(), so clear it and set ARVADOS_EXTERNAL_CLIENT which will
+        # contact the API server.
+        os.environ["ARVADOS_KEEP_PROXY"] = ""
+        os.environ["ARVADOS_EXTERNAL_CLIENT"] = "true"
+        arvados.keep.global_client_object = None
+        arvados.config._settings = None
+
+        # Will send X-External-Client to server and get back the proxy from
+        # keep_services/accessible
+
+        baz_locator = arvados.Keep.put('baz2')
+        self.assertEqual(baz_locator,
+                         '91f372a266fe2bf2823cb8ec7fda31ce+4',
+                         'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
+        self.assertEqual(arvados.Keep.get(baz_locator),
+                         'baz2',
+                         'wrong content from Keep.get(md5("baz2"))')
+
+        self.assertEqual(True, arvados.Keep.global_client_object().using_proxy)
index 38e14fd2832f10d01d2997b6b66dd70f02585459..414835c0e249cd9771f475a087793d6f923bd5bb 100644 (file)
@@ -10,7 +10,9 @@ import (
        "net"
        "net/http"
        "os"
+       "os/signal"
        "sync"
+       "syscall"
        "time"
 )
 
@@ -89,10 +91,36 @@ func main() {
 
        go RefreshServicesList(&kc)
 
+       // Shut down the server gracefully (by closing the listener)
+       // if SIGTERM is received.
+       term := make(chan os.Signal, 1)
+       go func(sig <-chan os.Signal) {
+               s := <-sig
+               log.Println("caught signal:", s)
+               listener.Close()
+       }(term)
+       signal.Notify(term, syscall.SIGTERM)
+
+       if pidfile != "" {
+               f, err := os.Create(pidfile)
+               if err == nil {
+                       fmt.Fprint(f, os.Getpid())
+                       f.Close()
+               } else {
+                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+               }
+       }
+
        log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
 
        // Start listening for requests.
        http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+
+       log.Println("shutting down")
+
+       if pidfile != "" {
+               os.Remove(pidfile)
+       }
 }
 
 type ApiTokenCache struct {
@@ -233,16 +261,13 @@ func MakeRESTRouter(
        rest := mux.NewRouter()
 
        if enable_get {
-               gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
-               ghsig := rest.Handle(
-                       `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
-                       GetBlockHandler{kc, t})
-
-               gh.Methods("GET", "HEAD")
-               ghsig.Methods("GET", "HEAD")
+               rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+                       GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+               rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
        }
 
        if enable_put {
+               rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
                rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
        }
 
@@ -261,8 +286,9 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        kc := *this.KeepClient
 
        hash := mux.Vars(req)["hash"]
-       signature := mux.Vars(req)["signature"]
-       timestamp := mux.Vars(req)["timestamp"]
+       hints := mux.Vars(req)["hints"]
+
+       locator := keepclient.MakeLocator2(hash, hints)
 
        log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
 
@@ -276,10 +302,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        var blocklen int64
 
        if req.Method == "GET" {
-               reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
+               reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
                defer reader.Close()
        } else if req.Method == "HEAD" {
-               blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
+               blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
        }
 
        resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -314,6 +340,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        kc := *this.KeepClient
 
        hash := mux.Vars(req)["hash"]
+       hints := mux.Vars(req)["hints"]
+
+       locator := keepclient.MakeLocator2(hash, hints)
 
        var contentLength int64 = -1
        if req.Header.Get("Content-Length") != "" {
@@ -331,6 +360,11 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                return
        }
 
+       if locator.Size > 0 && int64(locator.Size) != contentLength {
+               http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
+               return
+       }
+
        if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
                http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
                return
@@ -346,7 +380,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        }
 
        // Now try to put the block through
-       replicas, err := kc.PutHR(hash, req.Body, contentLength)
+       hash, replicas, err := kc.PutHR(hash, req.Body, contentLength)
 
        // Tell the client how many successful PUTs we accomplished
        resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
@@ -355,6 +389,10 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        case nil:
                // Default will return http.StatusOK
                log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
+               n, err2 := io.WriteString(resp, hash) // echo the locator returned by PutHR in the response body
+               if err2 != nil {
+                       log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
+               }
 
        case keepclient.OversizeBlockError:
                // Too much data
@@ -366,6 +404,10 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                        // client can decide if getting less than the number of
                        // replications it asked for is a fatal error.
                        // Default will return http.StatusOK
+                       n, err2 := io.WriteString(resp, hash) // echo the locator returned by PutHR in the response body
+                       if err2 != nil {
+                               log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
+                       }
                } else {
                        http.Error(resp, "", http.StatusServiceUnavailable)
                }
index cedc61fbcca002b852c6897f1a2645766cf4ad7c..9e78223596cbaaea4ea561f26126a50d997dc3f0 100644 (file)
@@ -141,6 +141,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        defer listener.Close()
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       var hash2 string
 
        {
                _, _, err := kc.Ask(hash)
@@ -149,22 +150,24 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        }
 
        {
-               hash2, rep, err := kc.PutB([]byte("foo"))
-               c.Check(hash2, Equals, hash)
+               var rep int
+               var err error
+               hash2, rep, err = kc.PutB([]byte("foo"))
+               c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
                log.Print("PutB")
        }
 
        {
-               blocklen, _, err := kc.Ask(hash)
+               blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
                log.Print("Ask 2")
        }
 
        {
-               reader, blocklen, _, err := kc.Get(hash)
+               reader, blocklen, _, err := kc.Get(hash2)
                c.Assert(err, Equals, nil)
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte("foo"))
@@ -193,7 +196,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 
        {
                hash2, rep, err := kc.PutB([]byte("bar"))
-               c.Check(hash2, Equals, hash)
+               c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, Equals, keepclient.InsufficientReplicasError)
                log.Print("PutB")
@@ -232,7 +235,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
 
        {
                hash2, rep, err := kc.PutB([]byte("baz"))
-               c.Check(hash2, Equals, hash)
+               c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
                log.Print("PutB")
@@ -261,11 +264,9 @@ func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
        kc := runProxy(c, []string{"keepproxy", "-no-put"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29953)
        defer listener.Close()
 
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("quux")))
-
        {
                hash2, rep, err := kc.PutB([]byte("quux"))
-               c.Check(hash2, Equals, hash)
+               c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, Equals, keepclient.InsufficientReplicasError)
                log.Print("PutB")
index 1dccb12fbaed16738eec05b8d8f2f3d7e2cf3a8c..fa7340d26f5f14c2e6d6242e9ea747a58dac7421 100644 (file)
@@ -125,6 +125,7 @@ func main() {
                permission_ttl_sec      int
                serialize_io            bool
                volumearg               string
+               pidfile                 string
        )
        flag.StringVar(
                &data_manager_token_file,
@@ -170,6 +171,13 @@ func main() {
                        "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
                        "supplied, Keep will scan mounted filesystems for volumes "+
                        "with a /keep top-level directory.")
+
+       flag.StringVar(
+               &pidfile,
+               "pid",
+               "",
+               "Path to write pid file")
+
        flag.Parse()
 
        // Look for local keep volumes.
@@ -257,11 +265,25 @@ func main() {
        }(term)
        signal.Notify(term, syscall.SIGTERM)
 
+       if pidfile != "" {
+               f, err := os.Create(pidfile)
+               if err == nil {
+                       fmt.Fprint(f, os.Getpid())
+                       f.Close()
+               } else {
+                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+               }
+       }
+
        // Start listening for requests.
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
 
        log.Println("shutting down")
+
+       if pidfile != "" {
+               os.Remove(pidfile)
+       }
 }
 
 // MakeRESTRouter