import (
"context"
+ "encoding/json"
"fmt"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
)
func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
Limit: &limit,
Order: "modified_at, uuid",
Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+ Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
IncludeTrash: true,
IncludeOldVersions: true,
}
return nil
}
+
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ defer bal.time("update_collections", "wall clock time to update collections")()
+ threshold := time.Now()
+ thresholdStr := threshold.Format(time.RFC3339Nano)
+
+ var err error
+ collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+ go func() {
+ defer close(collQ)
+ err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+ if coll.ModifiedAt.After(threshold) {
+ return io.EOF
+ }
+ if coll.IsTrashed {
+ return nil
+ }
+ collQ <- coll
+ return nil
+ }, func(done, total int) {
+ bal.logf("update collections: %d/%d", done, total)
+ })
+ if err == io.EOF {
+ err = nil
+ } else if err != nil {
+ bal.logf("error updating collections: %s", err)
+ }
+ }()
+
+ db, err := bal.db(cluster)
+ if err != nil {
+ return err
+ }
+
+ var updated int64
+ var wg sync.WaitGroup
+ for i := 0; i < runtime.NumCPU(); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.logf("%s: %s", coll.UUID, err)
+ continue
+ }
+ repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+ tx, err := db.Beginx()
+ if err != nil {
+ bal.logf("error opening transaction: %s", coll.UUID, err)
+ cancel()
+ continue
+ }
+ classes, _ := json.Marshal(coll.StorageClassesDesired)
+ _, err = tx.ExecContext(ctx, `update collections set
+ replication_confirmed=$1,
+ replication_confirmed_at=$2,
+ storage_classes_confirmed=$3,
+ storage_classes_confirmed_at=$2
+ where uuid=$4`,
+ repl, thresholdStr, classes, coll.UUID)
+ if err != nil {
+ tx.Rollback()
+ } else {
+ err = tx.Commit()
+ }
+ if err != nil {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ continue
+ }
+ atomic.AddInt64(&updated, 1)
+ }
+ }()
+ }
+ wg.Wait()
+ bal.logf("updated %d collections", updated)
+ return err
+}
+
+func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
+ db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ if err != nil {
+ return nil, err
+ }
+ if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
+ }
+ if err := db.Ping(); err != nil {
+ return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
+ }
+ return db, nil
+}