//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
"encoding/json"
"sync"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/services/keepstore"
)
// Pull is a request to retrieve a block from a remote server.
//
// MarshalJSON formats a pull request the way keepstore wants to see
// it.
func (p Pull) MarshalJSON() ([]byte, error) {
- type KeepstorePullRequest struct {
- Locator string `json:"locator"`
- Servers []string `json:"servers"`
- MountUUID string `json:"mount_uuid"`
- }
- return json.Marshal(KeepstorePullRequest{
- Locator: string(p.SizedDigest[:32]),
+ return json.Marshal(keepstore.PullListItem{
+ Locator: string(p.SizedDigest),
Servers: []string{p.From.URLBase()},
MountUUID: p.To.KeepMount.UUID,
})
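// MarshalJSON formats a trash request the way keepstore wants to see
// it. (The removed code sent only the first 32 bytes of the
// SizedDigest, i.e., a bare locator with no +size hint; the added
// code sends the whole SizedDigest.)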
func (t Trash) MarshalJSON() ([]byte, error) {
- type KeepstoreTrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
- MountUUID string `json:"mount_uuid"`
- }
- return json.Marshal(KeepstoreTrashRequest{
- Locator: string(t.SizedDigest[:32]),
+ return json.Marshal(keepstore.TrashListItem{
+ Locator: string(t.SizedDigest),
BlockMtime: t.Mtime,
MountUUID: t.From.KeepMount.UUID,
})
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
//
// In the added version, PullLimit and TrashLimit bound how many
// entries AddPull and AddTrash will append to Pulls and Trashes.
// Requests that arrive once the bound has been reached are not
// stored; they are only counted, in PullsDeferred and
// TrashesDeferred. AddPull, AddTrash and String hold mutex while
// they read or modify these fields.
//
// NOTE(review): with PullLimit or TrashLimit left at zero the
// corresponding Add method stores nothing and counts every request as
// deferred, so callers are presumably expected to set both limits
// before use -- confirm against the code that constructs ChangeSet.
type ChangeSet struct {
- Pulls []Pull
- Trashes []Trash
- mutex sync.Mutex
+ PullLimit int
+ TrashLimit int
+
+ Pulls []Pull
+ PullsDeferred int // number that weren't added because of PullLimit
+ Trashes []Trash
+ TrashesDeferred int // number that weren't added because of TrashLimit
+ mutex sync.Mutex
}
// AddPull adds a Pull operation.
//
// The added code appends p to cs.Pulls only while len(cs.Pulls) is
// below cs.PullLimit; otherwise p is not stored and cs.PullsDeferred
// is incremented instead. (The removed code appended
// unconditionally.) cs.mutex is held for the duration of the update.
func (cs *ChangeSet) AddPull(p Pull) {
cs.mutex.Lock()
- cs.Pulls = append(cs.Pulls, p)
+ if len(cs.Pulls) < cs.PullLimit {
+ cs.Pulls = append(cs.Pulls, p)
+ } else {
+ cs.PullsDeferred++
+ }
cs.mutex.Unlock()
}
// AddTrash adds a Trash operation.
//
// The added code appends t to cs.Trashes only while len(cs.Trashes)
// is below cs.TrashLimit; otherwise t is not stored and
// cs.TrashesDeferred is incremented instead. (The removed code
// appended unconditionally.) cs.mutex is held for the duration of the
// update.
func (cs *ChangeSet) AddTrash(t Trash) {
cs.mutex.Lock()
- cs.Trashes = append(cs.Trashes, t)
+ if len(cs.Trashes) < cs.TrashLimit {
+ cs.Trashes = append(cs.Trashes, t)
+ } else {
+ cs.TrashesDeferred++
+ }
cs.mutex.Unlock()
}
// String returns a one-line summary of the change set, holding
// cs.mutex while the counts are read. The removed format reported
// only len(cs.Pulls) and len(cs.Trashes); the added format also
// reports cs.PullsDeferred and cs.TrashesDeferred.
func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
- return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+ return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
}