X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b96d5caa0056472fe67b82bd5305448d85c7d0cd..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 86423a2976..9f581751d9 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -2,12 +2,13 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
 	"bytes"
 	"context"
 	"crypto/md5"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -18,11 +19,15 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/controller/dblock"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
 )
 
@@ -36,6 +41,7 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+	DB      *sqlx.DB
 	Logger  logrus.FieldLogger
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
@@ -50,7 +56,7 @@
 	classes       []string
 	mounts        int
 	mountsByClass map[string]map[*KeepMount]bool
-	collScanned   int
+	collScanned   int64
 	serviceRoots  map[string]string
 	errors        []error
 	stats         balancerStats
@@ -63,16 +69,19 @@
 // subsequent balance operation.
 //
 // Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
 	nextRunOptions = runOptions
 
+	ctxlog.FromContext(ctx).Info("acquiring active lock")
+	if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+		// context canceled
+		return
+	}
+	defer dblock.KeepBalanceActive.Unlock()
+
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
	defer cancel()
 
 	var lbFile *os.File
@@ -167,6 +176,15 @@
 	}
 	if runOptions.CommitTrash {
 		err = bal.CommitTrash(ctx, client)
+		if err != nil {
+			return
+		}
+	}
+	if runOptions.CommitConfirmedFields {
+		err = bal.updateCollections(ctx, client, cluster)
+		if err != nil {
+			return
+		}
 	}
 	return
 }
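The Run prologue above now serializes sweeps cluster-wide: it blocks on dblock.KeepBalanceActive until this process is the active balancer, and releases the lock when the sweep finishes. As a rough sketch of how such a lock can be built on a Postgres session-level advisory lock (the DBLocker type, lock key, and retry interval below are assumptions for illustration, not the actual lib/controller/dblock API):

// Illustrative sketch only: a Postgres session-level advisory lock,
// similar in spirit to dblock.KeepBalanceActive. Type name, key, and
// retry interval are assumptions for this example.
package main

import (
	"context"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // assumed Postgres driver
)

type DBLocker struct {
	key  int64      // arbitrary lock ID shared by all competing processes
	conn *sqlx.Conn // session that owns the advisory lock
}

// Lock retries until it holds the advisory lock, returning false only
// if ctx is canceled first -- matching the bool that Run checks before
// starting a sweep.
func (l *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
	for ctx.Err() == nil {
		db, err := getdb(ctx)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		conn, err := db.Connx(ctx)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		var locked bool
		err = conn.GetContext(ctx, &locked, `SELECT pg_try_advisory_lock($1)`, l.key)
		if err == nil && locked {
			l.conn = conn // keep the session open; the lock lives with it
			return true
		}
		conn.Close()
		time.Sleep(time.Second)
	}
	return false
}

// Unlock releases the lock by closing the session that owns it.
func (l *DBLocker) Unlock() {
	if l.conn != nil {
		l.conn.Close()
		l.conn = nil
	}
}

Holding the lock in a dedicated session has a useful failure mode: if the active balancer crashes, its connection drops and Postgres releases the advisory lock automatically.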
@@ -205,8 +223,8 @@ func (bal *Balancer) cleanupMounts() {
 	rwdev := map[string]*KeepService{}
 	for _, srv := range bal.KeepServices {
 		for _, mnt := range srv.mounts {
-			if !mnt.ReadOnly && mnt.DeviceID != "" {
-				rwdev[mnt.DeviceID] = srv
+			if !mnt.ReadOnly {
+				rwdev[mnt.UUID] = srv
 			}
 		}
 	}
@@ -215,8 +233,8 @@
 	for _, srv := range bal.KeepServices {
 		var dedup []*KeepMount
 		for _, mnt := range srv.mounts {
-			if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
-				bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
+			if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
+				bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
 			} else {
 				dedup = append(dedup, mnt)
 			}
@@ -254,6 +272,29 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 		}
 	}
 
+	mountProblem := false
+	type deviceMount struct {
+		srv *KeepService
+		mnt *KeepMount
+	}
+	deviceMounted := map[string]deviceMount{} // DeviceID -> mount
+	for _, srv := range bal.KeepServices {
+		for _, mnt := range srv.mounts {
+			if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
+				bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
+					mnt.DeviceID,
+					first.mnt.UUID, first.srv,
+					mnt.UUID, srv)
+				mountProblem = true
+				continue
+			}
+			deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
+		}
+	}
+	if mountProblem {
+		return errors.New("cannot continue with config errors (see above)")
+	}
+
 	var checkPage arvados.CollectionList
 	if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
 		Limit: new(int),
@@ -345,12 +386,10 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 	deviceMount := map[string]*KeepMount{}
 	for _, srv := range bal.KeepServices {
 		for _, mnt := range srv.mounts {
-			equiv := deviceMount[mnt.DeviceID]
+			equiv := deviceMount[mnt.UUID]
 			if equiv == nil {
 				equiv = mnt
-				if mnt.DeviceID != "" {
-					deviceMount[mnt.DeviceID] = equiv
-				}
+				deviceMount[mnt.UUID] = equiv
 			}
 			equivMount[equiv] = append(equivMount[equiv], mnt)
 		}
@@ -388,39 +427,14 @@
 		}(mounts)
 	}
 
-	// collQ buffers incoming collections so we can start fetching
-	// the next page without waiting for the current page to
-	// finish processing.
 	collQ := make(chan arvados.Collection, bufs)
 
-	// Start a goroutine to process collections. (We could use a
-	// worker pool here, but even with a single worker we already
-	// process collections much faster than we can retrieve them.)
+	// Retrieve all collections from the database and send them to
+	// collQ.
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		for coll := range collQ {
-			err := bal.addCollection(coll)
-			if err != nil || len(errs) > 0 {
-				select {
-				case errs <- err:
-				default:
-				}
-				for range collQ {
-				}
-				cancel()
-				return
-			}
-			bal.collScanned++
-		}
-	}()
-
-	// Start a goroutine to retrieve all collections from the
-	// Arvados database and send them to collQ for processing.
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		err = EachCollection(ctx, c, pageSize,
+		err = EachCollection(ctx, bal.DB, c,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
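The goroutine above is now only the producer half of a pipeline: it streams collections into collQ, and the worker pool added in the next hunk drains them. The first-error-wins idiom around errs (a one-slot channel, a non-blocking send, cancel() to unwind the rest) is easy to get wrong, so here is the same pattern in isolation; pipeline, work, and the int item type are hypothetical stand-ins for EachCollection and addCollection.

// Illustrative sketch of the error-propagation idiom used around
// collQ: a one-slot channel records the first failure, a non-blocking
// send discards later ones, and cancel() lets the other goroutines
// stop early. Names here are hypothetical.
package main

import (
	"context"
	"fmt"
	"sync"
)

func pipeline(ctx context.Context, items []int) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, 1) // holds only the first error
	q := make(chan int)
	var wg sync.WaitGroup

	// Producer: stops early once an error has been reported.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(q)
		for _, it := range items {
			if len(errs) > 0 || ctx.Err() != nil {
				return
			}
			q <- it
		}
	}()

	// Consumers: report the first error, then keep draining q so the
	// producer never blocks on a send.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range q {
				if err := work(it); err != nil {
					select {
					case errs <- err:
					default:
					}
					cancel()
				}
			}
		}()
	}

	wg.Wait()
	if len(errs) > 0 {
		return <-errs
	}
	return nil
}

func work(it int) error {
	if it < 0 {
		return fmt.Errorf("bad item %d", it)
	}
	return nil
}

func main() {
	// Prints "bad item -1": the first (and only) failure wins.
	fmt.Println(pipeline(context.Background(), []int{1, 2, -1, 4}))
}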
@@ -444,6 +458,27 @@
 		}
 	}()
 
+	// Parse manifests from collQ and pass the block hashes to
+	// BlockStateMap to track desired replication.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for coll := range collQ {
+				err := bal.addCollection(coll)
+				if err != nil || len(errs) > 0 {
+					select {
+					case errs <- err:
+					default:
+					}
+					cancel()
+					continue
+				}
+				atomic.AddInt64(&bal.collScanned, 1)
+			}
+		}()
+	}
+
 	wg.Wait()
 	if len(errs) > 0 {
 		return <-errs
@@ -460,7 +495,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	if coll.ReplicationDesired != nil {
 		repl = *coll.ReplicationDesired
 	}
-	bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
 	// Pass pdh to IncreaseDesired only if LostBlocksFile is being
 	// written -- otherwise it's just a waste of memory.
 	pdh := ""
@@ -530,10 +565,6 @@ func (bal *Balancer) setupLookupTables() {
 			// effectively read-only.
 			mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
 
-			if len(mnt.StorageClasses) == 0 {
-				bal.mountsByClass["default"][mnt] = true
-				continue
-			}
 			for class := range mnt.StorageClasses {
 				if mbc := bal.mountsByClass[class]; mbc == nil {
 					bal.classes = append(bal.classes, class)
@@ -663,7 +694,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 			// new/remaining replicas uniformly
 			// across qualifying mounts on a given
 			// server.
-			return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
+			return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
 		}
 	})
 
@@ -688,7 +719,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 	// and returns true if all requirements are met.
 	trySlot := func(i int) bool {
 		slot := slots[i]
-		if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
+		if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
 			// Already allocated a replica to this
 			// backend device, possibly on a
 			// different server.
@@ -703,9 +734,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 			slots[i].want = true
 			wantSrv[slot.mnt.KeepService] = true
 			wantMnt[slot.mnt] = true
-			if slot.mnt.DeviceID != "" {
-				wantDev[slot.mnt.DeviceID] = true
-			}
+			wantDev[slot.mnt.UUID] = true
 			replWant += slot.mnt.Replication
 		}
 		return replProt >= desired && replWant >= desired
@@ -747,7 +776,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 	// haven't already been added to unsafeToDelete
 	// because the servers report different Mtimes.
 	for _, slot := range slots {
-		if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+		if slot.repl != nil && wantDev[slot.mnt.UUID] {
 			unsafeToDelete[slot.repl.Mtime] = true
 		}
 	}
@@ -830,7 +859,7 @@ func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRe
 		if onlyCount != nil && !onlyCount[slot.mnt] {
 			continue
 		}
-		if countedDev[slot.mnt.DeviceID] {
+		if countedDev[slot.mnt.UUID] {
 			continue
 		}
 		switch {
@@ -844,9 +873,7 @@ func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRe
 			bbs.pulling++
 			repl += slot.mnt.Replication
 		}
-		if slot.mnt.DeviceID != "" {
-			countedDev[slot.mnt.DeviceID] = true
-		}
+		countedDev[slot.mnt.UUID] = true
 	}
 	if repl < needRepl {
 		bbs.unachievable = true
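Most of the later hunks swap mnt.DeviceID for mnt.UUID as the key used both to dedup backend devices and to order candidate slots. The ordering comes from rendezvous (highest-random-weight) hashing: each participant hashes the block ID together with each candidate's UUID and sorts by the result, so every balancer agrees on the preferred placement without coordination. The md5-based weight below is an assumption for illustration and not necessarily what rendezvousLess computes; the mount UUIDs and block ID are hypothetical.

// Illustrative sketch of rendezvous (highest-random-weight) ordering.
// A mount's rank for a block depends only on (blkid, mount UUID), so
// the sorted order is identical in every process and changes minimally
// when a mount is added or removed.
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"sort"
)

func rendezvousLess(uuidI, uuidJ, blkid string) bool {
	wi := md5.Sum([]byte(blkid + uuidI))
	wj := md5.Sum([]byte(blkid + uuidJ))
	return bytes.Compare(wi[:], wj[:]) < 0
}

func main() {
	// Hypothetical mount UUIDs and block ID.
	mounts := []string{
		"zzzzz-ivpuk-000000000000001",
		"zzzzz-ivpuk-000000000000002",
		"zzzzz-ivpuk-000000000000003",
	}
	blkid := "acbd18db4cc2f85cedef654fccc4a4d8+3"
	sort.Slice(mounts, func(i, j int) bool {
		return rendezvousLess(mounts[i], mounts[j], blkid)
	})
	// Stable across runs and hosts; keeping replicas in the leading
	// entries of this order is what "N best positions in rendezvous
	// probe order" means in the Balancer doc comment.
	fmt.Println(mounts)
}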