16480: Merge branch 'master'
authorTom Clegg <tom@tomclegg.ca>
Mon, 22 Jun 2020 15:35:35 +0000 (11:35 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 22 Jun 2020 15:35:35 +0000 (11:35 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

15 files changed:
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/controller/federation_test.go
lib/controller/handler_test.go
lib/controller/proxy.go
lib/recovercollection/cmd.go
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/keep_service.go
sdk/go/arvados/keep_service_test.go
services/keep-balance/balance.go
services/keep-balance/collection.go
services/keep-balance/collection_test.go
services/keep-balance/keep_service.go

index b9bc9c2c5ce4a28eb25015961b687cea449d503a..907acdc87847f9c052aee71c5e1d1fbe8c4f78aa 100644 (file)
@@ -440,6 +440,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 0ad4222f551ba1220d2459f4330e9a1e05240d44..d6b02b750de122582e35a5aa34b508861106ac40 100644 (file)
@@ -102,6 +102,7 @@ var whitelist = map[string]bool{
        "Collections.WebDAVCache":                      false,
        "Collections.BalanceCollectionBatch":           false,
        "Collections.BalancePeriod":                    false,
+       "Collections.BalanceTimeout":                   false,
        "Collections.BlobMissingReport":                false,
        "Collections.BalanceCollectionBuffers":         false,
        "Containers":                                   true,
index 758dc2677cf233b0d4d61462e7ec73d607f69174..96da19dfcdc14c6e20f0d1ea348c2423f909b1ba 100644 (file)
@@ -446,6 +446,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 2b0cb22b04fbed0fedcb282c4269dbb008bff1a5..6a9ad8c15f3db2132bf5c122d8ae639764dbfff7 100644 (file)
@@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        cluster.TLS.Insecure = true
        cluster.API.MaxItemsPerResponse = 1000
        cluster.API.MaxRequestAmplification = 4
+       cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
        s.testHandler = &Handler{Cluster: cluster}
index c7bce97130bfb0e4b327d3d2233a41d9c9c3b73d..ef6b9195f10be05b1dd69bcbedda800df66dfdb3 100644 (file)
@@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                PostgreSQL:       integrationTestCluster().PostgreSQL,
                ForceLegacyAPI14: forceLegacyAPI14,
        }
+       s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        s.cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
index 939868a17b94f132644e3459292290294514e84f..d7381860ea422299406e0a38e726f6d09bb38481 100644 (file)
@@ -77,9 +77,7 @@ func (p *proxy) Do(
                Header: hdrOut,
                Body:   reqIn.Body,
        }).WithContext(reqIn.Context())
-
-       resp, err := client.Do(reqOut)
-       return resp, err
+       return client.Do(reqOut)
 }
 
 // Copy a response (or error) to the downstream client
index cea4607c98fe533fec8b37f839f1f641ce3fcccb..da466c31ca7d2a3a4a13f23fc92177d04ef087ec 100644 (file)
@@ -204,8 +204,8 @@ var errNotFound = errors.New("not found")
 
 // Finds the timestamp of the newest copy of blk on svc. Returns
 // errNotFound if blk is not on svc at all.
-func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
-       found, err := svc.Index(rcvr.client, blk)
+func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
+       found, err := svc.Index(ctx, rcvr.client, blk)
        if err != nil {
                logger.WithError(err).Warn("error getting index")
                return time.Time{}, err
@@ -236,7 +236,7 @@ var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -
 // saved. But if the block's timestamp is more recent than blobsigttl,
 // keepstore will refuse to trash it even if told to by keep-balance.
 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
-       if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil {
+       if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
                return err
        } else if latest.Add(blobsigttl).After(blobsigexp) {
                return nil
@@ -245,7 +245,7 @@ func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger,
                return fmt.Errorf("error updating timestamp: %s", err)
        }
        logger.Debug("updated timestamp")
-       if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound {
+       if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
                return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
        } else if err != nil {
                return err
index 1e2c07e867e84d6d6719fdd4f9298b005a86c6e9..562c8c1e7d7c66528a2ce0874eca034c9eb7b328 100644 (file)
@@ -57,9 +57,16 @@ type Client struct {
        // HTTP headers to add/override in outgoing requests.
        SendHeader http.Header
 
+       // Timeout for requests. NewClientFromConfig and
+       // NewClientFromEnv return a Client with a default 5 minute
+       // timeout.  To disable this timeout and rely on each
+       // http.Request's context deadline instead, set Timeout to
+       // zero.
+       Timeout time.Duration
+
        dd *DiscoveryDocument
 
-       ctx context.Context
+       defaultRequestID string
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
 var InsecureHTTPClient = &http.Client{
        Transport: &http.Transport{
                TLSClientConfig: &tls.Config{
-                       InsecureSkipVerify: true}},
-       Timeout: 5 * time.Minute}
+                       InsecureSkipVerify: true}}}
 
 // The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
-       Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
 
 // NewClientFromConfig creates a new Client that uses the endpoints in
 // the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
                Scheme:   ctrlURL.Scheme,
                APIHost:  ctrlURL.Host,
                Insecure: cluster.TLS.Insecure,
+               Timeout:  5 * time.Minute,
        }, nil
 }
 
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
                AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
                Insecure:        insecure,
                KeepServiceURIs: svcs,
+               Timeout:         5 * time.Minute,
        }
 }
 
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
 
        if req.Header.Get("X-Request-Id") == "" {
-               reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
-               if reqid == "" {
-                       reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
-               }
-               if reqid == "" {
+               var reqid string
+               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+                       reqid = ctxreqid
+               } else if c.defaultRequestID != "" {
+                       reqid = c.defaultRequestID
+               } else {
                        reqid = reqIDGen.Next()
                }
                if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
                        req.Header.Set("X-Request-Id", reqid)
                }
        }
-       return c.httpClient().Do(req)
+       var cancel context.CancelFunc
+       if c.Timeout > 0 {
+               ctx := req.Context()
+               ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+               req = req.WithContext(ctx)
+       }
+       resp, err := c.httpClient().Do(req)
+       if err == nil && cancel != nil {
+               // We need to call cancel() eventually, but we can't
+               // use "defer cancel()" because the context has to
+               // stay alive until the caller has finished reading
+               // the response body.
+               resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+       } else if cancel != nil {
+               cancel()
+       }
+       return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+       io.ReadCloser
+       cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+       err := coc.ReadCloser.Close()
+       coc.cancel()
+       return err
 }
 
 func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
-       return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+       return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
 }
 
 func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
 // header.
 func (c *Client) WithRequestID(reqid string) *Client {
        cc := *c
-       cc.ctx = ContextWithRequestID(cc.context(), reqid)
+       cc.defaultRequestID = reqid
        return &cc
 }
 
-func (c *Client) context() context.Context {
-       if c.ctx == nil {
-               return context.Background()
-       }
-       return c.ctx
-}
-
 func (c *Client) httpClient() *http.Client {
        switch {
        case c.Client != nil:
index 029e223218b2a5136b8eac2238b088e2ce4fb983..a54712f330ea2b1ff2a6b8107daec5639c082c32 100644 (file)
@@ -126,6 +126,7 @@ type Cluster struct {
                BalancePeriod            Duration
                BalanceCollectionBatch   int
                BalanceCollectionBuffers int
+               BalanceTimeout           Duration
 
                WebDAVCache WebDAVCacheConfig
        }
index 3af7479202efb46df838e48246ccf9c2f0b5b100..da1710374e1e69ca4bfe6c2f77c8b990a1f7dc4e 100644 (file)
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
 }
 
 // Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-       req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+       req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
-               return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+               return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
        }
        resp, err := c.Do(req)
        if err != nil {
index 8715f74f0bd6ec24abf72f1622c55f0f3d9cd76f..3a82f4b7ee2f0fac31f86ecd338037319c429f3a 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "context"
        "net/http"
 
        check "gopkg.in/check.v1"
@@ -22,6 +23,6 @@ func (*KeepServiceSuite) TestIndexTimeout(c *check.C) {
                APIHost:   "zzzzz.arvadosapi.com",
                AuthToken: "xyzzy",
        }
-       _, err := (&KeepService{}).IndexMount(client, "fake", "")
+       _, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "")
        c.Check(err, check.ErrorMatches, `.*timeout.*`)
 }
index 3c35d304cb395cf97485cd462f0f78ae31b523d6..86423a2976b1e0909470bd563c486aee894743af 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "fmt"
        "io"
@@ -71,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+       defer cancel()
+
        var lbFile *os.File
        if bal.LostBlocksFile != "" {
                tmpfn := bal.LostBlocksFile + ".tmp"
@@ -111,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
        if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
+
+       // On a big site, indexing and sending trash/pull lists can
+       // take much longer than the usual 5 minute client
+       // timeout. From here on, we rely on the context deadline
+       // instead, aborting the entire operation if any part takes
+       // too long.
+       client.Timeout = 0
+
        rs := bal.rendezvousState()
        if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(client); err != nil {
+               if err = bal.ClearTrashLists(ctx, client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -126,7 +138,8 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+
+       if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -146,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                lbFile = nil
        }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(client)
+               err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(client)
+               err = bal.CommitTrash(ctx, client)
        }
        return
 }
@@ -286,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
        for _, srv := range bal.KeepServices {
                srv.ChangeSet = &ChangeSet{}
        }
-       return bal.CommitTrash(c)
+       return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -304,7 +317,10 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
        defer bal.time("get_state", "wall clock time to get current state")()
        bal.BlockStateMap = NewBlockStateMap()
 
@@ -348,12 +364,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                go func(mounts []*KeepMount) {
                        defer wg.Done()
                        bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-                       idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+                       idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
                        if err != nil {
                                select {
                                case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
                                default:
                                }
+                               cancel()
                                return
                        }
                        if len(errs) > 0 {
@@ -391,6 +408,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                                }
                                for range collQ {
                                }
+                               cancel()
                                return
                        }
                        bal.collScanned++
@@ -402,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(c, pageSize,
+               err = EachCollection(ctx, c, pageSize,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -422,6 +440,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                        case errs <- err:
                        default:
                        }
+                       cancel()
                }
        }()
 
@@ -1084,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
        return bal.commitAsync(c, "send pull list",
                func(srv *KeepService) error {
-                       return srv.CommitPulls(c)
+                       return srv.CommitPulls(ctx, c)
                })
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
        return bal.commitAsync(c, "send trash list",
                func(srv *KeepService) error {
-                       return srv.CommitTrash(c)
+                       return srv.CommitTrash(ctx, c)
                })
 }
 
index c4ddc90c419ae7e0d9c4d1b5cbadd9011f2a31fd..1659918cafe20c62162abfb1e841a40f80170c0a 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "fmt"
        "time"
 
@@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 //
 // If pageSize > 0 it is used as the maximum page size in each API
 // call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
-               err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
                if err != nil {
                        return err
                }
index f8921c294afa075f290c2db6fd352b315d25e8ac..3ab9d07b2e2ed6bcc7220ae17aad4e6e7a665855 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "sync"
        "time"
 
@@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
                        longestStreak := 0
                        var lastMod time.Time
                        sawUUID := make(map[string]bool)
-                       err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+                       err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
                                if c.ModifiedAt.IsZero() {
                                        return nil
                                }
index e2adf1a4b79942b9457beb2ccd31df31abbb96b9..17f8418f622f992a7025db9b9214e60c5a39f2ca 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string {
 
 // CommitPulls sends the current list of pull requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
-       return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
 }
 
 // CommitTrash sends the current list of trash requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
-       return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
 }
 
 // Perform a PUT request at path, with data (as JSON) in the request
 // body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
        // We'll start a goroutine to do the JSON encoding, so we can
        // stream it to the http client through a Pipe, rather than
        // keeping the entire encoded version in memory.
@@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
        }()
 
        url := srv.URLBase() + "/" + path
-       req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+       req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
        if err != nil {
                return fmt.Errorf("building request for %s: %v", url, err)
        }