# or omitted, pages are processed serially.
BalanceCollectionBuffers: 1000
+ # Maximum time for a rebalancing run. This ensures keep-balance
+ # eventually gives up and retries if, for example, a network
+ # error causes a hung connection that is never closed by the
+ # OS. It should be long enough that it doesn't interrupt a
+ # long-running balancing operation.
+ BalanceTimeout: 6h
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
"Collections.WebDAVCache": false,
"Collections.BalanceCollectionBatch": false,
"Collections.BalancePeriod": false,
+ "Collections.BalanceTimeout": false,
"Collections.BlobMissingReport": false,
"Collections.BalanceCollectionBuffers": false,
"Containers": true,
# or omitted, pages are processed serially.
BalanceCollectionBuffers: 1000
+ # Maximum time for a rebalancing run. This ensures keep-balance
+ # eventually gives up and retries if, for example, a network
+ # error causes a hung connection that is never closed by the
+ # OS. It should be long enough that it doesn't interrupt a
+ # long-running balancing operation.
+ BalanceTimeout: 6h
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
cluster.TLS.Insecure = true
cluster.API.MaxItemsPerResponse = 1000
cluster.API.MaxRequestAmplification = 4
+ cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
s.testHandler = &Handler{Cluster: cluster}
PostgreSQL: integrationTestCluster().PostgreSQL,
ForceLegacyAPI14: forceLegacyAPI14,
}
+ s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
s.cluster.TLS.Insecure = true
arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
Header: hdrOut,
Body: reqIn.Body,
}).WithContext(reqIn.Context())
-
- resp, err := client.Do(reqOut)
- return resp, err
+ return client.Do(reqOut)
}
// Copy a response (or error) to the downstream client
// Finds the timestamp of the newest copy of blk on svc. Returns
// errNotFound if blk is not on svc at all.
-func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
- found, err := svc.Index(rcvr.client, blk)
+func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
+ found, err := svc.Index(ctx, rcvr.client, blk)
if err != nil {
logger.WithError(err).Warn("error getting index")
return time.Time{}, err
// saved. But if the block's timestamp is more recent than blobsigttl,
// keepstore will refuse to trash it even if told to by keep-balance.
func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
- if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil {
+ if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
return err
} else if latest.Add(blobsigttl).After(blobsigexp) {
return nil
return fmt.Errorf("error updating timestamp: %s", err)
}
logger.Debug("updated timestamp")
- if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound {
+ if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
} else if err != nil {
return err
// HTTP headers to add/override in outgoing requests.
SendHeader http.Header
+ // Timeout for requests. NewClientFromConfig and
+ // NewClientFromEnv return a Client with a default 5 minute
+ // timeout. To disable this timeout and rely on each
+ // http.Request's context deadline instead, set Timeout to
+ // zero.
+ Timeout time.Duration
+
dd *DiscoveryDocument
- ctx context.Context
+ defaultRequestID string
}
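A minimal sketch of what the new Timeout field means for callers, assuming the usual context/time/arvados imports; the host, token, and UUID below are placeholders, not part of this change:

func exampleContextDeadline() error {
	client := &arvados.Client{
		APIHost:   "zzzzz.arvadosapi.com",
		AuthToken: "xyzzy",
		Timeout:   0, // disable the client-level timeout; rely on the request context instead
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	var coll arvados.Collection
	return client.RequestAndDecodeContext(ctx, &coll, "GET",
		"arvados/v1/collections/zzzzz-4zz18-aaaaaaaaaaaaaaa", nil, nil)
}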
// The default http.Client used by a Client with Insecure==true and
var InsecureHTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true}},
- Timeout: 5 * time.Minute}
+ InsecureSkipVerify: true}}}
// The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
- Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
// NewClientFromConfig creates a new Client that uses the endpoints in
// the given cluster.
Scheme: ctrlURL.Scheme,
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
+ Timeout: 5 * time.Minute,
}, nil
}
AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
Insecure: insecure,
KeepServiceURIs: svcs,
+ Timeout: 5 * time.Minute,
}
}
}
if req.Header.Get("X-Request-Id") == "" {
- reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
- if reqid == "" {
- reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
- }
- if reqid == "" {
+ var reqid string
+ if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ reqid = ctxreqid
+ } else if c.defaultRequestID != "" {
+ reqid = c.defaultRequestID
+ } else {
reqid = reqIDGen.Next()
}
		if req.Header == nil {
			req.Header = http.Header{"X-Request-Id": {reqid}}
		} else {
			req.Header.Set("X-Request-Id", reqid)
		}
}
- return c.httpClient().Do(req)
+ var cancel context.CancelFunc
+ if c.Timeout > 0 {
+ ctx := req.Context()
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+ req = req.WithContext(ctx)
+ }
+ resp, err := c.httpClient().Do(req)
+ if err == nil && cancel != nil {
+ // We need to call cancel() eventually, but we can't
+ // use "defer cancel()" because the context has to
+ // stay alive until the caller has finished reading
+ // the response body.
+ resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+ } else if cancel != nil {
+ cancel()
+ }
+ return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+ io.ReadCloser
+ cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+ err := coc.ReadCloser.Close()
+ coc.cancel()
+ return err
}
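From the caller's side, the deadline set in Do stays armed until the response body is closed; a small illustrative sketch (the function name is hypothetical; net/http, io/ioutil, and arvados imports assumed):

func readAllWithTimeout(client *arvados.Client, req *http.Request) ([]byte, error) {
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// Close also releases the timeout context via cancelOnClose, so it
	// must not be skipped even on early returns.
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}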
func isRedirectStatus(code int) bool {
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
- return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+ return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
}
func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
// header.
func (c *Client) WithRequestID(reqid string) *Client {
cc := *c
- cc.ctx = ContextWithRequestID(cc.context(), reqid)
+ cc.defaultRequestID = reqid
return &cc
}
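One way the two request-ID mechanisms might be combined after this change (illustrative names; context and arvados imports assumed): a value in the request context wins over the client-wide default, and an ID is generated only if neither is set.

func exampleRequestIDs(client *arvados.Client) error {
	// Client-wide default, applied when the request context carries no ID:
	c := client.WithRequestID("req-clientdefault")
	// Per-request ID, which takes precedence over the client default:
	ctx := arvados.ContextWithRequestID(context.Background(), "req-thiscall")
	var user arvados.User
	return c.RequestAndDecodeContext(ctx, &user, "GET", "arvados/v1/users/current", nil, nil)
}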
-func (c *Client) context() context.Context {
- if c.ctx == nil {
- return context.Background()
- }
- return c.ctx
-}
-
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
BalancePeriod Duration
BalanceCollectionBatch int
BalanceCollectionBuffers int
+ BalanceTimeout Duration
WebDAVCache WebDAVCacheConfig
}
}
// IndexMount returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("index/"+prefix))
}
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
- req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
resp, err := c.Do(req)
if err != nil {
package arvados
import (
+ "context"
"net/http"
check "gopkg.in/check.v1"
APIHost: "zzzzz.arvadosapi.com",
AuthToken: "xyzzy",
}
- _, err := (&KeepService{}).IndexMount(client, "fake", "")
+ _, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "")
c.Check(err, check.ErrorMatches, `.*timeout.*`)
}
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
defer bal.time("sweep", "wall clock time to run one full sweep")()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ defer cancel()
+
var lbFile *os.File
if bal.LostBlocksFile != "" {
tmpfn := bal.LostBlocksFile + ".tmp"
if err = bal.CheckSanityEarly(client); err != nil {
return
}
+
+ // On a big site, indexing and sending trash/pull lists can
+ // take much longer than the usual 5 minute client
+ // timeout. From here on, we rely on the context deadline
+ // instead, aborting the entire operation if any part takes
+ // too long.
+ client.Timeout = 0
+
rs := bal.rendezvousState()
if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
if runOptions.SafeRendezvousState != "" {
bal.logf("notice: KeepServices list has changed since last run")
}
bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
- if err = bal.ClearTrashLists(client); err != nil {
+ if err = bal.ClearTrashLists(ctx, client); err != nil {
return
}
// The current rendezvous state becomes "safe" (i.e.,
// succeed in clearing existing trash lists.
nextRunOptions.SafeRendezvousState = rs
}
- if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+
+ if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
lbFile = nil
}
if runOptions.CommitPulls {
- err = bal.CommitPulls(client)
+ err = bal.CommitPulls(ctx, client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
if runOptions.CommitTrash {
- err = bal.CommitTrash(client)
+ err = bal.CommitTrash(ctx, client)
}
return
}
// We avoid this problem if we clear all trash lists before getting
// indexes. (We also assume there is only one rebalancing process
// running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
for _, srv := range bal.KeepServices {
srv.ChangeSet = &ChangeSet{}
}
- return bal.CommitTrash(c)
+ return bal.CommitTrash(ctx, c)
}
// GetCurrentState determines the current replication state, and the
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
go func(mounts []*KeepMount) {
defer wg.Done()
bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
- idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
if err != nil {
select {
case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
default:
}
+ cancel()
return
}
if len(errs) > 0 {
}
for range collQ {
}
+ cancel()
return
}
bal.collScanned++
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c, pageSize,
+ err = EachCollection(ctx, c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
case errs <- err:
default:
}
+ cancel()
}
}()
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
- return srv.CommitPulls(c)
+ return srv.CommitPulls(ctx, c)
})
}
// CommitTrash sends the computed lists of trash requests to the
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
- return srv.CommitTrash(c)
+ return srv.CommitTrash(ctx, c)
})
}
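The GetCurrentState changes above follow a fan-out-with-cancel pattern: derive a cancellable context from the caller's, start workers, and let the first failure cancel the siblings' in-flight requests. A generic sketch of that pattern, not the balancer's actual code (names are illustrative; context and sync imports assumed):

func fanOut(ctx context.Context, items []string, work func(context.Context, string) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	errs := make(chan error, 1)
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(item string) {
			defer wg.Done()
			if err := work(ctx, item); err != nil {
				select {
				case errs <- err:
				default:
				}
				// Abort the other workers' requests as soon as one fails.
				cancel()
			}
		}(item)
	}
	wg.Wait()
	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}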
package main
import (
+ "context"
"fmt"
"time"
//
// If pageSize > 0 it is used as the maximum page size in each API
// call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
for {
progress(callCount, expectCount)
var page arvados.CollectionList
- err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
if err != nil {
return err
}
package main
import (
+ "context"
"sync"
"time"
longestStreak := 0
var lastMod time.Time
sawUUID := make(map[string]bool)
- err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+ err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
if c.ModifiedAt.IsZero() {
return nil
}
package main
import (
+ "context"
"encoding/json"
"fmt"
"io"
// CommitPulls sends the current list of pull requests to the storage
// server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
- return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+ return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
}
// CommitTrash sends the current list of trash requests to the storage
// server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
- return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+ return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
}
// Perform a PUT request at path, with data (as JSON) in the request
// body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
// We'll start a goroutine to do the JSON encoding, so we can
// stream it to the http client through a Pipe, rather than
// keeping the entire encoded version in memory.
}()
url := srv.URLBase() + "/" + path
- req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+ req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
if err != nil {
return fmt.Errorf("building request for %s: %v", url, err)
}
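The pipe-based streaming described in the comment above looks roughly like this in isolation; a sketch with a placeholder URL and the default HTTP client, not the exact put() implementation (context, encoding/json, fmt, io, and net/http imports assumed):

func examplePipePUT(ctx context.Context, data interface{}) error {
	jsonR, jsonW := io.Pipe()
	go func() {
		// Encode directly into the pipe; the reader side sees EOF (or the
		// encoding error) when Encode returns.
		jsonW.CloseWithError(json.NewEncoder(jsonW).Encode(data))
	}()
	req, err := http.NewRequestWithContext(ctx, "PUT", "http://keep0.example:25107/trash", jsonR)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}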