X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/552c153523f55886867e54bad5db5eff166d0709..HEAD:/services/keep-balance/change_set.go diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go index 5437f76193..771e277d60 100644 --- a/services/keep-balance/change_set.go +++ b/services/keep-balance/change_set.go @@ -2,14 +2,15 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepbalance import ( "encoding/json" "fmt" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/services/keepstore" ) // Pull is a request to retrieve a block from a remote server, and @@ -23,13 +24,8 @@ type Pull struct { // MarshalJSON formats a pull request the way keepstore wants to see // it. func (p Pull) MarshalJSON() ([]byte, error) { - type KeepstorePullRequest struct { - Locator string `json:"locator"` - Servers []string `json:"servers"` - MountUUID string `json:"mount_uuid"` - } - return json.Marshal(KeepstorePullRequest{ - Locator: string(p.SizedDigest[:32]), + return json.Marshal(keepstore.PullListItem{ + Locator: string(p.SizedDigest), Servers: []string{p.From.URLBase()}, MountUUID: p.To.KeepMount.UUID, }) @@ -45,13 +41,8 @@ type Trash struct { // MarshalJSON formats a trash request the way keepstore wants to see // it, i.e., as a bare locator with no +size hint. func (t Trash) MarshalJSON() ([]byte, error) { - type KeepstoreTrashRequest struct { - Locator string `json:"locator"` - BlockMtime int64 `json:"block_mtime"` - MountUUID string `json:"mount_uuid"` - } - return json.Marshal(KeepstoreTrashRequest{ - Locator: string(t.SizedDigest[:32]), + return json.Marshal(keepstore.TrashListItem{ + Locator: string(t.SizedDigest), BlockMtime: t.Mtime, MountUUID: t.From.KeepMount.UUID, }) @@ -60,22 +51,35 @@ func (t Trash) MarshalJSON() ([]byte, error) { // ChangeSet is a set of change requests that will be sent to a // keepstore server. type ChangeSet struct { - Pulls []Pull - Trashes []Trash - mutex sync.Mutex + PullLimit int + TrashLimit int + + Pulls []Pull + PullsDeferred int // number that weren't added because of PullLimit + Trashes []Trash + TrashesDeferred int // number that weren't added because of TrashLimit + mutex sync.Mutex } // AddPull adds a Pull operation. func (cs *ChangeSet) AddPull(p Pull) { cs.mutex.Lock() - cs.Pulls = append(cs.Pulls, p) + if len(cs.Pulls) < cs.PullLimit { + cs.Pulls = append(cs.Pulls, p) + } else { + cs.PullsDeferred++ + } cs.mutex.Unlock() } // AddTrash adds a Trash operation func (cs *ChangeSet) AddTrash(t Trash) { cs.mutex.Lock() - cs.Trashes = append(cs.Trashes, t) + if len(cs.Trashes) < cs.TrashLimit { + cs.Trashes = append(cs.Trashes, t) + } else { + cs.TrashesDeferred++ + } cs.mutex.Unlock() } @@ -83,5 +87,5 @@ func (cs *ChangeSet) AddTrash(t Trash) { func (cs *ChangeSet) String() string { cs.mutex.Lock() defer cs.mutex.Unlock() - return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes)) + return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred) }