From 2bc1a7a89597ab02aaeef84b82fdc51f8e375b79 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 16 Jun 2020 10:10:53 -0400 Subject: [PATCH] 16480: Use longer timeout for keepstore index requests. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/recovercollection/cmd.go | 8 ++-- sdk/go/arvados/client.go | 70 ++++++++++++++++++++--------- sdk/go/arvados/keep_service.go | 14 +++--- sdk/go/arvados/keep_service_test.go | 3 +- services/keep-balance/balance.go | 16 ++++++- 5 files changed, 78 insertions(+), 33 deletions(-) diff --git a/lib/recovercollection/cmd.go b/lib/recovercollection/cmd.go index cea4607c98..da466c31ca 100644 --- a/lib/recovercollection/cmd.go +++ b/lib/recovercollection/cmd.go @@ -204,8 +204,8 @@ var errNotFound = errors.New("not found") // Finds the timestamp of the newest copy of blk on svc. Returns // errNotFound if blk is not on svc at all. -func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) { - found, err := svc.Index(rcvr.client, blk) +func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) { + found, err := svc.Index(ctx, rcvr.client, blk) if err != nil { logger.WithError(err).Warn("error getting index") return time.Time{}, err @@ -236,7 +236,7 @@ var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect - // saved. But if the block's timestamp is more recent than blobsigttl, // keepstore will refuse to trash it even if told to by keep-balance. func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error { - if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil { + if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil { return err } else if latest.Add(blobsigttl).After(blobsigexp) { return nil @@ -245,7 +245,7 @@ func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("error updating timestamp: %s", err) } logger.Debug("updated timestamp") - if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound { + if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound { return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index") } else if err != nil { return err diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 1e2c07e867..562c8c1e7d 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -57,9 +57,16 @@ type Client struct { // HTTP headers to add/override in outgoing requests. SendHeader http.Header + // Timeout for requests. NewClientFromConfig and + // NewClientFromEnv return a Client with a default 5 minute + // timeout. To disable this timeout and rely on each + // http.Request's context deadline instead, set Timeout to + // zero. + Timeout time.Duration + dd *DiscoveryDocument - ctx context.Context + defaultRequestID string } // The default http.Client used by a Client with Insecure==true and @@ -67,12 +74,10 @@ type Client struct { var InsecureHTTPClient = &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true}}, - Timeout: 5 * time.Minute} + InsecureSkipVerify: true}}} // The default http.Client used by a Client otherwise. -var DefaultSecureClient = &http.Client{ - Timeout: 5 * time.Minute} +var DefaultSecureClient = &http.Client{} // NewClientFromConfig creates a new Client that uses the endpoints in // the given cluster. @@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) { Scheme: ctrlURL.Scheme, APIHost: ctrlURL.Host, Insecure: cluster.TLS.Insecure, + Timeout: 5 * time.Minute, }, nil } @@ -116,6 +122,7 @@ func NewClientFromEnv() *Client { AuthToken: os.Getenv("ARVADOS_API_TOKEN"), Insecure: insecure, KeepServiceURIs: svcs, + Timeout: 5 * time.Minute, } } @@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } if req.Header.Get("X-Request-Id") == "" { - reqid, _ := req.Context().Value(contextKeyRequestID{}).(string) - if reqid == "" { - reqid, _ = c.context().Value(contextKeyRequestID{}).(string) - } - if reqid == "" { + var reqid string + if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" { + reqid = ctxreqid + } else if c.defaultRequestID != "" { + reqid = c.defaultRequestID + } else { reqid = reqIDGen.Next() } if req.Header == nil { @@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { req.Header.Set("X-Request-Id", reqid) } } - return c.httpClient().Do(req) + var cancel context.CancelFunc + if c.Timeout > 0 { + ctx := req.Context() + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout)) + req = req.WithContext(ctx) + } + resp, err := c.httpClient().Do(req) + if err == nil && cancel != nil { + // We need to call cancel() eventually, but we can't + // use "defer cancel()" because the context has to + // stay alive until the caller has finished reading + // the response body. + resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel} + } else if cancel != nil { + cancel() + } + return resp, err +} + +// cancelOnClose calls a provided CancelFunc when its wrapped +// ReadCloser's Close() method is called. +type cancelOnClose struct { + io.ReadCloser + cancel context.CancelFunc +} + +func (coc cancelOnClose) Close() error { + err := coc.ReadCloser.Close() + coc.cancel() + return err } func isRedirectStatus(code int) bool { @@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) { // // path must not contain a query string. func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error { - return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params) + return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params) } func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error { @@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader { // header. func (c *Client) WithRequestID(reqid string) *Client { cc := *c - cc.ctx = ContextWithRequestID(cc.context(), reqid) + cc.defaultRequestID = reqid return &cc } -func (c *Client) context() context.Context { - if c.ctx == nil { - return context.Background() - } - return c.ctx -} - func (c *Client) httpClient() *http.Client { switch { case c.Client != nil: diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go index 3af7479202..da1710374e 100644 --- a/sdk/go/arvados/keep_service.go +++ b/sdk/go/arvados/keep_service.go @@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error } // Index returns an unsorted list of blocks at the given mount point. -func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) { - return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix)) +func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) { + return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix)) } // Index returns an unsorted list of blocks that can be retrieved from // this server. -func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) { - return s.index(c, s.url("index/"+prefix)) +func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) { + return s.index(ctx, c, s.url("index/"+prefix)) } -func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) { - req, err := http.NewRequest("GET", url, nil) +func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return nil, fmt.Errorf("NewRequest(%v): %v", url, err) + return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err) } resp, err := c.Do(req) if err != nil { diff --git a/sdk/go/arvados/keep_service_test.go b/sdk/go/arvados/keep_service_test.go index 8715f74f0b..3a82f4b7ee 100644 --- a/sdk/go/arvados/keep_service_test.go +++ b/sdk/go/arvados/keep_service_test.go @@ -5,6 +5,7 @@ package arvados import ( + "context" "net/http" check "gopkg.in/check.v1" @@ -22,6 +23,6 @@ func (*KeepServiceSuite) TestIndexTimeout(c *check.C) { APIHost: "zzzzz.arvadosapi.com", AuthToken: "xyzzy", } - _, err := (&KeepService{}).IndexMount(client, "fake", "") + _, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "") c.Check(err, check.ErrorMatches, `.*timeout.*`) } diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 3c35d304cb..2617146056 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -6,6 +6,7 @@ package main import ( "bytes" + "context" "crypto/md5" "fmt" "io" @@ -126,6 +127,13 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp // succeed in clearing existing trash lists. nextRunOptions.SafeRendezvousState = rs } + + // Indexing and sending trash/pull lists can take a long time + // on a big site. Prefer a long timeout (causing slow recovery + // from undetected network problems) to a short timeout + // (causing starvation via perpetual timeout/restart cycle). + client.Timeout = 24 * time.Hour + if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { return } @@ -305,6 +313,9 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { // // It encodes the resulting information in BlockStateMap. func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer bal.time("get_state", "wall clock time to get current state")() bal.BlockStateMap = NewBlockStateMap() @@ -348,12 +359,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro go func(mounts []*KeepMount) { defer wg.Done() bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService) - idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "") + idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "") if err != nil { select { case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err): default: } + cancel() return } if len(errs) > 0 { @@ -391,6 +403,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro } for range collQ { } + cancel() return } bal.collScanned++ @@ -422,6 +435,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro case errs <- err: default: } + cancel() } }() -- 2.30.2