projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'main' into 21440-process-panel-reorg
[arvados.git]
/
services
/
keep-balance
/
change_set.go
diff --git
a/services/keep-balance/change_set.go
b/services/keep-balance/change_set.go
index 5437f761937747d199eba3ebd9a0696d2c2c0583..c3579556bb5f174781753676f0208d794a5ee620 100644
(file)
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -2,14 +2,14 @@
//
// SPDX-License-Identifier: AGPL-3.0
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
"encoding/json"
"fmt"
"sync"
import (
"encoding/json"
"fmt"
"sync"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
// Pull is a request to retrieve a block from a remote server, and
)
// Pull is a request to retrieve a block from a remote server, and
@@ -60,22 +60,35 @@ func (t Trash) MarshalJSON() ([]byte, error) {
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
type ChangeSet struct {
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
type ChangeSet struct {
- Pulls []Pull
- Trashes []Trash
- mutex sync.Mutex
+ PullLimit int
+ TrashLimit int
+
+ Pulls []Pull
+ PullsDeferred int // number that weren't added because of PullLimit
+ Trashes []Trash
+ TrashesDeferred int // number that weren't added because of TrashLimit
+ mutex sync.Mutex
}
// AddPull adds a Pull operation.
func (cs *ChangeSet) AddPull(p Pull) {
cs.mutex.Lock()
}
// AddPull adds a Pull operation.
func (cs *ChangeSet) AddPull(p Pull) {
cs.mutex.Lock()
- cs.Pulls = append(cs.Pulls, p)
+ if len(cs.Pulls) < cs.PullLimit {
+ cs.Pulls = append(cs.Pulls, p)
+ } else {
+ cs.PullsDeferred++
+ }
cs.mutex.Unlock()
}
// AddTrash adds a Trash operation
func (cs *ChangeSet) AddTrash(t Trash) {
cs.mutex.Lock()
cs.mutex.Unlock()
}
// AddTrash adds a Trash operation
func (cs *ChangeSet) AddTrash(t Trash) {
cs.mutex.Lock()
- cs.Trashes = append(cs.Trashes, t)
+ if len(cs.Trashes) < cs.TrashLimit {
+ cs.Trashes = append(cs.Trashes, t)
+ } else {
+ cs.TrashesDeferred++
+ }
cs.mutex.Unlock()
}
cs.mutex.Unlock()
}
@@ -83,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) {
func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
- return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+ return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
}
}