Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/prometheus/client_golang/prometheus"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
}
// Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
if v.ContainerName == "" {
return errors.New("no container name given")
}
} else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
return nil
}
switch {
case err == nil:
return err
+ case strings.Contains(err.Error(), "StatusCode=503"):
+ // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+ return VolumeBusyError
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
}
func (c *azureContainer) Exists() (bool, error) {
+ c.stats.TickOps("exists")
c.stats.Tick(&c.stats.Ops)
ok, err := c.ctr.Exists()
c.stats.TickErr(err)
}
func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+ c.stats.TickOps("get_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetMetadata(nil)
}
func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+ c.stats.TickOps("get_properties")
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetProperties(nil)
}
func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+ c.stats.TickOps("get")
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.Get(nil)
}
func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+ c.stats.TickOps("get_range")
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
}
func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+ c.stats.TickOps("create")
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
rdr = &readerWithAzureLen{
}
func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+ c.stats.TickOps("set_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
b := c.ctr.GetBlobReference(bname)
b.Metadata = m
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ c.stats.TickOps("list")
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
}
func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+ c.stats.TickOps("delete")
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
b := c.ctr.GetBlobReference(bname)
err := b.Delete(opts)
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
)
var testCluster = &arvados.Cluster{
// - permissions on, authenticated request, unsigned locator
// - permissions on, unauthenticated request, signed locator
// - permissions on, authenticated request, expired locator
+ // - permissions on, authenticated request, signed locator, transient error from backend
//
func TestGetHandler(t *testing.T) {
defer teardown()
ExpectStatusCode(t,
"Authenticated request, expired locator",
ExpiredError.HTTPCode, response)
+
+ // Authenticated request, signed locator
+ // => 503 Server busy (transient error)
+
+ // Set up the volume that owns the block to respond with errors
+ vols[0].(*MockVolume).Bad = true
+ vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+ response = IssueRequest(&RequestTester{
+ method: "GET",
+ uri: signedLocator,
+ apiToken: knownToken,
+ })
+ // A transient error from one volume while the other doesn't find the block
+ // should make the service return a 503 so that clients can retry.
+ ExpectStatusCode(t,
+ "Volume backend busy",
+ 503, response)
}
// Test PutBlockHandler on the following situations:
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
ok <- struct{}{}
}()
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus"
)
type router struct {
limiter httpserver.RequestCounter
cluster *arvados.Cluster
remoteProxy remoteProxy
+ metrics *nodeMetrics
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
rtr := &router{
Router: mux.NewRouter(),
cluster: cluster,
+ metrics: &nodeMetrics{reg: reg},
}
rtr.HandleFunc(
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+ rtr.metrics.setupBufferPoolMetrics(bufs)
+ rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+ rtr.metrics.setupRequestMetrics(rtr.limiter)
- instrumented := httpserver.Instrument(nil, nil,
+ instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
}
if !os.IsNotExist(err) {
log.Printf("%s: Get(%s): %s", vol, hash, err)
}
+ // If some volume returns a transient error, return it to the caller
+ // instead of "Not found" so the caller can retry.
+ if err == VolumeBusyError {
+ errorToCaller = err.(*KeepError)
+ }
continue
}
// Check the file checksum.
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
)
var version = "dev"
DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
ExpiredError = &KeepError{401, "Expired permission signature"}
NotFoundError = &KeepError{404, "Not Found"}
+ VolumeBusyError = &KeepError{503, "Volume backend busy"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
log.Printf("keepstore %s started", version)
- err = theConfig.Start()
+ metricsRegistry := prometheus.NewRegistry()
+
+ err = theConfig.Start(metricsRegistry)
if err != nil {
log.Fatal(err)
}
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter(cluster)
+ router := MakeRESTRouter(cluster, metricsRegistry)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
"strings"
"sync"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
// A TestableVolume allows test suites to manipulate the state of an
Timestamps map[string]time.Time
// Bad volumes return an error for every operation.
- Bad bool
+ Bad bool
+ BadVolumeError error
// Touchable volumes' Touch() method succeeds for a locator
// that has been Put().
v.gotCall("Compare")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
if fmt.Sprintf("%x", md5.Sum(block)) != loc {
return DiskHashError
v.gotCall("Get")
<-v.Gate
if v.Bad {
- return 0, errors.New("Bad volume")
+ return 0, v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
copy(buf[:len(block)], block)
return len(block), nil
v.gotCall("Put")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
}
if v.Readonly {
return MethodDisabledError
var mtime time.Time
var err error
if v.Bad {
- err = errors.New("Bad volume")
+ err = v.BadVolumeError
} else if t, ok := v.Timestamps[loc]; ok {
mtime = t
} else {
return "Mock"
}
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
return nil
}