5824: Merge branch 'master' into 5824-keep-web
[arvados.git] / sdk / go / keepclient / support.go
index f5554618feece00d04c34982347ffcc8e1e405aa..4a210243defcd0bd33ea130dd17c96b2c151534f 100644 (file)
@@ -235,7 +235,7 @@ func (this *KeepClient) putReplicas(
 
        // Take the hash of locator and timestamp in order to identify this
        // specific transaction in log statements.
-       requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+       requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
 
        // Calculate the ordering for uploading to servers
        sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
@@ -269,34 +269,53 @@ func (this *KeepClient) putReplicas(
                replicasPerThread = remaining_replicas
        }
 
-       for remaining_replicas > 0 {
-               for active*replicasPerThread < remaining_replicas {
-                       // Start some upload requests
-                       if next_server < len(sv) {
-                               log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
-                               next_server += 1
-                               active += 1
-                       } else {
-                               if active == 0 {
-                                       return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+       retriesRemaining := 1 + this.Retries
+       var retryServers []string
+
+       for retriesRemaining > 0 {
+               retriesRemaining -= 1
+               next_server = 0
+               retryServers = []string{}
+               for remaining_replicas > 0 {
+                       for active*replicasPerThread < remaining_replicas {
+                               // Start some upload requests
+                               if next_server < len(sv) {
+                                       log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+                                       next_server += 1
+                                       active += 1
                                } else {
-                                       break
+                                       if active == 0 && retriesRemaining == 0 {
+                                               return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                                       } else {
+                                               break
+                                       }
+                               }
+                       }
+                       log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+                               requestId, remaining_replicas, active)
+
+                       // Now wait for something to happen.
+                       if active > 0 {
+                               status := <-upload_status
+                               active -= 1
+
+                               if status.statusCode == 200 {
+                                       // good news!
+                                       remaining_replicas -= status.replicas_stored
+                                       locator = status.response
+                               } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                                       (status.statusCode >= 500 && status.statusCode != 503) {
+                                       // Timeout, too many requests, or other server side failure
+                                       // Do not retry when status code is 503, which means the keep server is full
+                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                                }
+                       } else {
+                               break
                        }
                }
-               log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-                       requestId, remaining_replicas, active)
-
-               // Now wait for something to happen.
-               status := <-upload_status
-               active -= 1
 
-               if status.statusCode == 200 {
-                       // good news!
-                       remaining_replicas -= status.replicas_stored
-                       locator = status.response
-               }
+               sv = retryServers
        }
 
        return locator, this.Want_replicas, nil