X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa..cbfdb1b66ab9c1b6e69d1c9cd589633386267177:/services/keep-balance/change_set.go diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go index 417ea7ff8b..c3579556bb 100644 --- a/services/keep-balance/change_set.go +++ b/services/keep-balance/change_set.go @@ -1,36 +1,45 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepbalance import ( "encoding/json" "fmt" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" ) // Pull is a request to retrieve a block from a remote server, and // store it locally. type Pull struct { arvados.SizedDigest - Source *KeepService + From *KeepService + To *KeepMount } // MarshalJSON formats a pull request the way keepstore wants to see // it. func (p Pull) MarshalJSON() ([]byte, error) { type KeepstorePullRequest struct { - Locator string `json:"locator"` - Servers []string `json:"servers"` + Locator string `json:"locator"` + Servers []string `json:"servers"` + MountUUID string `json:"mount_uuid"` } return json.Marshal(KeepstorePullRequest{ - Locator: string(p.SizedDigest[:32]), - Servers: []string{p.Source.URLBase()}}) + Locator: string(p.SizedDigest[:32]), + Servers: []string{p.From.URLBase()}, + MountUUID: p.To.KeepMount.UUID, + }) } // Trash is a request to delete a block. type Trash struct { arvados.SizedDigest Mtime int64 + From *KeepMount } // MarshalJSON formats a trash request the way keepstore wants to see @@ -39,31 +48,47 @@ func (t Trash) MarshalJSON() ([]byte, error) { type KeepstoreTrashRequest struct { Locator string `json:"locator"` BlockMtime int64 `json:"block_mtime"` + MountUUID string `json:"mount_uuid"` } return json.Marshal(KeepstoreTrashRequest{ Locator: string(t.SizedDigest[:32]), - BlockMtime: t.Mtime}) + BlockMtime: t.Mtime, + MountUUID: t.From.KeepMount.UUID, + }) } // ChangeSet is a set of change requests that will be sent to a // keepstore server. type ChangeSet struct { - Pulls []Pull - Trashes []Trash - mutex sync.Mutex + PullLimit int + TrashLimit int + + Pulls []Pull + PullsDeferred int // number that weren't added because of PullLimit + Trashes []Trash + TrashesDeferred int // number that weren't added because of TrashLimit + mutex sync.Mutex } // AddPull adds a Pull operation. func (cs *ChangeSet) AddPull(p Pull) { cs.mutex.Lock() - cs.Pulls = append(cs.Pulls, p) + if len(cs.Pulls) < cs.PullLimit { + cs.Pulls = append(cs.Pulls, p) + } else { + cs.PullsDeferred++ + } cs.mutex.Unlock() } // AddTrash adds a Trash operation func (cs *ChangeSet) AddTrash(t Trash) { cs.mutex.Lock() - cs.Trashes = append(cs.Trashes, t) + if len(cs.Trashes) < cs.TrashLimit { + cs.Trashes = append(cs.Trashes, t) + } else { + cs.TrashesDeferred++ + } cs.mutex.Unlock() } @@ -71,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) { func (cs *ChangeSet) String() string { cs.mutex.Lock() defer cs.mutex.Unlock() - return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes)) + return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred) }