5824: Merge branch 'master' into 5824-keep-web
author Tom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 20:31:37 +0000 (16:31 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 20:31:37 +0000 (16:31 -0400)
1  2 
doc/_config.yml
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/support.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go

diff --combined doc/_config.yml
index 6d0bdb2f28a01e0329b4b3ed80ec9f0326e02243,75cb997d78b0fe0d80d869023185a11dbfa0bfbd..2f37f5af0803c360f2c0cce2d08389d64b21249f
@@@ -153,8 -153,8 +153,9 @@@ navbar
        - install/install-shell-server.html.textile.liquid
        - install/create-standard-objects.html.textile.liquid
        - install/install-keepstore.html.textile.liquid
+       - install/configure-azure-blob-storage.html.textile.liquid
        - install/install-keepproxy.html.textile.liquid
 +      #- install/install-keep-web.html.textile.liquid
        - install/install-crunch-dispatch.html.textile.liquid
        - install/install-compute-node.html.textile.liquid
      - Helpful hints:
index e74ae2cf08da0401bab6b808cf7333af05ef8a36,2f809b32560b7e6072fd6242963e18b2ab9430df..0e6fadcc3548c99d2e85c30219a74ffdce42bf72
@@@ -22,7 -22,33 +22,33 @@@ import 
  // A Keep "block" is 64MB.
  const BLOCKSIZE = 64 * 1024 * 1024
  
- var BlockNotFound = errors.New("Block not found")
+ // Error is an error that additionally reports, through Temporary, whether the failure is temporary.
+ type Error interface {
+       error
+       Temporary() bool
+ }
+ // multipleResponseError embeds an error plus an isTemp flag; *multipleResponseError implements Error (Temporary has a pointer receiver and returns isTemp).
+ type multipleResponseError struct {
+       error
+       isTemp bool
+ }
+ func (e *multipleResponseError) Temporary() bool {
+       return e.isTemp
+ }
+ // BlockNotFound is a *ErrNotFound sentinel with the message "Block not found" and isTemp false, so its Temporary() reports false.
+ var BlockNotFound = &ErrNotFound{multipleResponseError{
+       error:  errors.New("Block not found"),
+       isTemp: false,
+ }}
+ // ErrNotFound wraps a multipleResponseError; Temporary() (available on *ErrNotFound) reports whatever isTemp was set to.
+ type ErrNotFound struct {
+       multipleResponseError
+ }
  var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
  var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
  var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
@@@ -145,7 -171,12 +171,12 @@@ func (kc *KeepClient) getOrHead(method 
        var errs []string
  
        tries_remaining := 1 + kc.Retries
        serversToTry := kc.getSortedRoots(locator)
+       numServers := len(serversToTry)
+       count404 := 0
        var retryList []string
  
        for tries_remaining > 0 {
                                retryList = append(retryList, host)
                        } else if resp.StatusCode != http.StatusOK {
                                var respbody []byte
 -                              respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
 +                              respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
                                resp.Body.Close()
                                errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
                                        url, resp.StatusCode, bytes.TrimSpace(respbody)))
                                        // server side failure, transient
                                        // error, can try again.
                                        retryList = append(retryList, host)
+                               } else if resp.StatusCode == 404 {
+                                       count404++
                                }
                        } else {
                                // Success.
        }
        log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
  
-       return nil, 0, "", BlockNotFound
+       var err error
+       if count404 == numServers {
+               err = BlockNotFound
+       } else {
+               err = &ErrNotFound{multipleResponseError{
+                       error:  fmt.Errorf("%s %s failed: %v", method, locator, errs),
+                       isTemp: len(serversToTry) > 0,
+               }}
+       }
+       return nil, 0, "", err
  }
  
  // Get() retrieves a block, given a locator. Returns a reader, the
index f5554618feece00d04c34982347ffcc8e1e405aa,bd9b2e35f79be1fe5c6c1fc125b4449875f97f2c..4a210243defcd0bd33ea130dd17c96b2c151534f
@@@ -169,7 -169,7 +169,7 @@@ type uploadStatus struct 
        response        string
  }
  
 -func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
 +func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
        upload_status chan<- uploadStatus, expectedLength int64, requestId string) {
  
        var req *http.Request
        defer resp.Body.Close()
        defer io.Copy(ioutil.Discard, resp.Body)
  
 -      respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
 +      respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
        response := strings.TrimSpace(string(respbody))
        if err2 != nil && err2 != io.EOF {
                log.Printf("[%v] Upload %v error: %v response: %v", requestId, url, err2.Error(), response)
        }
  }
  
 -func (this KeepClient) putReplicas(
 +func (this *KeepClient) putReplicas(
        hash string,
        tr *streamer.AsyncStream,
        expectedLength int64) (locator string, replicas int, err error) {
  
        // Take the hash of locator and timestamp in order to identify this
        // specific transaction in log statements.
-       requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+       requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
  
        // Calculate the ordering for uploading to servers
        sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
                replicasPerThread = remaining_replicas
        }
  
-       for remaining_replicas > 0 {
-               for active*replicasPerThread < remaining_replicas {
-                       // Start some upload requests
-                       if next_server < len(sv) {
-                               log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
-                               next_server += 1
-                               active += 1
-                       } else {
-                               if active == 0 {
-                                       return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+       retriesRemaining := 1 + this.Retries
+       var retryServers []string
+       for retriesRemaining > 0 {
+               retriesRemaining -= 1
+               next_server = 0
+               retryServers = []string{}
+               for remaining_replicas > 0 {
+                       for active*replicasPerThread < remaining_replicas {
+                               // Start some upload requests
+                               if next_server < len(sv) {
+                                       log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+                                       next_server += 1
+                                       active += 1
                                } else {
-                                       break
+                                       if active == 0 && retriesRemaining == 0 {
+                                               return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                                       } else {
+                                               break
+                                       }
+                               }
+                       }
+                       log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+                               requestId, remaining_replicas, active)
+                       // Now wait for something to happen.
+                       if active > 0 {
+                               status := <-upload_status
+                               active -= 1
+                               if status.statusCode == 200 {
+                                       // good news!
+                                       remaining_replicas -= status.replicas_stored
+                                       locator = status.response
+                               } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                                       (status.statusCode >= 500 && status.statusCode != 503) {
+                                       // Timeout, too many requests, or other server side failure
+                                       // Do not retry when status code is 503, which means the keep server is full
+                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                                }
+                       } else {
+                               break
                        }
                }
-               log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-                       requestId, remaining_replicas, active)
-               // Now wait for something to happen.
-               status := <-upload_status
-               active -= 1
  
-               if status.statusCode == 200 {
-                       // good news!
-                       remaining_replicas -= status.replicas_stored
-                       locator = status.response
-               }
+               sv = retryServers
        }
  
        return locator, this.Want_replicas, nil
index f7677ec5fed3c28e8bab6c62fd452b0ce687ae9a,d3dbeaf89e420d4546d0fedab643d9b15e9849c9..1a1189658d7ba1473aa33036cce60276932cb9b8
@@@ -37,7 -37,7 +37,7 @@@ func main() 
                pidfile          string
        )
  
 -      flagset := flag.NewFlagSet("default", flag.ExitOnError)
 +      flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
  
        flagset.StringVar(
                &listen,
@@@ -201,7 -201,7 +201,7 @@@ func GetRemoteAddress(req *http.Request
        return req.RemoteAddr
  }
  
 -func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
 +func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
        var auth string
        if auth = req.Header.Get("Authorization"); auth == "" {
                return false, ""
@@@ -331,7 -331,7 +331,7 @@@ func (this GetBlockHandler) ServeHTTP(r
  
        var pass bool
        var tok string
 -      if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
 +      if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                status, err = http.StatusForbidden, BadAuthorizationHeader
                return
        }
                log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
        }
  
-       switch err {
+       switch respErr := err.(type) {
        case nil:
                status = http.StatusOK
                resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
                                err = ContentLengthMismatch
                        }
                }
-       case keepclient.BlockNotFound:
-               status = http.StatusNotFound
+       case keepclient.Error:
+               if respErr == keepclient.BlockNotFound {
+                       status = http.StatusNotFound
+               } else if respErr.Temporary() {
+                       status = http.StatusBadGateway
+               } else {
+                       status = 422
+               }
        default:
-               status = http.StatusBadGateway
+               status = http.StatusInternalServerError
        }
  }
  
@@@ -432,7 -438,7 +438,7 @@@ func (this PutBlockHandler) ServeHTTP(r
  
        var pass bool
        var tok string
 -      if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
 +      if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                err = BadAuthorizationHeader
                status = http.StatusForbidden
                return
@@@ -515,7 -521,7 +521,7 @@@ func (handler IndexHandler) ServeHTTP(r
  
        kc := *handler.KeepClient
  
 -      ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
 +      ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
        if !ok {
                status, err = http.StatusForbidden, BadAuthorizationHeader
                return
index 917f0124adbec7a10305e700ea1bd76ea2d39f75,997163eca42c965f41109af3ad51de22e65c80d3..2c75ec1616e3b404a9547b6e2f969ce9fc1fe9f2
@@@ -30,6 -30,12 +30,12 @@@ var _ = Suite(&ServerRequiredSuite{}
  // Tests that require the Keep server running
  type ServerRequiredSuite struct{}
  
+ // Gocheck boilerplate
+ var _ = Suite(&NoKeepServerSuite{})
+ // Test with no keepserver to simulate errors
+ type NoKeepServerSuite struct{}
  // Wait (up to 1 second) for keepproxy to listen on a port. This
  // avoids a race condition where we hit a "connection refused" error
  // because we start testing the proxy too soon.
@@@ -53,7 -59,7 +59,7 @@@ func closeListener() 
  
  func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
  }
  
  func (s *ServerRequiredSuite) SetUpTest(c *C) {
  }
  
  func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
+       arvadostest.StopAPI()
+ }
+ func (s *NoKeepServerSuite) SetUpSuite(c *C) {
+       arvadostest.StartAPI()
+ }
+ func (s *NoKeepServerSuite) SetUpTest(c *C) {
+       arvadostest.ResetEnv()
+ }
+ func (s *NoKeepServerSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
  }
  
@@@ -103,7 -121,7 +121,7 @@@ func setupProxyService() 
        }
  }
  
 -func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.KeepClient {
 +func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
        if bogusClientToken {
                os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
        }
                go main()
        }
  
 -      return kc
 +      return &kc
  }
  
  func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
@@@ -251,7 -269,9 +269,9 @@@ func (s *ServerRequiredSuite) TestPutAs
  
        {
                _, _, err := kc.Ask(hash)
-               c.Check(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                log.Print("Ask 1")
        }
  
  
        {
                blocklen, _, err := kc.Ask(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Ask 2")
        }
  
        {
                _, blocklen, _, err := kc.Get(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
@@@ -291,7 -315,9 +315,9 @@@ func (s *ServerRequiredSuite) TestGetDi
  
        {
                _, _, err := kc.Ask(hash)
-               c.Check(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                log.Print("Ask 1")
        }
  
  
        {
                blocklen, _, err := kc.Ask(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Ask 2")
        }
  
        {
                _, blocklen, _, err := kc.Get(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
@@@ -473,3 -503,84 +503,84 @@@ func (s *ServerRequiredSuite) TestGetIn
        _, err = kc.GetIndex("proxy", "xyz")
        c.Assert((err != nil), Equals, true)
  }
+ func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+       waitForListener()
+       defer closeListener()
+       // Put a test block
+       hash, rep, err := kc.PutB([]byte("foo"))
+       c.Check(err, Equals, nil)
+       c.Check(rep, Equals, 2)
+       for _, token := range []string{
+               "nosuchtoken",
+               "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
+       } {
+               // Change token to given bad token
+               kc.Arvados.ApiToken = token
+               // Ask should result in error
+               _, _, err = kc.Ask(hash)
+               c.Check(err, NotNil)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound.Temporary(), Equals, false)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
+               // Get should result in error
+               _, _, _, err = kc.Get(hash)
+               c.Check(err, NotNil)
+               errNotFound, _ = err.(keepclient.ErrNotFound)
+               c.Check(errNotFound.Temporary(), Equals, false)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
+       }
+ }
+ func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
+       // keepclient with no such keep server
+       kc := keepclient.New(&arv)
+       locals := map[string]string{
+               "proxy": "http://localhost:12345",
+       }
+       kc.SetServiceRoots(locals, nil, nil)
+       // Ask should result in temporary connection refused error
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       _, _, err = kc.Ask(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ := err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+       // Get should result in temporary connection refused error
+       _, _, _, err = kc.Get(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ = err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+ }
+ func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 29999, false)
+       waitForListener()
+       defer closeListener()
+       // Ask should result in temporary connection refused error
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       _, _, err := kc.Ask(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ := err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+       // Get should result in temporary connection refused error
+       _, _, _, err = kc.Get(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ = err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+ }