package keepclient
import (
+ "bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"os"
"strconv"
"strings"
+ "time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/asyncbuf"
)
// DebugPrintf emits debug messages. The easiest way to enable
// keepclient debug messages in your application is to assign
// log.Printf to DebugPrintf.
}
func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
- uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
+ uploadStatusChan chan<- uploadStatus, expectedLength int, reqid string) {
var req *http.Request
var err error
return
}
- req.ContentLength = expectedLength
+ req.ContentLength = int64(expectedLength)
if expectedLength > 0 {
req.Body = ioutil.NopCloser(body)
} else {
}
}
-func (kc *KeepClient) putReplicas(
- hash string,
- getReader func() io.Reader,
- expectedLength int64) (locator string, replicas int, err error) {
-
- reqid := kc.getRequestID()
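+// httpBlockWrite writes a block to Keep servers, filling in
+// client-level defaults for any unset options and retrying
+// retryable failures with exponential backoff. It returns a
+// response carrying the locator and the number of replicas written.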
+func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var getReader func() io.Reader
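+ // Sanity-check the options before reading any data.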
+ if req.Data == nil && req.Reader == nil {
+ return resp, errors.New("invalid BlockWriteOptions: Data and Reader are both nil")
+ }
+ if req.DataSize < 0 {
+ return resp, fmt.Errorf("invalid BlockWriteOptions: negative DataSize %d", req.DataSize)
+ }
+ if req.DataSize > BLOCKSIZE || len(req.Data) > BLOCKSIZE {
+ return resp, ErrOversizeBlock
+ }
+ if req.Data != nil {
+ if req.DataSize > len(req.Data) {
+ return resp, errors.New("invalid BlockWriteOptions: DataSize > len(Data)")
+ }
+ if req.DataSize == 0 {
+ req.DataSize = len(req.Data)
+ }
+ getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
+ } else {
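+ // Stream req.Reader through an asyncbuf.Buffer so the data
+ // can be re-read once per upload attempt, verifying it
+ // against req.Hash (if given) as it is read.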
+ buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+ reader := req.Reader
+ if req.Hash != "" {
+ reader = HashCheckingReader{req.Reader, md5.New(), req.Hash}
+ }
+ go func() {
+ _, err := io.Copy(buf, reader)
+ buf.CloseWithError(err)
+ }()
+ getReader = buf.NewReader
+ }
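+ // If the caller did not supply a hash, compute the MD5 of
+ // the data now; it determines the server probe order below.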
+ if req.Hash == "" {
+ m := md5.New()
+ _, err := io.Copy(m, getReader())
+ if err != nil {
+ return resp, err
+ }
+ req.Hash = fmt.Sprintf("%x", m.Sum(nil))
+ }
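+ // Fall back to client-level defaults for any options the
+ // caller left unset.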
+ if req.StorageClasses == nil {
+ if len(kc.StorageClasses) > 0 {
+ req.StorageClasses = kc.StorageClasses
+ } else {
+ req.StorageClasses = kc.DefaultStorageClasses
+ }
+ }
+ if req.Replicas == 0 {
+ req.Replicas = kc.Want_replicas
+ }
+ if req.RequestID == "" {
+ req.RequestID = kc.getRequestID()
+ }
+ if req.Attempts == 0 {
+ req.Attempts = 1 + kc.Retries
+ }
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(kc.WritableLocalRoots(), req.Hash).GetSortedRoots()
// The next server to try contacting
nextServer := 0
}()
}()
- replicasWanted := kc.Want_replicas
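+ // replicasTodo tracks how many more replicas are still
+ // needed in each requested storage class.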
replicasTodo := map[string]int{}
- for _, c := range kc.StorageClasses {
- replicasTodo[c] = replicasWanted
+ for _, c := range req.StorageClasses {
+ replicasTodo[c] = req.Replicas
}
- replicasDone := 0
replicasPerThread := kc.replicasPerService
if replicasPerThread < 1 {
// unlimited or unknown
- replicasPerThread = replicasWanted
+ replicasPerThread = req.Replicas
}
- retriesRemaining := 1 + kc.Retries
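+ // delay generates exponentially increasing, jittered pauses
+ // between retry passes (see delayCalculator below).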
+ delay := delayCalculator{InitialMaxDelay: kc.RetryDelay}
+ retriesRemaining := req.Attempts
var retryServers []string
lastError := make(map[string]string)
}
}
if !trackingClasses {
- maxConcurrency = replicasWanted - replicasDone
+ maxConcurrency = req.Replicas - resp.Replicas
}
if maxConcurrency < 1 {
// If there are no non-zero entries in
// replicasTodo, we're done.
for active*replicasPerThread < maxConcurrency {
// Start some upload requests
if nextServer < len(sv) {
- DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
- go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
+ DebugPrintf("DEBUG: [%s] Begin upload %s to %s", req.RequestID, req.Hash, sv[nextServer])
+ go kc.uploadToKeepServer(sv[nextServer], req.Hash, classesTodo, getReader(), uploadStatusChan, req.DataSize, req.RequestID)
nextServer++
active++
} else {
msg += resp + "; "
}
msg = msg[:len(msg)-2]
- return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
+ return resp, InsufficientReplicasError{error: errors.New(msg)}
}
break
}
}
- DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+ DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", req.RequestID, replicasTodo, active)
if active < 1 {
break
}
if status.statusCode == http.StatusOK {
delete(lastError, status.url)
- replicasDone += status.replicasStored
+ resp.Replicas += status.replicasStored
if len(status.classesStored) == 0 {
// Server doesn't report
// storage classes. Give up
// tracking per-class replica
// counts; rely on the total
// replica count instead.
delete(replicasTodo, className)
}
}
- locator = status.response
+ resp.Locator = status.response
} else {
msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
if len(msg) > 100 {
}
if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
- (status.statusCode >= 500 && status.statusCode != 503) {
+ (status.statusCode >= 500 && status.statusCode != http.StatusInsufficientStorage) {
// Timeout, too many requests, or other server-side failure
- // Do not retry when status code is 503, which means the keep server is full
+ // (but do not auto-retry 507 Insufficient Storage, which means the server is full)
retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
}
sv = retryServers
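+ // Pause with exponential backoff before retrying the
+ // servers that returned retryable errors.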
+ if len(sv) > 0 {
+ time.Sleep(delay.Next())
+ }
}
- return locator, replicasDone, nil
+ return resp, nil
}
func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
}
return classesStored, nil
}
+
+// delayCalculator calculates a series of delays for implementing
+// exponential backoff with jitter. The first call to Next() returns
+// a random duration between MinimumRetryDelay and the specified
+// InitialMaxDelay (or DefaultRetryDelay if 0). The max delay is
+// doubled on each subsequent call to Next(), up to 10x the initial
+// max delay.
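+//
+// A minimal usage sketch (illustrative only; MinimumRetryDelay and
+// DefaultRetryDelay are package-level settings defined elsewhere):
+//
+//	dc := delayCalculator{InitialMaxDelay: time.Second}
+//	time.Sleep(dc.Next()) // random in [MinimumRetryDelay, 1s)
+//	time.Sleep(dc.Next()) // random in [MinimumRetryDelay, 2s)
+//	time.Sleep(dc.Next()) // random in [MinimumRetryDelay, 4s)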
+type delayCalculator struct {
+ InitialMaxDelay time.Duration
+ nextmax time.Duration
+ limit time.Duration
+}
+
+func (dc *delayCalculator) Next() time.Duration {
+ if dc.nextmax <= MinimumRetryDelay {
+ // initialize
+ if dc.InitialMaxDelay > 0 {
+ dc.nextmax = dc.InitialMaxDelay
+ } else {
+ dc.nextmax = DefaultRetryDelay
+ }
+ dc.limit = 10 * dc.nextmax
+ }
+ d := time.Duration(rand.Float64() * float64(dc.nextmax))
+ if d < MinimumRetryDelay {
+ d = MinimumRetryDelay
+ }
+ dc.nextmax *= 2
+ if dc.nextmax > dc.limit {
+ dc.nextmax = dc.limit
+ }
+ return d
+}