import (
"context"
"database/sql"
+ "fmt"
+ "net"
"sync"
"time"
)
var (
- TrashSweep = &DBLocker{key: 10001}
- ContainerLogSweep = &DBLocker{key: 10002}
- retryDelay = 5 * time.Second
+ // Each locker's key is passed to pg_try_advisory_lock, so two
+ // lockers with the same key exclude each other cluster-wide;
+ // keys must therefore be unique.
+ TrashSweep = &DBLocker{key: 10001} // NOTE(review): presumably the controller's trash sweep worker; confirm
+ ContainerLogSweep = &DBLocker{key: 10002} // controller's periodic container log sweep worker
+ KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+ KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+ Dispatch = &DBLocker{key: 10005} // any dispatcher running
+ // retryDelay is how long Lock sleeps between attempts to acquire
+ // the lock. It is a variable so tests can shorten it.
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
}
// Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
- logger := ctxlog.FromContext(ctx)
+//
+// Lock retries every retryDelay until the lock is free. While waiting,
+// it logs the address of the database client currently holding it.
+//
+// Returns false if ctx is canceled (or its deadline passes) before
+// the lock is acquired; in that case this call has not taken the lock.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+ logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+ // Log "waiting" once per lock holder, not on every retry.
+ var lastHeldBy string
for ; ; time.Sleep(retryDelay) {
dbl.mtx.Lock()
+ // Check cancellation first: otherwise a caller whose ctx is
+ // canceled would keep looping for as long as dbl.conn is set.
+ if ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ }
if dbl.conn != nil {
dbl.mtx.Unlock()
continue
}
db, err := getdb(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
+ // Errors caused by cancellation are not reliably ==
+ // context.Canceled (they may be wrapped, driver-specific, or
+ // DeadlineExceeded), so ask ctx directly.
+ if err != nil && ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting database pool")
dbl.mtx.Unlock()
continue
}
conn, err := db.Conn(ctx)
- if err != nil {
+ if err != nil && ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
logger.WithError(err).Info("error getting database connection")
dbl.mtx.Unlock()
continue
}
var locked bool
err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
- if err != nil {
- logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+ if err != nil && ctx.Err() != nil {
+ // Release the connection and dbl.mtx before giving up;
+ // returning with dbl.mtx held would block every later
+ // Lock or Check on this locker.
+ conn.Close()
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting pg_try_advisory_lock")
conn.Close()
dbl.mtx.Unlock()
continue
}
if !locked {
+ // client_addr is NULL when the holder is connected via a
+ // Unix socket; scanning NULL into a plain string fails.
+ var host sql.NullString
+ var port int
+ err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+ (SELECT pid FROM pg_locks
+ WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+ if err != nil {
+ logger.WithError(err).Info("error getting other client info")
+ } else {
+ heldBy := net.JoinHostPort(host.String, fmt.Sprintf("%d", port))
+ if lastHeldBy != heldBy {
+ logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+ lastHeldBy = heldBy
+ }
+ }
conn.Close()
dbl.mtx.Unlock()
continue
}
- logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ logger.Debug("acquired pg_advisory_lock")
dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
dbl.mtx.Unlock()
- return
+ return true
}
}
// Check confirms that the lock is still active (i.e., the session is
// still alive), and re-acquires if needed. Panics if Lock is not
// acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
dbl.mtx.Lock()
err := dbl.conn.PingContext(dbl.ctx)
- if err == nil {
- ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ if err != nil && dbl.ctx.Err() != nil {
+ // The ping failed because Lock's context ended (the error is
+ // not necessarily == context.Canceled). Don't close dbl.conn
+ // here: sql.Conn.Close returns a possibly-live session, still
+ // holding the advisory lock, to the pool. Leaving it set lets
+ // Unlock try to release the lock explicitly.
+ dbl.mtx.Unlock()
+ return false
+ } else if err == nil {
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
dbl.mtx.Unlock()
- return
+ return true
}
-ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("database connection ping failed")
dbl.conn.Close()
dbl.conn = nil
ctx, getdb := dbl.ctx, dbl.getdb
dbl.mtx.Unlock()
- dbl.Lock(ctx, getdb)
+ return dbl.Lock(ctx, getdb)
}
func (dbl *DBLocker) Unlock() {
if dbl.conn != nil {
_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
if err != nil {
- ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
} else {
- ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
}
dbl.conn.Close()
dbl.conn = nil
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// Test hooks this package's gocheck suites into "go test".
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+// suite holds the cluster config and database handle shared by the
+// tests in this package; getdb is what gets passed to DBLocker.Lock.
+type suite struct {
+ cluster *arvados.Cluster
+ db *sqlx.DB
+ getdb func(context.Context) (*sqlx.DB, error)
+}
+
+var _ = check.Suite(new(suite))
+
+// testLocker uses key 999, distinct from the 1000x keys of the
+// package-level lockers.
+var testLocker = &DBLocker{key: 999}
+
+// SetUpSuite loads the test cluster config and opens the test
+// database that the suite's getdb function hands out.
+func (s *suite) SetUpSuite(c *check.C) {
+ loader := config.NewLoader(nil, ctxlog.TestLogger(c))
+ cfg, err := loader.Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ s.cluster = cluster
+ c.Assert(err, check.IsNil)
+ s.db = arvadostest.DB(c, s.cluster)
+ s.getdb = func(context.Context) (*sqlx.DB, error) {
+ return s.db, nil
+ }
+}
+
+// syncBuffer is a bytes.Buffer guarded by a mutex. The logger writes
+// to it from more than one goroutine while the test reads it, which
+// would be a data race with a bare bytes.Buffer.
+type syncBuffer struct {
+ mtx sync.Mutex
+ buf bytes.Buffer
+}
+
+func (sb *syncBuffer) Write(p []byte) (int, error) {
+ sb.mtx.Lock()
+ defer sb.mtx.Unlock()
+ return sb.buf.Write(p)
+}
+
+func (sb *syncBuffer) String() string {
+ sb.mtx.Lock()
+ defer sb.mtx.Unlock()
+ return sb.buf.String()
+}
+
+// TestLock checks that a second DBLocker with the same key blocks
+// (logging which client holds the lock) until the first one calls
+// Unlock, and then acquires the lock.
+func (s *suite) TestLock(c *check.C) {
+ // Shorten the retry interval for this test only. The restore is
+ // deferred first so it runs last, after the goroutine below exits.
+ defer func(orig time.Duration) { retryDelay = orig }(retryDelay)
+ retryDelay = 10 * time.Millisecond
+
+ var logbuf syncBuffer
+ logger := ctxlog.New(&logbuf, "text", "debug")
+ logger.Level = logrus.DebugLevel
+ ctx := ctxlog.Context(context.Background(), logger)
+ ctx, cancel := context.WithCancel(ctx)
+
+ // Deferred calls run in reverse order: cancel() must run before
+ // wg.Wait(), otherwise a failing test would hang forever waiting
+ // for a goroutine that is still blocked in Lock.
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ defer cancel()
+
+ c.Assert(testLocker.Lock(ctx, s.getdb), check.Equals, true)
+ c.Check(testLocker.Check(), check.Equals, true)
+
+ lock2 := make(chan bool)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ testLocker2 := &DBLocker{key: 999}
+ if !testLocker2.Lock(ctx, s.getdb) {
+ // ctx was canceled: the test is already finishing.
+ return
+ }
+ close(lock2)
+ testLocker2.Check()
+ testLocker2.Unlock()
+ }()
+
+ // Second lock should wait for first to Unlock
+ select {
+ case <-time.After(time.Second / 10):
+ c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+ case <-lock2:
+ c.Log("double-lock")
+ c.Fail()
+ }
+
+ c.Check(testLocker.Check(), check.Equals, true)
+ testLocker.Unlock()
+
+ // Now the second lock should succeed after about one retryDelay.
+ // Allow a generous margin for database round trips on a busy
+ // host; this only bounds how long a failing test takes.
+ select {
+ case <-time.After(time.Second):
+ c.Log("timed out")
+ c.Fail()
+ case <-lock2:
+ }
+ c.Logf("%s", logbuf.String())
+}
// non-nil, true, nil -- if the token is valid
func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
return nil, false, err
}
func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
return nil, err
}
import (
"context"
- "errors"
"fmt"
"net/http"
"net/http/httptest"
"git.arvados.org/arvados.git/lib/controller/router"
"git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"git.arvados.org/arvados.git/sdk/go/httpserver"
- "github.com/jmoiron/sqlx"
// sqlx needs lib/pq to talk to PostgreSQL
_ "github.com/lib/pq"
proxy *proxy
secureClient *http.Client
insecureClient *http.Client
- pgdb *sqlx.DB
- pgdbMtx sync.Mutex
+ dbConnector ctrlctx.DBConnector
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (h *Handler) CheckHealth() error {
h.setupOnce.Do(h.setup)
- _, err := h.db(context.TODO())
+ _, err := h.dbConnector.GetDB(context.TODO())
if err != nil {
return err
}
mux := http.NewServeMux()
healthFuncs := make(map[string]health.Func)
- oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+ h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+ oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
h.federation = federation.New(h.Cluster, &healthFuncs)
rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(
- ctrlctx.WrapCallsInTransactions(h.db),
+ ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
oidcAuthorizer.WrapCalls,
ctrlctx.WrapCallsWithAuth(h.Cluster)),
})
- healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+ healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
for name, f := range healthFuncs {
healthRoutes[name] = f
}
go h.containerLogSweepWorker()
}
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
- h.pgdbMtx.Lock()
- defer h.pgdbMtx.Unlock()
- if h.pgdb != nil {
- return h.pgdb, nil
- }
-
- db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
- if err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
- return nil, errDBConnection
- }
- if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
- db.SetMaxOpenConns(p)
- }
- if err := db.Ping(); err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
- return nil, errDBConnection
- }
- h.pgdb = db
- return db, nil
-}
-
+// middlewareFunc is a request hook that also receives the next
+// http.Handler in the chain (prepend combines one of these with a
+// handler to produce a new http.Handler).
type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
}
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
c.Assert(err, check.IsNil)
coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
c.Assert(err, check.IsNil)
defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
c.Assert(err, check.IsNil)
c.Assert(err, check.IsNil)
}
}
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
var rows int
logger.Debugf("interval is %v, not running worker", interval)
return
}
- locker.Lock(ctx, h.db)
+ if !locker.Lock(ctx, h.dbConnector.GetDB) {
+ // context canceled
+ return
+ }
defer locker.Unlock()
for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
- locker.Check()
+ if !locker.Check() {
+ // context canceled
+ return
+ }
err := run(ctx)
if err != nil {
logger.WithError(err).Infof("%s failed", workerName)
func (h *Handler) containerLogSweepWorker() {
h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
- db, err := h.db(ctx)
+ db, err := h.dbConnector.GetDB(ctx)
if err != nil {
return err
}
"sync"
"git.arvados.org/arvados.git/lib/controller/api"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
})
return txn.tx, txn.err
}
+
+// errDBConnection is returned by GetDB when the database can't be
+// opened or reached. The underlying error is logged, not returned.
+var errDBConnection = errors.New("database connection error")
+
+// DBConnector lazily opens a PostgreSQL connection pool from the
+// given config and shares it among all callers of GetDB. It contains
+// a mutex, so it must not be copied after first use.
+type DBConnector struct {
+ PostgreSQL arvados.PostgreSQL
+ pgdb *sqlx.DB
+ mtx sync.Mutex
+}
+
+// GetDB returns the shared connection pool, opening it and checking
+// connectivity on first use. A pool that fails the check is closed
+// and not cached, so the next call tries again.
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+ dbc.mtx.Lock()
+ defer dbc.mtx.Unlock()
+ if dbc.pgdb != nil {
+ return dbc.pgdb, nil
+ }
+ db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+ if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+ return nil, errDBConnection
+ }
+ if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
+ }
+ // Honor the caller's context so a canceled caller (e.g., a
+ // DBLocker waiting for its lock) isn't stuck here.
+ if err := db.PingContext(ctx); err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+ // The pool isn't cached on failure, so without Close every
+ // retry would leak another pool (and its opener goroutine).
+ db.Close()
+ return nil, errDBConnection
+ }
+ dbc.pgdb = db
+ return db, nil
+}
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud/container"
"git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
Registry *prometheus.Registry
InstanceSetID cloud.InstanceSetID
+ dbConnector ctrlctx.DBConnector
logger logrus.FieldLogger
instanceSet cloud.InstanceSet
pool pool
func (disp *dispatcher) initialize() {
disp.logger = ctxlog.FromContext(disp.Context)
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.ArvClient.AuthToken = disp.AuthToken
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
+ dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
disp.instanceSet = instanceSet
disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
}
func (disp *dispatcher) run() {
+ defer dblock.Dispatch.Unlock()
defer close(disp.stopped)
defer disp.instanceSet.Stop()
defer disp.pool.Stop()
"sync"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
MinTimeBetweenCreateCalls: time.Millisecond,
}
+ // We need the postgresql connection info from the integration
+ // test config.
+ cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+ c.Assert(err, check.IsNil)
+ testcluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+
s.cluster = &arvados.Cluster{
ManagementToken: "test-management-token",
+ PostgreSQL: testcluster.PostgreSQL,
Containers: arvados.ContainersConfig{
CrunchRunCommand: "crunch-run",
CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
err := s.disp.CheckHealth()
c.Check(err, check.IsNil)
- select {
- case <-done:
- c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
- case <-time.After(10 * time.Second):
- c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+ for len(waiting) > 0 {
+ waswaiting := len(waiting)
+ select {
+ case <-done:
+ // loop will end because len(waiting)==0
+ case <-time.After(3 * time.Second):
+ if len(waiting) >= waswaiting {
+ c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+ }
+ }
}
+ c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
Registry *prometheus.Registry
logger logrus.FieldLogger
+ dbConnector ctrlctx.DBConnector
lsfcli lsfcli
lsfqueue lsfqueue
arvDispatcher *dispatch.Dispatcher
func (disp *dispatcher) Start() {
disp.initOnce.Do(func() {
disp.init()
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
go func() {
+ defer dblock.Dispatch.Unlock()
disp.checkLsfQueueForOrphans()
err := disp.arvDispatcher.Run(disp.Context)
if err != nil {
lsfcli: &disp.lsfcli,
}
disp.ArvClient.AuthToken = disp.AuthToken
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ dbConnector ctrlctx.DBConnector
done chan struct{}
err error
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
}
func (disp *Dispatcher) run() error {
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
// subsequent balance operation.
//
// Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-// runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
+ ctxlog.FromContext(ctx).Info("acquiring active lock")
+ if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+ // context canceled
+ return
+ }
+ defer dblock.KeepBalanceActive.Unlock()
+
defer bal.time("sweep", "wall clock time to run one full sweep")()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
defer cancel()
var lbFile *os.File
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"io"
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
c.Assert(err, check.IsNil)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- stop := make(chan interface{})
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
srv := s.newServer(&opts)
done := make(chan bool)
go func() {
- srv.runForever(stop)
+ srv.runForever(ctx)
close(done)
}()
for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
time.Sleep(time.Millisecond)
}
- stop <- true
+ cancel()
<-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
import (
"bytes"
+ "context"
"io"
"os"
"strings"
Logger: logger,
Metrics: newMetrics(prometheus.NewRegistry()),
}
- nextOpts, err := bal.Run(s.client, s.config, opts)
+ nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
Routes: health.Routes{"ping": srv.CheckHealth},
}
- go srv.run()
+ go srv.run(ctx)
return srv
}).RunCommand(prog, args, stdin, stdout, stderr)
}
package keepbalance
import (
+ "context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
return nil
}
-func (srv *Server) run() {
+// run performs a single balance operation if RunOptions.Once is set;
+// otherwise it runs the periodic loop until ctx is canceled. Any
+// resulting error is logged rather than returned.
+func (srv *Server) run(ctx context.Context) {
var err error
if srv.RunOptions.Once {
- _, err = srv.runOnce()
+ _, err = srv.runOnce(ctx)
} else {
- err = srv.runForever(nil)
+ err = srv.runForever(ctx)
}
if err != nil {
srv.Logger.Error(err)
}
}
-func (srv *Server) runOnce() (*Balancer, error) {
+// runOnce builds a Balancer from the server's settings and runs one
+// balance operation with ctx. srv.RunOptions is replaced with the
+// options returned by Balancer.Run (even when it returns an error),
+// and the Balancer is returned along with that error.
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
bal := &Balancer{
DB: srv.DB,
Logger: srv.Logger,
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
- srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+ srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.runOnce()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
}
select {
- case <-stop:
+ case <-ctx.Done():
signal.Stop(sigUSR1)
return nil
case <-ticker.C: