Merge branch 'master' into 7546-put-retry
author radhika <radhika@curoverse.com>
Thu, 22 Oct 2015 19:24:43 +0000 (15:24 -0400)
committer radhika <radhika@curoverse.com>
Thu, 22 Oct 2015 19:24:43 +0000 (15:24 -0400)
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go

index aaba0695f0d0751c370fbd42741c3f6e77b9fbce..c03ba90736868d15cdba09d0638e160e60d57998 100644 (file)
@@ -442,6 +442,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
        kc, _ := MakeKeepClient(&arv)
 
        kc.Want_replicas = 2
+       kc.Retries = 0
        arv.ApiToken = "abc123"
        localRoots := make(map[string]string)
        writableLocalRoots := make(map[string]string)
@@ -552,6 +553,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+       kc.Retries = 0
 
        r, n, url2, err := kc.Get(hash)
        c.Check(err, Equals, BlockNotFound)
@@ -808,6 +810,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        }
 
        kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+       kc.Retries = 0
 
        // This test works only if one of the failing services is
        // attempted before the succeeding service. Otherwise,
@@ -1186,3 +1189,53 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        c.Check(err2, Equals, nil)
        c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
 }
+
+// FailThenSucceedPutHandler is a fake Keep server handler for exercising PUT
+// retries: it answers some requests with HTTP 500 and passes the rest to
+// successhandler.
+//
+// Fields:
+//   - handled receives "http://<host>" for every request that was failed.
+//     ServeHTTP fails a request only when this send can complete without
+//     blocking, so while nobody drains the channel its free buffer space is
+//     the number of failures still to inject (a nil channel injects none).
+//   - count is not consulted by ServeHTTP: reading and incrementing it from
+//     concurrent requests was a data race. It is kept so that positional
+//     struct literals still compile.
+//   - successhandler serves every request that is not failed.
+//
+// One handler value can be shared by several fake servers (TestPutBRetry
+// passes a single value to RunSomeFakeKeepServers(st, 2)), so ServeHTTP must
+// be safe to call concurrently.
+type FailThenSucceedPutHandler struct {
+       handled        chan string
+       count          int
+       successhandler StubPutHandler
+}
+
+// ServeHTTP fails the request with HTTP 500 if its host can be queued on
+// h.handled without blocking; otherwise it delegates to h.successhandler.
+// Deciding via one non-blocking channel send (instead of a check-then-
+// increment of h.count) is atomic, so concurrent requests can never take
+// more failure slots than the channel has room for, and a failing request
+// can never block forever on a full channel.
+func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       select {
+       case h.handled <- fmt.Sprintf("http://%s", req.Host):
+               resp.WriteHeader(500)
+       default:
+               h.successhandler.ServeHTTP(resp, req)
+       }
+}
+
+// TestPutBRetry checks that PutB still stores Want_replicas copies when one
+// server answers its first PUT with a retryable error (HTTP 500): that server
+// must be retried in a later round, where the write succeeds.
+func (s *StandaloneSuite) TestPutBRetry(c *C) {
+       // failures receives one entry per injected 500. successes is handed to
+       // the stub success handler; presumably it receives one entry per PUT
+       // that handler serves (NOTE(review): confirm against StubPutHandler).
+       failures := make(chan string, 1)
+       successes := make(chan string, 5)
+       st := &FailThenSucceedPutHandler{
+               handled:        failures,
+               successhandler: StubPutHandler{c, Md5String("foo"), "abc123", "foo", successes},
+       }
+
+       arv, _ := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+
+       kc.Want_replicas = 2
+       // One retry round suffices: the first round stores one replica and gets
+       // a 500 from the other server; the second round retries that server.
+       // Set explicitly so the test does not rely on MakeKeepClient's default.
+       kc.Retries = 1
+       arv.ApiToken = "abc123"
+       localRoots := make(map[string]string)
+       writableLocalRoots := make(map[string]string)
+
+       ks := RunSomeFakeKeepServers(st, 2)
+
+       for i, k := range ks {
+               uuid := fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)
+               localRoots[uuid] = k.url
+               writableLocalRoots[uuid] = k.url
+               defer k.listener.Close()
+       }
+
+       kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+       hash, replicas, err := kc.PutB([]byte("foo"))
+
+       c.Check(err, Equals, nil)
+       c.Check(hash, Equals, "")
+       c.Check(replicas, Equals, 2)
+       // Confirm the retry path was really exercised: exactly one request was
+       // failed, and one PUT per round reached the success handler.
+       c.Check(len(failures), Equals, 1)
+       c.Check(len(successes), Equals, 2)
+}
index 0791d3cf856ee7d5d1268338eafa883fe9bcbb18..36572605bcd3b07acd2e5c1e0f5e42636ba1c380 100644 (file)
@@ -235,7 +235,7 @@ func (this KeepClient) putReplicas(
 
        // Take the hash of locator and timestamp in order to identify this
        // specific transaction in log statements.
-       requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+       requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
 
        // Calculate the ordering for uploading to servers
        sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
@@ -269,34 +269,52 @@ func (this KeepClient) putReplicas(
                replicasPerThread = remaining_replicas
        }
 
-       for remaining_replicas > 0 {
-               for active*replicasPerThread < remaining_replicas {
-                       // Start some upload requests
-                       if next_server < len(sv) {
-                               log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
-                               next_server += 1
-                               active += 1
-                       } else {
-                               if active == 0 {
-                                       return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+       retriesRemaining := 1 + this.Retries
+       var retryServers []string
+
+       for retriesRemaining > 0 {
+               retriesRemaining -= 1
+               next_server = 0
+               retryServers = []string{}
+               for remaining_replicas > 0 {
+                       for active*replicasPerThread < remaining_replicas {
+                               // Start some upload requests
+                               if next_server < len(sv) {
+                                       log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+                                       next_server += 1
+                                       active += 1
                                } else {
-                                       break
+                                       if active == 0 && retriesRemaining == 0 {
+                                               return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                                       } else {
+                                               break
+                                       }
+                               }
+                       }
+                       log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+                               requestId, remaining_replicas, active)
+
+                       // Now wait for something to happen.
+                       if active > 0 {
+                               status := <-upload_status
+                               active -= 1
+
+                               if status.statusCode == 200 {
+                                       // good news!
+                                       remaining_replicas -= status.replicas_stored
+                                       locator = status.response
+                               } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                                       (status.statusCode >= 500 && status.statusCode != 503) {
+                                       // Timeout, too many requests, or other server side failure
+                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                                }
+                       } else {
+                               break
                        }
                }
-               log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-                       requestId, remaining_replicas, active)
-
-               // Now wait for something to happen.
-               status := <-upload_status
-               active -= 1
 
-               if status.statusCode == 200 {
-                       // good news!
-                       remaining_replicas -= status.replicas_stored
-                       locator = status.response
-               }
+               sv = retryServers
        }
 
        return locator, this.Want_replicas, nil