Merge branch 'master' into 13937-keepstore-prometheus
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

1  2 
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/volume_test.go

index 9b957815c84bfe7d3c41425646cf147fa38c4098,4f7339facf4ace001ac886a5076afc217e040c18..66956b89ee83928261bc67dcedba075c20b78397
@@@ -23,7 -23,6 +23,7 @@@ import 
  
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
 +      "github.com/prometheus/client_golang/prometheus"
  )
  
  const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@@ -148,7 -147,7 +148,7 @@@ func (v *AzureBlobVolume) Type() strin
  }
  
  // Start implements Volume.
 -func (v *AzureBlobVolume) Start() error {
 +func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
 +      // Set up prometheus metrics
 +      lbls := prometheus.Labels{"device_id": v.DeviceID()}
 +      v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
 +      v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
 +      v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
 +
        return nil
  }
  
@@@ -610,6 -603,9 +610,9 @@@ func (v *AzureBlobVolume) translateErro
        switch {
        case err == nil:
                return err
+       case strings.Contains(err.Error(), "StatusCode=503"):
+               // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+               return VolumeBusyError
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
@@@ -731,7 -727,6 +734,7 @@@ type azureContainer struct 
  }
  
  func (c *azureContainer) Exists() (bool, error) {
 +      c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
  }
  
  func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
 +      c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
  }
  
  func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
 +      c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
  }
  
  func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 +      c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
  }
  
  func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
 +      c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@@ -794,7 -785,6 +797,7 @@@ func (r *readerWithAzureLen) Len() int 
  }
  
  func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
 +      c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
  }
  
  func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
 +      c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
  }
  
  func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
 +      c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
  }
  
  func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
 +      c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)
index cbfc0bcdab992cf4d729cc7c1a1d2f03effe538c,32b360b1276940c9da69bc4b44b02785ffefc97f..ad907ef10138f213e3831223d867fd3c114736d9
@@@ -28,7 -28,6 +28,7 @@@ import 
  
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 +      "github.com/prometheus/client_golang/prometheus"
  )
  
  var testCluster = &arvados.Cluster{
@@@ -50,6 -49,7 +50,7 @@@ type RequestTester struct 
  //   - permissions on, authenticated request, unsigned locator
  //   - permissions on, unauthenticated request, signed locator
  //   - permissions on, authenticated request, expired locator
+ //   - permissions on, authenticated request, signed locator, transient error from backend
  //
  func TestGetHandler(t *testing.T) {
        defer teardown()
        ExpectStatusCode(t,
                "Authenticated request, expired locator",
                ExpiredError.HTTPCode, response)
+       // Authenticated request, signed locator
+       // => 503 Server busy (transient error)
+       // Set up the block owning volume to respond with errors
+       vols[0].(*MockVolume).Bad = true
+       vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+       response = IssueRequest(&RequestTester{
+               method:   "GET",
+               uri:      signedLocator,
+               apiToken: knownToken,
+       })
+       // A transient error from one volume while the other doesn't find the block
+       // should make the service return a 503 so that clients can retry.
+       ExpectStatusCode(t,
+               "Volume backend busy",
+               503, response)
  }
  
  // Test PutBlockHandler on the following situations:
@@@ -828,7 -845,7 +846,7 @@@ func IssueRequest(rt *RequestTester) *h
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
 -      loggingRouter := MakeRESTRouter(testCluster)
 +      loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
  }
@@@ -840,7 -857,7 +858,7 @@@ func IssueHealthCheckRequest(rt *Reques
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
 -      loggingRouter := MakeRESTRouter(testCluster)
 +      loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
  }
@@@ -980,7 -997,7 +998,7 @@@ func TestGetHandlerClientDisconnect(t *
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
 -              MakeRESTRouter(testCluster).ServeHTTP(resp, req)
 +              MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
  
index 7da9f69adbae4c5fa2f21fe24128d66355bd7838,2a1bbc972ffa6e4fe0675291b0c923efc4d4ac8d..51dd73a513c1d4c729a6743aaabe0cefa1202c4b
@@@ -24,7 -24,6 +24,7 @@@ import 
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
 +      "github.com/prometheus/client_golang/prometheus"
  )
  
  type router struct {
        limiter     httpserver.RequestCounter
        cluster     *arvados.Cluster
        remoteProxy remoteProxy
 +      metrics     *nodeMetrics
  }
  
  // MakeRESTRouter returns a new router that forwards all Keep requests
  // to the appropriate handlers.
 -func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
 +func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
        rtr := &router{
                Router:  mux.NewRouter(),
                cluster: cluster,
 +              metrics: &nodeMetrics{reg: reg},
        }
  
        rtr.HandleFunc(
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
  
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
 +      rtr.metrics.setupBufferPoolMetrics(bufs)
 +      rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
 +      rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
 +      rtr.metrics.setupRequestMetrics(rtr.limiter)
  
 -      instrumented := httpserver.Instrument(nil, nil,
 +      instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
        return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
  }
@@@ -675,6 -668,11 +675,11 @@@ func GetBlock(ctx context.Context, has
                        if !os.IsNotExist(err) {
                                log.Printf("%s: Get(%s): %s", vol, hash, err)
                        }
+                       // If some volume returns a transient error, return it to the caller
+                       // instead of "Not found" so it can retry.
+                       if err == VolumeBusyError {
+                               errorToCaller = err.(*KeepError)
+                       }
                        continue
                }
                // Check the file checksum.
index fb1e1ea54516ef9375a0ea8aad91938ea84f3e7f,a6c8cd99545c24fdc2a56f6c2ff1866682a6ed6d..fcbdddacb1d585e995c8f23a0528be2ce8c1723c
@@@ -18,7 -18,6 +18,7 @@@ import 
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
 +      "github.com/prometheus/client_golang/prometheus"
  )
  
  var version = "dev"
@@@ -51,6 -50,7 +51,7 @@@ var 
        DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
        ExpiredError        = &KeepError{401, "Expired permission signature"}
        NotFoundError       = &KeepError{404, "Not Found"}
+       VolumeBusyError     = &KeepError{503, "Volume backend busy"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
@@@ -121,9 -121,7 +122,9 @@@ func main() 
  
        log.Printf("keepstore %s started", version)
  
 -      err = theConfig.Start()
 +      metricsRegistry := prometheus.NewRegistry()
 +
 +      err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
  
        // Middleware/handler stack
 -      router := MakeRESTRouter(cluster)
 +      router := MakeRESTRouter(cluster, metricsRegistry)
  
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
index fd1a56c5eb5cd6412e6703141306c29693738e93,046f3fac2e0c8c27081c22fea69a0aae7f02acda..df6a09e3ab56fbd80f6776c20cdb881e83df9233
@@@ -15,8 -15,6 +15,8 @@@ import 
        "strings"
        "sync"
        "time"
 +
 +      "github.com/prometheus/client_golang/prometheus"
  )
  
  // A TestableVolume allows test suites to manipulate the state of an
@@@ -42,7 -40,8 +42,8 @@@ type MockVolume struct 
        Timestamps map[string]time.Time
  
        // Bad volumes return an error for every operation.
-       Bad bool
+       Bad            bool
+       BadVolumeError error
  
        // Touchable volumes' Touch() method succeeds for a locator
        // that has been Put().
@@@ -106,7 -105,7 +107,7 @@@ func (v *MockVolume) Compare(ctx contex
        v.gotCall("Compare")
        <-v.Gate
        if v.Bad {
-               return errors.New("Bad volume")
+               return v.BadVolumeError
        } else if block, ok := v.Store[loc]; ok {
                if fmt.Sprintf("%x", md5.Sum(block)) != loc {
                        return DiskHashError
@@@ -124,7 -123,7 +125,7 @@@ func (v *MockVolume) Get(ctx context.Co
        v.gotCall("Get")
        <-v.Gate
        if v.Bad {
-               return 0, errors.New("Bad volume")
+               return 0, v.BadVolumeError
        } else if block, ok := v.Store[loc]; ok {
                copy(buf[:len(block)], block)
                return len(block), nil
@@@ -136,7 -135,7 +137,7 @@@ func (v *MockVolume) Put(ctx context.Co
        v.gotCall("Put")
        <-v.Gate
        if v.Bad {
-               return errors.New("Bad volume")
+               return v.BadVolumeError
        }
        if v.Readonly {
                return MethodDisabledError
@@@ -164,7 -163,7 +165,7 @@@ func (v *MockVolume) Mtime(loc string) 
        var mtime time.Time
        var err error
        if v.Bad {
-               err = errors.New("Bad volume")
+               err = v.BadVolumeError
        } else if t, ok := v.Timestamps[loc]; ok {
                mtime = t
        } else {
@@@ -213,7 -212,7 +214,7 @@@ func (v *MockVolume) Type() string 
        return "Mock"
  }
  
 -func (v *MockVolume) Start() error {
 +func (v *MockVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        return nil
  }