- defer close(upload_status)
-
- // Desired number of replicas
- remaining_replicas := this.Want_replicas
-
- for remaining_replicas > 0 {
- for active < remaining_replicas {
- // Start some upload requests
- if next_server < len(sv) {
- log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
- next_server += 1
- active += 1
- } else {
- if active == 0 {
- return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ defer func() {
+ // Wait for any abandoned uploads (e.g., we started
+ // two uploads and the first replied with replicas=2)
+ // to finish before closing the status channel.
+ go func() {
+ for active > 0 {
+ <-upload_status
+ }
+ close(upload_status)
+ }()
+ }()
+
+ replicasDone := 0
+ replicasTodo := this.Want_replicas
+
+ replicasPerThread := this.replicasPerService
+ if replicasPerThread < 1 {
+ // unlimited or unknown
+ replicasPerThread = replicasTodo
+ }
+
+ retriesRemaining := 1 + this.Retries
+ var retryServers []string
+
+ for retriesRemaining > 0 {
+ retriesRemaining -= 1
+ next_server = 0
+ retryServers = []string{}
+ for replicasTodo > 0 {
+ for active*replicasPerThread < replicasTodo {
+ // Start some upload requests
+ if next_server < len(sv) {
+ DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ next_server += 1
+ active += 1