X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/55aafbb07904ca24390dd47ea960eae7cb2b909a..62d3fea589acf744bb8ead4d42e11f794633704c:/services/keep-balance/change_set.go diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go index f88cf8ea9f..771e277d60 100644 --- a/services/keep-balance/change_set.go +++ b/services/keep-balance/change_set.go @@ -2,72 +2,84 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepbalance import ( "encoding/json" "fmt" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/services/keepstore" ) // Pull is a request to retrieve a block from a remote server, and // store it locally. type Pull struct { arvados.SizedDigest - Source *KeepService + From *KeepService + To *KeepMount } // MarshalJSON formats a pull request the way keepstore wants to see // it. func (p Pull) MarshalJSON() ([]byte, error) { - type KeepstorePullRequest struct { - Locator string `json:"locator"` - Servers []string `json:"servers"` - } - return json.Marshal(KeepstorePullRequest{ - Locator: string(p.SizedDigest[:32]), - Servers: []string{p.Source.URLBase()}}) + return json.Marshal(keepstore.PullListItem{ + Locator: string(p.SizedDigest), + Servers: []string{p.From.URLBase()}, + MountUUID: p.To.KeepMount.UUID, + }) } // Trash is a request to delete a block. type Trash struct { arvados.SizedDigest Mtime int64 + From *KeepMount } // MarshalJSON formats a trash request the way keepstore wants to see // it, i.e., as a bare locator with no +size hint. func (t Trash) MarshalJSON() ([]byte, error) { - type KeepstoreTrashRequest struct { - Locator string `json:"locator"` - BlockMtime int64 `json:"block_mtime"` - } - return json.Marshal(KeepstoreTrashRequest{ - Locator: string(t.SizedDigest[:32]), - BlockMtime: t.Mtime}) + return json.Marshal(keepstore.TrashListItem{ + Locator: string(t.SizedDigest), + BlockMtime: t.Mtime, + MountUUID: t.From.KeepMount.UUID, + }) } // ChangeSet is a set of change requests that will be sent to a // keepstore server. type ChangeSet struct { - Pulls []Pull - Trashes []Trash - mutex sync.Mutex + PullLimit int + TrashLimit int + + Pulls []Pull + PullsDeferred int // number that weren't added because of PullLimit + Trashes []Trash + TrashesDeferred int // number that weren't added because of TrashLimit + mutex sync.Mutex } // AddPull adds a Pull operation. func (cs *ChangeSet) AddPull(p Pull) { cs.mutex.Lock() - cs.Pulls = append(cs.Pulls, p) + if len(cs.Pulls) < cs.PullLimit { + cs.Pulls = append(cs.Pulls, p) + } else { + cs.PullsDeferred++ + } cs.mutex.Unlock() } // AddTrash adds a Trash operation func (cs *ChangeSet) AddTrash(t Trash) { cs.mutex.Lock() - cs.Trashes = append(cs.Trashes, t) + if len(cs.Trashes) < cs.TrashLimit { + cs.Trashes = append(cs.Trashes, t) + } else { + cs.TrashesDeferred++ + } cs.mutex.Unlock() } @@ -75,5 +87,5 @@ func (cs *ChangeSet) AddTrash(t Trash) { func (cs *ChangeSet) String() string { cs.mutex.Lock() defer cs.mutex.Unlock() - return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes)) + return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred) }