package main
import (
+ "context"
"encoding/json"
"fmt"
"io"
"sync"
"time"
+ "git.curoverse.com/arvados.git/lib/config"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
type runSuite struct {
stub stubServer
- config Config
+ config *arvados.Cluster
+ client *arvados.Client
+}
+
+func (s *runSuite) newServer(options *RunOptions) *Server {
+ srv := &Server{
+ Cluster: s.config,
+ ArvClient: s.client,
+ RunOptions: *options,
+ Metrics: newMetrics(),
+ Logger: options.Logger,
+ Dumper: options.Dumper,
+ }
+ srv.init(context.Background())
+ return srv
}
// make a log.Logger that writes to the current test's c.Log().
}
func (s *runSuite) SetUpTest(c *check.C) {
- s.config = Config{
- Client: arvados.Client{
- AuthToken: "xyzzy",
- APIHost: "zzzzz.arvadosapi.com",
- Client: s.stub.Start()},
- KeepServiceTypes: []string{"disk"},
- RunPeriod: arvados.Duration(time.Second),
- }
+ cfg, err := config.NewLoader(nil, nil).Load()
+ c.Assert(err, check.Equals, nil)
+ s.config, err = cfg.GetCluster("")
+ c.Assert(err, check.Equals, nil)
+
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+ arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
+
+ s.client = &arvados.Client{
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.Start()}
+
s.stub.serveDiscoveryDoc()
s.stub.logf = c.Logf
}
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
}
-func (s *runSuite) TestServiceTypes(c *check.C) {
- opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: s.logger(c),
- }
- s.config.KeepServiceTypes = []string{"unlisted-type"}
- s.stub.serveCurrentUserAdmin()
- s.stub.serveFooBarFileCollections()
- s.stub.serveKeepServices(stubServices)
- s.stub.serveKeepstoreMounts()
- indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
- trashReqs := s.stub.serveKeepstoreTrash()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
- c.Check(err, check.IsNil)
- c.Check(indexReqs.Count(), check.Equals, 0)
- c.Check(trashReqs.Count(), check.Equals, 0)
-}
-
func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
opts := RunOptions{
CommitPulls: true,
s.stub.serveKeepstoreMounts()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
func (s *runSuite) TestWriteLostBlocks(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
- s.config.LostBlocksFile = lostf.Name()
+ s.config.Collections.BlobMissingReport = lostf.Name()
defer os.Remove(lostf.Name())
opts := RunOptions{
CommitPulls: true,
s.stub.serveKeepstoreIndexFoo1()
s.stub.serveKeepstoreTrash()
s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
+ srv := s.newServer(&opts)
c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ _, err = srv.runOnce()
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- bal, err := srv.Run()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce()
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
func (s *runSuite) TestCommit(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
- s.config.LostBlocksFile = lostf.Name()
+ s.config.Collections.BlobMissingReport = lostf.Name()
defer os.Remove(lostf.Name())
- s.config.Listen = ":"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- bal, err := srv.Run()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce()
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
}
func (s *runSuite) TestRunForever(c *check.C) {
- s.config.Listen = ":"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
pullReqs := s.stub.serveKeepstorePull()
stop := make(chan interface{})
- s.config.RunPeriod = arvados.Duration(time.Millisecond)
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
+ srv := s.newServer(&opts)
done := make(chan bool)
go func() {
- srv.RunForever(stop)
+ srv.runForever(stop)
close(done)
}()
}
func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
- resp, err := http.Get("http://" + srv.listening + "/metrics")
- c.Assert(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+ req := httptest.NewRequest("GET", "/metrics", nil)
+ resp := httptest.NewRecorder()
+ srv.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+ req = httptest.NewRequest("GET", "/metrics?api_token=xyzzy", nil)
+ resp = httptest.NewRecorder()
+ srv.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
- resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
- c.Assert(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
buf, err := ioutil.ReadAll(resp.Body)
c.Check(err, check.IsNil)
return string(buf)
"testing"
"time"
+ "git.curoverse.com/arvados.git/lib/config"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
var _ = check.Suite(&integrationSuite{})
type integrationSuite struct {
- config Config
+ config *arvados.Cluster
+ client *arvados.Client
keepClient *keepclient.KeepClient
}
}
func (s *integrationSuite) SetUpTest(c *check.C) {
- s.config = Config{
- Client: arvados.Client{
- APIHost: os.Getenv("ARVADOS_API_HOST"),
- AuthToken: arvadostest.DataManagerToken,
- Insecure: true,
- },
- KeepServiceTypes: []string{"disk"},
- RunPeriod: arvados.Duration(time.Second),
+ cfg, err := config.NewLoader(nil, nil).Load()
+ c.Assert(err, check.Equals, nil)
+ s.config, err = cfg.GetCluster("")
+ c.Assert(err, check.Equals, nil)
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+
+ s.client = &arvados.Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: arvadostest.DataManagerToken,
+ Insecure: true,
}
}
Logger: logger,
Metrics: newMetrics(),
}
- nextOpts, err := bal.Run(s.config, opts)
+ nextOpts, err := bal.Run(s.client, s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/julienschmidt/httprouter"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
}
type Server struct {
- http.Handler
Cluster *arvados.Cluster
ArvClient *arvados.Client
RunOptions RunOptions
Metrics *metrics
+ httpHandler http.Handler
+
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
}
+// ServeHTTP implements service.Handler.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ srv.httpHandler.ServeHTTP(w, r)
+}
+
// CheckHealth implements service.Handler.
func (srv *Server) CheckHealth() error {
return nil
// Start sets up and runs the balancer.
func (srv *Server) Start(ctx context.Context) {
- if srv.RunOptions.Logger == nil {
- srv.RunOptions.Logger = ctxlog.FromContext(ctx)
- }
-
- srv.Logger = srv.RunOptions.Logger
- srv.Dumper = srv.RunOptions.Dumper
+ srv.init(ctx)
var err error
if srv.RunOptions.Once {
- _, err = srv.run()
+ _, err = srv.runOnce()
} else {
err = srv.runForever(nil)
}
}
}
-func (srv *Server) run() (*Balancer, error) {
+func (srv *Server) init(ctx context.Context) {
+ if srv.RunOptions.Logger == nil {
+ srv.RunOptions.Logger = ctxlog.FromContext(ctx)
+ }
+
+ srv.Logger = srv.RunOptions.Logger
+ srv.Dumper = srv.RunOptions.Dumper
+
+ if srv.Cluster.ManagementToken == "" {
+ srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+ })
+ } else {
+ mux := httprouter.New()
+ metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
+ ErrorLog: srv.Logger,
+ })
+ mux.Handler("GET", "/metrics", metricsH)
+ mux.Handler("GET", "/metrics.json", metricsH)
+ srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
+ }
+}
+
+func (srv *Server) runOnce() (*Balancer, error) {
bal := &Balancer{
Logger: srv.Logger,
Dumper: srv.Dumper,
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.run()
+ _, err := srv.runOnce()
if err != nil {
logger.Print("run failed: ", err)
} else {