projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
21384: Update arvados-google-api-client in RailsAPI
[arvados.git]
/
services
/
keep-balance
/
change_set.go
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index 417ea7ff8b68ff02e59f0cd9d0fab17521931476..c3579556bb5f174781753676f0208d794a5ee620 100644
(file)
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -1,36 +1,45 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepbalance
import (
"encoding/json"
"fmt"
"sync"
import (
"encoding/json"
"fmt"
"sync"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
// Pull is a request to retrieve a block from a remote server, and
// store it locally.
type Pull struct {
arvados.SizedDigest
)
// Pull is a request to retrieve a block from a remote server, and
// store it locally.
type Pull struct {
arvados.SizedDigest
- Source *KeepService
+ From *KeepService
+ To *KeepMount
}
// MarshalJSON formats a pull request the way keepstore wants to see
// it.
func (p Pull) MarshalJSON() ([]byte, error) {
type KeepstorePullRequest struct {
}
// MarshalJSON formats a pull request the way keepstore wants to see
// it.
func (p Pull) MarshalJSON() ([]byte, error) {
type KeepstorePullRequest struct {
- Locator string `json:"locator"`
- Servers []string `json:"servers"`
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+ MountUUID string `json:"mount_uuid"`
}
return json.Marshal(KeepstorePullRequest{
}
return json.Marshal(KeepstorePullRequest{
- Locator: string(p.SizedDigest[:32]),
- Servers: []string{p.Source.URLBase()}})
+ Locator: string(p.SizedDigest[:32]),
+ Servers: []string{p.From.URLBase()},
+ MountUUID: p.To.KeepMount.UUID,
+ })
}
// Trash is a request to delete a block.
type Trash struct {
arvados.SizedDigest
Mtime int64
}
// Trash is a request to delete a block.
type Trash struct {
arvados.SizedDigest
Mtime int64
+ From *KeepMount
}
// MarshalJSON formats a trash request the way keepstore wants to see
}
// MarshalJSON formats a trash request the way keepstore wants to see
@@ -39,31 +48,47 @@ func (t Trash) MarshalJSON() ([]byte, error) {
type KeepstoreTrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
type KeepstoreTrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
+ MountUUID string `json:"mount_uuid"`
}
return json.Marshal(KeepstoreTrashRequest{
Locator: string(t.SizedDigest[:32]),
}
return json.Marshal(KeepstoreTrashRequest{
Locator: string(t.SizedDigest[:32]),
- BlockMtime: t.Mtime})
+ BlockMtime: t.Mtime,
+ MountUUID: t.From.KeepMount.UUID,
+ })
}
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
type ChangeSet struct {
}
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
type ChangeSet struct {
- Pulls []Pull
- Trashes []Trash
- mutex sync.Mutex
+ PullLimit int
+ TrashLimit int
+
+ Pulls []Pull
+ PullsDeferred int // number that weren't added because of PullLimit
+ Trashes []Trash
+ TrashesDeferred int // number that weren't added because of TrashLimit
+ mutex sync.Mutex
}
// AddPull adds a Pull operation.
func (cs *ChangeSet) AddPull(p Pull) {
cs.mutex.Lock()
}
// AddPull adds a Pull operation.
func (cs *ChangeSet) AddPull(p Pull) {
cs.mutex.Lock()
- cs.Pulls = append(cs.Pulls, p)
+ if len(cs.Pulls) < cs.PullLimit {
+ cs.Pulls = append(cs.Pulls, p)
+ } else {
+ cs.PullsDeferred++
+ }
cs.mutex.Unlock()
}
// AddTrash adds a Trash operation
func (cs *ChangeSet) AddTrash(t Trash) {
cs.mutex.Lock()
cs.mutex.Unlock()
}
// AddTrash adds a Trash operation
func (cs *ChangeSet) AddTrash(t Trash) {
cs.mutex.Lock()
- cs.Trashes = append(cs.Trashes, t)
+ if len(cs.Trashes) < cs.TrashLimit {
+ cs.Trashes = append(cs.Trashes, t)
+ } else {
+ cs.TrashesDeferred++
+ }
cs.mutex.Unlock()
}
cs.mutex.Unlock()
}
@@ -71,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) {
func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
- return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+ return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
}
}