21189: Replace -commit-X with Balance{Pull,Trash}Limit configs.
[arvados.git] / services / keep-balance / change_set.go
index f88cf8ea9fdb6fd68be5cb5c5cbc1186434147bf..c3579556bb5f174781753676f0208d794a5ee620 100644 (file)
@@ -2,39 +2,44 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
        "encoding/json"
        "fmt"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
 )
 
 // Pull is a request to retrieve a block from a remote server, and
 // store it locally.
 type Pull struct {
        arvados.SizedDigest
-       Source *KeepService
+       From *KeepService
+       To   *KeepMount
 }
 
 // MarshalJSON formats a pull request the way keepstore wants to see
 // it.
 func (p Pull) MarshalJSON() ([]byte, error) {
        type KeepstorePullRequest struct {
-               Locator string   `json:"locator"`
-               Servers []string `json:"servers"`
+               Locator   string   `json:"locator"`
+               Servers   []string `json:"servers"`
+               MountUUID string   `json:"mount_uuid"`
        }
        return json.Marshal(KeepstorePullRequest{
-               Locator: string(p.SizedDigest[:32]),
-               Servers: []string{p.Source.URLBase()}})
+               Locator:   string(p.SizedDigest[:32]),
+               Servers:   []string{p.From.URLBase()},
+               MountUUID: p.To.KeepMount.UUID,
+       })
 }
 
 // Trash is a request to delete a block.
 type Trash struct {
        arvados.SizedDigest
        Mtime int64
+       From  *KeepMount
 }
 
 // MarshalJSON formats a trash request the way keepstore wants to see
@@ -43,31 +48,47 @@ func (t Trash) MarshalJSON() ([]byte, error) {
        type KeepstoreTrashRequest struct {
                Locator    string `json:"locator"`
                BlockMtime int64  `json:"block_mtime"`
+               MountUUID  string `json:"mount_uuid"`
        }
        return json.Marshal(KeepstoreTrashRequest{
                Locator:    string(t.SizedDigest[:32]),
-               BlockMtime: t.Mtime})
+               BlockMtime: t.Mtime,
+               MountUUID:  t.From.KeepMount.UUID,
+       })
 }
 
 // ChangeSet is a set of change requests that will be sent to a
 // keepstore server.
 type ChangeSet struct {
-       Pulls   []Pull
-       Trashes []Trash
-       mutex   sync.Mutex
+       PullLimit  int
+       TrashLimit int
+
+       Pulls           []Pull
+       PullsDeferred   int // number that weren't added because of PullLimit
+       Trashes         []Trash
+       TrashesDeferred int // number that weren't added because of TrashLimit
+       mutex           sync.Mutex
 }
 
 // AddPull adds a Pull operation.
 func (cs *ChangeSet) AddPull(p Pull) {
        cs.mutex.Lock()
-       cs.Pulls = append(cs.Pulls, p)
+       if len(cs.Pulls) < cs.PullLimit {
+               cs.Pulls = append(cs.Pulls, p)
+       } else {
+               cs.PullsDeferred++
+       }
        cs.mutex.Unlock()
 }
 
 // AddTrash adds a Trash operation
 func (cs *ChangeSet) AddTrash(t Trash) {
        cs.mutex.Lock()
-       cs.Trashes = append(cs.Trashes, t)
+       if len(cs.Trashes) < cs.TrashLimit {
+               cs.Trashes = append(cs.Trashes, t)
+       } else {
+               cs.TrashesDeferred++
+       }
        cs.mutex.Unlock()
 }
 
@@ -75,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) {
 func (cs *ChangeSet) String() string {
        cs.mutex.Lock()
        defer cs.mutex.Unlock()
-       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
 }