// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+
package summary
import (
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
"strings"
)
+// Locator identifies a block; it wraps blockdigest.DigestWithSize as a
+// distinct type so that it can carry its own JSON encoding.
type Locator blockdigest.DigestWithSize
+// MarshalJSON implements json.Marshaler by encoding the locator as a
+// JSON string holding the String() form of its DigestWithSize.
+// NOTE(review): the string is concatenated without JSON escaping; this
+// assumes String() never produces characters that need escaping.
func (l Locator) MarshalJSON() ([]byte, error) {
return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
}
-// One entry in the Pull List
+// PullRequest represents one entry in the Pull List. It is encoded as
+// a JSON object with "locator" and "servers" keys.
type PullRequest struct {
Locator Locator `json:"locator"`
Servers []string `json:"servers"`
}
-// The Pull List for a particular server
+// PullList is the list of PullRequests for a particular server.
type PullList []PullRequest
// PullListByLocator implements sort.Interface for PullList based on
return false
}
+// PullServers struct
// For a given under-replicated block, this structure represents which
// servers should pull the specified block and which servers they can
// pull it from.
From []string // Servers that already contain the specified block
}
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
+// ComputePullServers creates a map from block locator to PullServers
+// with one entry for each under-replicated block.
//
// This method ignores zero-replica blocks since there are no servers
// to pull them from, so callers should feel free to omit them, but
blockToDesiredReplication map[blockdigest.DigestWithSize]int,
underReplicated BlockSet) (m map[Locator]PullServers) {
m = map[Locator]PullServers{}
- // We use CanonicalString to avoid filling memory with dupicate
+ // We use CanonicalString to avoid filling memory with duplicate
// copies of the same string.
var cs CanonicalString
writableServers[cs.Get(url)] = struct{}{}
}
- for block, _ := range underReplicated {
+ for block := range underReplicated {
serversStoringBlock := keepServerInfo.BlockToServers[block]
numCopies := len(serversStoringBlock)
numCopiesMissing := blockToDesiredReplication[block] - numCopies
serverHasBlock := map[string]struct{}{}
for _, info := range serversStoringBlock {
sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
- serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
+ serverHasBlock[cs.Get(sa.URL())] = struct{}{}
}
roots := keepclient.NewRootSorter(kc.LocalRoots(),
return m
}
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
+// CreatePullServers creates a pull list in which the To and From
+// fields preserve the ordering of sorted servers and the contents
+// are all canonical strings.
func CreatePullServers(cs CanonicalString,
serverHasBlock map[string]struct{},
writableServers map[string]struct{},
for _, host := range sortedServers {
-// Strip the protocol portion of the url.
+// Use the host as-is (protocol included), since serverHasBlock is keyed by URL.
// Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
+ server := cs.Get(host)
_, hasBlock := serverHasBlock[server]
if hasBlock {
// The from field should include the protocol.
return
}
-// Strips the protocol prefix from a url.
+// RemoveProtocolPrefix strips the protocol prefix from a url by
+// returning everything after its last '/'. A url containing no '/' is
+// returned unchanged. Note that a url with a path or a trailing '/'
+// yields only the text after the final slash (possibly "").
func RemoveProtocolPrefix(url string) string {
return url[(strings.LastIndex(url, "/") + 1):]
}
-// Produces a PullList for each keep server.
+// BuildPullLists produces a PullList for each keep server.
func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
spl = map[string]PullList{}
// We don't worry about canonicalizing our strings here, because we
return
}
-// Writes each pull list to a file.
+// WritePullLists writes each pull list to a file.
// The filename is based on the hostname.
//
// This is just a hack for prototyping, it is not expected to be used
// in production.
+//
+// If arvLogger is non-nil, each list's length is recorded under
+// "pull_list_len" in the log properties. If dryRun is true, no files
+// are written. Returns the first error from creating, writing or
+// closing a file.
func WritePullLists(arvLogger *logger.Logger,
- pullLists map[string]PullList) {
+ pullLists map[string]PullList,
+ dryRun bool) error {
r := strings.NewReplacer(":", ".")
+
for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+ if arvLogger != nil {
+ // Copy host and len(list) into locals because Update doesn't call our mutator func
+ // until later, when the loop variables might have been reused by the next iteration.
+ host := host
+ listLen := len(list)
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
+ pullListInfo[host] = listLen
+ })
+ }
+
+ if dryRun {
+ log.Printf("dry run, not sending pull list to service %s with %d blocks", host, len(list))
+ continue
+ }
+
+ filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
pullListFile, err := os.Create(filename)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", filename, err))
+ return err
}
- defer pullListFile.Close()
enc := json.NewEncoder(pullListFile)
err = enc.Encode(list)
+ // Close each file now instead of deferring to function return, so
+ // handles don't pile up across iterations and close (flush) errors
+ // are reported rather than silently dropped.
+ if closeErr := pullListFile.Close(); err == nil {
+ err = closeErr
+ }
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
+ return fmt.Errorf("writing pull list to %s: %v", filename, err)
}
log.Printf("Wrote pull list to %s.", filename)
}
+
+ return nil
}