17574: Update replication_confirmed fields after keep-balance run.
[arvados.git] / services / keep-balance / collection.go
index 1659918cafe20c62162abfb1e841a40f80170c0a..3afb1ccc5500ccba647940b8f31254a1cd117861 100644 (file)
@@ -6,10 +6,16 @@ package main
 
 import (
        "context"
+       "encoding/json"
        "fmt"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
 )
 
 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -65,7 +71,7 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
                Limit:              &limit,
                Order:              "modified_at, uuid",
                Count:              "none",
-               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }
@@ -165,3 +171,98 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 
        return nil
 }
+
+// updateCollections writes the replication state observed by this
+// balancing run back to the collections table.
+//
+// One producer goroutine enumerates collections with EachCollection
+// and queues each one that is not trashed. Enumeration stops at the
+// first collection modified after the time this function started.
+// runtime.NumCPU() workers drain the queue; each computes the
+// confirmed replication of the collection's blocks from
+// bal.BlockStateMap and updates replication_confirmed,
+// replication_confirmed_at, storage_classes_confirmed and
+// storage_classes_confirmed_at in a transaction of its own.
+//
+// A failure affecting a single collection is logged and that
+// collection is skipped. The returned error is the database
+// connection error, if connecting failed, or else the error (if any)
+// that ended collection enumeration.
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       defer bal.time("update_collections", "wall clock time to update collections")()
+       threshold := time.Now()
+       thresholdStr := threshold.Format(time.RFC3339Nano)
+
+       // Connect before starting the producer, so that a connection
+       // failure cannot leave the producer blocked on a queue that
+       // nobody is reading.
+       db, err := bal.db(cluster)
+       if err != nil {
+               return err
+       }
+       defer db.Close()
+
+       // collErr is written only by the producer goroutine, before
+       // it closes collQ, and is read only after wg.Wait() -- i.e.,
+       // after every worker has observed that collQ is closed.
+       var collErr error
+       collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+       go func() {
+               defer close(collQ)
+               collErr = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+                       if coll.ModifiedAt.After(threshold) {
+                               // io.EOF is used as a sentinel
+                               // meaning "stop here, no error".
+                               return io.EOF
+                       }
+                       if coll.IsTrashed {
+                               return nil
+                       }
+                       select {
+                       case collQ <- coll:
+                               return nil
+                       case <-ctx.Done():
+                               // Don't wait for queue space once
+                               // the run has been cancelled.
+                               return ctx.Err()
+                       }
+               }, func(done, total int) {
+                       bal.logf("update collections: %d/%d", done, total)
+               })
+               if collErr == io.EOF {
+                       collErr = nil
+               } else if collErr != nil {
+                       bal.logf("error updating collections: %s", collErr)
+               }
+       }()
+
+       var updated int64
+       var wg sync.WaitGroup
+       for i := 0; i < runtime.NumCPU(); i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for coll := range collQ {
+                               blkids, err := coll.SizedDigests()
+                               if err != nil {
+                                       bal.logf("%s: %s", coll.UUID, err)
+                                       continue
+                               }
+                               repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+                               classes, err := json.Marshal(coll.StorageClassesDesired)
+                               if err != nil {
+                                       bal.logf("%s: error encoding storage classes: %s", coll.UUID, err)
+                                       continue
+                               }
+                               tx, err := db.Beginx()
+                               if err != nil {
+                                       bal.logf("%s: error opening transaction: %s", coll.UUID, err)
+                                       // Stop the producer, but keep
+                                       // draining collQ so it is
+                                       // never left blocked on send.
+                                       cancel()
+                                       continue
+                               }
+                               _, err = tx.ExecContext(ctx, `update collections set
+                                       replication_confirmed=$1,
+                                       replication_confirmed_at=$2,
+                                       storage_classes_confirmed=$3,
+                                       storage_classes_confirmed_at=$2
+                                       where uuid=$4`,
+                                       repl, thresholdStr, classes, coll.UUID)
+                               if err != nil {
+                                       // The exec error is the one
+                                       // worth reporting; a rollback
+                                       // error adds nothing useful.
+                                       tx.Rollback()
+                               } else {
+                                       err = tx.Commit()
+                               }
+                               if err != nil {
+                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       continue
+                               }
+                               atomic.AddInt64(&updated, 1)
+                       }
+               }()
+       }
+       wg.Wait()
+       bal.logf("updated %d collections", updated)
+       return collErr
+}
+
+func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
+       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+       if err != nil {
+               return nil, err
+       }
+       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+               db.SetMaxOpenConns(p)
+       }
+       if err := db.Ping(); err != nil {
+               return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
+       }
+       return db, nil
+}