projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'main' into 19582-aws-s3v2-driver
[arvados.git]
/
services
/
keep-balance
/
server.go
diff --git
a/services/keep-balance/server.go
b/services/keep-balance/server.go
index e485f5b2061f28134306d1d897b22cb62e4190e9..fd53497f789ed4f5f1db458f99e69f8e7f10c1a7 100644
(file)
--- a/
services/keep-balance/server.go
+++ b/
services/keep-balance/server.go
@@
-5,12
+5,14
@@
package keepbalance
import (
package keepbalance
import (
+ "context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"net/http"
"os"
"os/signal"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
@@
-62,12
+64,12
@@
func (srv *Server) Done() <-chan struct{} {
return nil
}
return nil
}
-func (srv *Server) run() {
+func (srv *Server) run(
ctx context.Context
) {
var err error
if srv.RunOptions.Once {
var err error
if srv.RunOptions.Once {
- _, err = srv.runOnce()
+ _, err = srv.runOnce(
ctx
)
} else {
} else {
- err = srv.runForever(
nil
)
+ err = srv.runForever(
ctx
)
}
if err != nil {
srv.Logger.Error(err)
}
if err != nil {
srv.Logger.Error(err)
@@
-77,7
+79,7
@@
func (srv *Server) run() {
}
}
}
}
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(
ctx context.Context
) (*Balancer, error) {
bal := &Balancer{
DB: srv.DB,
Logger: srv.Logger,
bal := &Balancer{
DB: srv.DB,
Logger: srv.Logger,
@@
-86,13
+88,12
@@
func (srv *Server) runOnce() (*Balancer, error) {
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
- srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+ srv.RunOptions, err = bal.Run(
ctx,
srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
@@
-102,6
+103,10
@@
func (srv *Server) runForever(stop <-chan interface{}) error {
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
@@
-110,7
+115,11
@@
func (srv *Server) runForever(stop <-chan interface{}) error {
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.runOnce()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
if err != nil {
logger.Print("run failed: ", err)
} else {
@@
-118,7
+127,7
@@
func (srv *Server) runForever(stop <-chan interface{}) error {
}
select {
}
select {
- case <-
stop
:
+ case <-
ctx.Done()
:
signal.Stop(sigUSR1)
return nil
case <-ticker.C:
signal.Stop(sigUSR1)
return nil
case <-ticker.C: