// Take the hash of locator and timestamp in order to identify this
// specific transaction in log statements.
- requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+ requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
// Calculate the ordering for uploading to servers
sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
replicasPerThread = remaining_replicas
}
- for remaining_replicas > 0 {
- for active*replicasPerThread < remaining_replicas {
- // Start some upload requests
- if next_server < len(sv) {
- log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
- next_server += 1
- active += 1
- } else {
- if active == 0 {
- return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ retriesRemaining := 1 + this.Retries
+ var retryServers []string
+
+ for retriesRemaining > 0 {
+ retriesRemaining -= 1
+ next_server = 0
+ retryServers = []string{}
+ for remaining_replicas > 0 {
+ for active*replicasPerThread < remaining_replicas {
+ // Start some upload requests
+ if next_server < len(sv) {
+ log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+ next_server += 1
+ active += 1
} else {
- break
+ if active == 0 && retriesRemaining == 0 {
+ return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ } else {
+ break
+ }
+ }
+ }
+ log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+ requestId, remaining_replicas, active)
+
+ // Now wait for something to happen.
+ if active > 0 {
+ status := <-upload_status
+ active -= 1
+
+ if status.statusCode == 200 {
+ // good news!
+ remaining_replicas -= status.replicas_stored
+ locator = status.response
+ } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+ (status.statusCode >= 500 && status.statusCode != 503) {
+ // Timeout, too many requests, or other server side failure
+ // Do not retry when status code is 503, which means the keep server is full
+ retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
+ } else {
+ break
}
}
- log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
- requestId, remaining_replicas, active)
-
- // Now wait for something to happen.
- status := <-upload_status
- active -= 1
- if status.statusCode == 200 {
- // good news!
- remaining_replicas -= status.replicas_stored
- locator = status.response
- }
+ sv = retryServers
}
return locator, this.Want_replicas, nil