Added code for generating pull lists.
author mishaz <misha@curoverse.com>
Tue, 19 May 2015 00:54:29 +0000 (00:54 +0000)
committer mishaz <misha@curoverse.com>
Tue, 19 May 2015 00:54:29 +0000 (00:54 +0000)
services/datamanager/datamanager.go
services/datamanager/summary/canonical_string.go [new file with mode: 0644]
services/datamanager/summary/pull_list.go [new file with mode: 0644]
services/datamanager/summary/pull_list_test.go [new file with mode: 0644]

index d3efe621731c0e1a93cb4dc333fdd9537bad65c2..301aa81f4e9816e95d00e1828693c1d92aeac502 100644 (file)
@@ -4,7 +4,9 @@ package main
 
 import (
        "flag"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
@@ -122,6 +124,21 @@ func singlerun() {
                        rlbss.Count)
        }
 
+       kc, err := keepclient.MakeKeepClient(&arv)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Error setting up keep client %s", err.Error()))
+       }
+
+       pullServers := summary.ComputePullServers(&kc,
+               &keepServerInfo,
+               readCollections.BlockToReplication,
+               replicationSummary.UnderReplicatedBlocks)
+
+       pullLists := summary.BuildPullLists(pullServers)
+
+       summary.WritePullLists(arvLogger, pullLists)
+
        // Log that we're finished. We force the recording, since go will
        // not wait for the write timer before exiting.
        if arvLogger != nil {
diff --git a/services/datamanager/summary/canonical_string.go b/services/datamanager/summary/canonical_string.go
new file mode 100644 (file)
index 0000000..94f0676
--- /dev/null
@@ -0,0 +1,27 @@
+package summary
+
+// TODO: This code should probably be moved somewhere more universal.
+
+// CanonicalString ensures that we only keep one copy of each unique
+// string: Get hands back a previously stored string equal to its
+// argument when there is one, so equal strings end up sharing storage.
+//
+// The zero value is ready to use; the backing map is created on the
+// first call to Get. CanonicalString is not designed for concurrent
+// access.
+//
+// Copies of a CanonicalString only share storage if the map already
+// exists when the copy is made; a zero-value CanonicalString that is
+// passed by value builds its own private map on first use.
+type CanonicalString struct {
+       m map[string]string
+}
+
+// Get returns the canonical copy of s. If no string equal to s has been
+// seen before, s itself is stored and returned as the canonical copy.
+func (cs *CanonicalString) Get(s string) string {
+       if cs.m == nil {
+               cs.m = make(map[string]string)
+       }
+       if value, found := cs.m[s]; found {
+               return value
+       }
+
+       // s may be a substring of a much larger string.
+       // If we store s, it will prevent that larger string from getting
+       // garbage collected.
+       // If this is something you worry about you should change this code
+       // to store an explicit copy of s (e.g. string([]byte(s))).
+       cs.m[s] = s
+       return s
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
new file mode 100644 (file)
index 0000000..fb1fed1
--- /dev/null
@@ -0,0 +1,177 @@
+// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+package summary
+
+import (
+       "encoding/json"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/logger"
+       "git.curoverse.com/arvados.git/services/datamanager/keep"
+       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+       "log"
+       "os"
+       "strings"
+)
+
+// Locator identifies a block in a pull list. It is encoded to JSON as
+// the digest's String() form wrapped in double quotes.
+type Locator struct {
+       Digest blockdigest.BlockDigest
+       // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
+}
+
+// MarshalJSON implements json.Marshaler. The digest text is emitted
+// between double quotes without any escaping.
+func (l Locator) MarshalJSON() ([]byte, error) {
+       digest := l.Digest.String()
+       out := make([]byte, 0, len(digest)+2)
+       out = append(out, '"')
+       out = append(out, digest...)
+       out = append(out, '"')
+       return out, nil
+}
+
+type (
+       // PullListEntry is one entry in a Pull List: a block to fetch and
+       // the servers that can supply it. It encodes to JSON as
+       // {"locator": ..., "servers": [...]}.
+       PullListEntry struct {
+               Locator Locator  `json:"locator"`
+               Servers []string `json:"servers"`
+       }
+
+       // PullList is the Pull List for a particular server. It encodes to
+       // JSON as {"blocks": [...]}.
+       PullList struct {
+               Entries []PullListEntry `json:"blocks"`
+       }
+)
+
+// EntriesByDigest implements sort.Interface for []PullListEntry,
+// ordering entries by the (H, L) pair of their locator's digest.
+type EntriesByDigest []PullListEntry
+
+func (e EntriesByDigest) Len() int { return len(e) }
+
+func (e EntriesByDigest) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
+
+func (e EntriesByDigest) Less(i, j int) bool {
+       left, right := e[i].Locator.Digest, e[j].Locator.Digest
+       if left.H != right.H {
+               return left.H < right.H
+       }
+       return left.L < right.L
+}
+
+// PullServers describes, for a single under-replicated block, which
+// servers should pull the block and which servers they can pull it from.
+type PullServers struct {
+       // To lists servers that should pull the specified block.
+       To []string
+       // From lists servers that already contain the specified block.
+       From []string
+}
+
+// Creates a map from block locator to PullServers with one entry for
+// each under-replicated block.
+func ComputePullServers(kc *keepclient.KeepClient,
+       keepServerInfo *keep.ReadServers,
+       blockToDesiredReplication map[blockdigest.BlockDigest]int,
+       underReplicated BlockSet) (m map[Locator]PullServers) {
+       m = map[Locator]PullServers{}
+       // We use CanonicalString to avoid filling memory with dupicate
+       // copies of the same string.
+       var cs CanonicalString
+
+       for block, _ := range underReplicated {
+               serversStoringBlock := keepServerInfo.BlockToServers[block]
+               numCopies := len(serversStoringBlock)
+               numCopiesMissing := blockToDesiredReplication[block] - numCopies
+               if numCopiesMissing > 0 {
+                       // We expect this to always be true, since the block was listed
+                       // in underReplicated.
+                       // TODO(misha): Consider asserting the above conditional.
+
+                       if numCopies > 0 {
+                               // I believe that we should expect this to always be true.
+
+                               // A server's host-port string appears as a key in this map
+                               // iff it contains the block.
+                               serverHasBlock := map[string]struct{}{}
+                               for _, info := range serversStoringBlock {
+                                       sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
+                                       serverHasBlock[sa.HostPort()] = struct{}{}
+                               }
+
+                               roots := keepclient.NewRootSorter(kc.ServiceRoots(),
+                                       block.String()).GetSortedRoots()
+
+                               l := Locator{Digest: block}
+                               m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
+                       }
+               }
+       }
+       return m
+}
+
+// Creates a pull list in which the To and From fields preserve the
+// ordering of sorted servers and the contents are all canonical
+// strings.
+func CreatePullServers(cs CanonicalString,
+       serverHasBlock map[string]struct{},
+       sortedServers []string,
+       maxToFields int) (ps PullServers) {
+
+       ps = PullServers{
+               To:   make([]string, 0, maxToFields),
+               From: make([]string, 0, len(serverHasBlock)),
+       }
+
+       for _, host := range sortedServers {
+               // Strip the protocol portion of the url.
+               // Use the canonical copy of the string to avoid memory waste.
+               server := cs.Get(RemoveProtocolPrefix(host))
+               _, hasBlock := serverHasBlock[server]
+               if hasBlock {
+                       ps.From = append(ps.From, server)
+               } else if len(ps.To) < maxToFields {
+                       ps.To = append(ps.To, server)
+               }
+       }
+
+       return
+}
+
+// Strips the protocol prefix from a url.
+func RemoveProtocolPrefix(url string) string {
+       return url[(strings.LastIndex(url, "/") + 1):]
+}
+
+// Produces a PullList for each keep server.
+func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
+       spl = map[string]PullList{}
+       // We don't worry about canonicalizing our strings here, because we
+       // assume lps was created by ComputePullServers() which already
+       // canonicalized the strings for us.
+       for locator, pullServers := range lps {
+               for _, destination := range pullServers.To {
+                       pullList, pullListExists := spl[destination]
+                       if !pullListExists {
+                               pullList = PullList{Entries: []PullListEntry{}}
+                               spl[destination] = pullList
+                       }
+                       pullList.Entries = append(pullList.Entries,
+                               PullListEntry{Locator: locator, Servers: pullServers.From})
+                       spl[destination] = pullList
+               }
+       }
+       return
+}
+
+// Writes each pull list to a file.
+// The filename is based on the hostname.
+//
+// This is just a hack for prototyping, it is not expected to be used
+// in production.
+func WritePullLists(arvLogger *logger.Logger,
+       pullLists map[string]PullList) {
+       r := strings.NewReplacer(":", ".")
+       for host, list := range pullLists {
+               filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+               pullListFile, err := os.Create(filename)
+               if err != nil {
+                       loggerutil.FatalWithMessage(arvLogger,
+                               fmt.Sprintf("Failed to open %s: %v", filename, err))
+               }
+               defer pullListFile.Close()
+
+               enc := json.NewEncoder(pullListFile)
+               err = enc.Encode(list)
+               if err != nil {
+                       loggerutil.FatalWithMessage(arvLogger,
+                               fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
+               }
+               log.Printf("Wrote pull list to %s.", filename)
+       }
+}
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
new file mode 100644 (file)
index 0000000..dd73bf5
--- /dev/null
@@ -0,0 +1,267 @@
+package summary
+
+import (
+       "encoding/json"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       . "gopkg.in/check.v1"
+       "sort"
+       "testing"
+)
+
+// Test hooks the gocheck suites registered in this package into the
+// standard "go test" runner (gocheck boilerplate).
+func Test(t *testing.T) { TestingT(t) }
+
+// MySuite is the gocheck suite holding this package's tests.
+type MySuite struct{}
+
+var _ = Suite(new(MySuite))
+
+// Helper method to declare string sets more succinctly
+// Could be placed somewhere more general.
+func stringSet(slice ...string) (m map[string]struct{}) {
+       m = map[string]struct{}{}
+       for _, s := range slice {
+               m[s] = struct{}{}
+       }
+       return
+}
+
+// TestPullListPrintsJSONCorrectly checks the wire format of a PullList:
+// the locator is a quoted digest string and the fields use the
+// "blocks", "locator" and "servers" keys.
+func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
+       entry := PullListEntry{
+               Locator: Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)},
+               Servers: []string{
+                       "keep0.qr1hi.arvadosapi.com:25107",
+                       "keep1.qr1hi.arvadosapi.com:25108",
+               },
+       }
+       pl := PullList{Entries: []PullListEntry{entry}}
+
+       encoded, err := json.Marshal(pl)
+       c.Assert(err, IsNil)
+
+       const expectedOutput = `{"blocks":[{"locator":"0000000000000000000000000badbeef",` +
+               `"servers":["keep0.qr1hi.arvadosapi.com:25107",` +
+               `"keep1.qr1hi.arvadosapi.com:25108"]}]}`
+       c.Check(string(encoded), Equals, expectedOutput)
+}
+
+// TestCreatePullServers checks how CreatePullServers splits a sorted
+// server list into servers that already hold the block (From) and
+// servers that should pull it (To), honoring the maxToFields limit.
+func (s *MySuite) TestCreatePullServers(c *C) {
+       var cs CanonicalString
+       allServers := []string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"}
+
+       cases := []struct {
+               hasBlock map[string]struct{}
+               sorted   []string
+               maxTo    int
+               want     PullServers
+       }{
+               {
+                       hasBlock: stringSet(),
+                       sorted:   []string{},
+                       maxTo:    5,
+                       want:     PullServers{To: []string{}, From: []string{}},
+               },
+               {
+                       hasBlock: stringSet("keep0:25107", "keep1:25108"),
+                       sorted:   []string{},
+                       maxTo:    5,
+                       want:     PullServers{To: []string{}, From: []string{}},
+               },
+               {
+                       hasBlock: stringSet("keep0:25107", "keep1:25108"),
+                       sorted:   []string{"keep0:25107"},
+                       maxTo:    5,
+                       want:     PullServers{To: []string{}, From: []string{"keep0:25107"}},
+               },
+               {
+                       hasBlock: stringSet("keep0:25107", "keep1:25108"),
+                       sorted:   allServers,
+                       maxTo:    5,
+                       want: PullServers{To: []string{"keep3:25110", "keep2:25109"},
+                               From: []string{"keep1:25108", "keep0:25107"}},
+               },
+               {
+                       hasBlock: stringSet("keep0:25107", "keep1:25108"),
+                       sorted:   allServers,
+                       maxTo:    1,
+                       want: PullServers{To: []string{"keep3:25110"},
+                               From: []string{"keep1:25108", "keep0:25107"}},
+               },
+               {
+                       hasBlock: stringSet("keep0:25107", "keep1:25108"),
+                       sorted:   allServers,
+                       maxTo:    0,
+                       want: PullServers{To: []string{},
+                               From: []string{"keep1:25108", "keep0:25107"}},
+               },
+       }
+
+       for i, tc := range cases {
+               c.Check(CreatePullServers(cs, tc.hasBlock, tc.sorted, tc.maxTo),
+                       DeepEquals, tc.want, Commentf("case %d", i))
+       }
+}
+
+// pullListMapEqualsChecker checks whether two pull list maps are equal.
+// Since pull list entries are ordered arbitrarily, the entries of each
+// PullList are sorted by digest before the maps are compared for deep
+// equality. Sorting is done on copies, so the caller's maps and slices
+// are left untouched.
+type pullListMapEqualsChecker struct {
+       *CheckerInfo
+}
+
+func (c *pullListMapEqualsChecker) Check(params []interface{}, names []string) (result bool, errorString string) {
+       obtained, ok := params[0].(map[string]PullList)
+       if !ok {
+               return false, "First parameter is not a PullList map"
+       }
+       expected, ok := params[1].(map[string]PullList)
+       if !ok {
+               return false, "Second parameter is not a PullList map"
+       }
+
+       return DeepEquals.Check(
+               []interface{}{sortedPullListMap(obtained), sortedPullListMap(expected)},
+               names)
+}
+
+// sortedPullListMap returns a copy of m in which every PullList's entries
+// are sorted with EntriesByDigest. A nil map, and nil Entries slices,
+// stay nil so that DeepEquals distinguishes them from empty ones exactly
+// as it would on the unsorted inputs.
+func sortedPullListMap(m map[string]PullList) map[string]PullList {
+       if m == nil {
+               return nil
+       }
+       sorted := make(map[string]PullList, len(m))
+       for host, list := range m {
+               if list.Entries != nil {
+                       entries := make([]PullListEntry, len(list.Entries))
+                       copy(entries, list.Entries)
+                       sort.Sort(EntriesByDigest(entries))
+                       list.Entries = entries
+               }
+               sorted[host] = list
+       }
+       return sorted
+}
+
+// PullListMapEquals is a gocheck Checker comparing two map[string]PullList
+// values while ignoring the order of entries within each PullList.
+var PullListMapEquals Checker = &pullListMapEqualsChecker{
+       &CheckerInfo{Name: "PullListMapEquals", Params: []string{"obtained", "expected"}},
+}
+
+func (s *MySuite) TestBuildPullLists(c *C) {
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{}),
+               PullListMapEquals,
+               map[string]PullList{})
+
+       locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{}, From: []string{}}}),
+               PullListMapEquals,
+               map[string]PullList{})
+
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{}, From: []string{"f1", "f2"}}}),
+               PullListMapEquals,
+               map[string]PullList{})
+
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}}}),
+               PullListMapEquals,
+               map[string]PullList{"t1": PullList{Entries: []PullListEntry{PullListEntry{
+                       Locator: locator1,
+                       Servers: []string{"f1", "f2"}}}}})
+
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1"}, From: []string{}}}),
+               PullListMapEquals,
+               map[string]PullList{"t1": PullList{Entries: []PullListEntry{PullListEntry{
+                       Locator: locator1,
+                       Servers: []string{}}}}})
+
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1", "t2"}, From: []string{"f1", "f2"}}}),
+               PullListMapEquals,
+               map[string]PullList{
+                       "t1": PullList{Entries: []PullListEntry{PullListEntry{
+                               Locator: locator1,
+                               Servers: []string{"f1", "f2"}}}},
+                       "t2": PullList{Entries: []PullListEntry{PullListEntry{
+                               Locator: locator1,
+                               Servers: []string{"f1", "f2"}}}},
+               })
+
+       locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}},
+                       locator2: PullServers{To: []string{"t2"}, From: []string{"f3", "f4"}}}),
+               PullListMapEquals,
+               map[string]PullList{
+                       "t1": PullList{Entries: []PullListEntry{PullListEntry{
+                               Locator: locator1,
+                               Servers: []string{"f1", "f2"}}}},
+                       "t2": PullList{Entries: []PullListEntry{PullListEntry{
+                               Locator: locator2,
+                               Servers: []string{"f3", "f4"}}}},
+               })
+
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}},
+                       locator2: PullServers{To: []string{"t2", "t1"}, From: []string{"f3", "f4"}}}),
+               PullListMapEquals,
+               map[string]PullList{
+                       "t1": PullList{Entries: []PullListEntry{
+                               PullListEntry{
+                                       Locator: locator1,
+                                       Servers: []string{"f1", "f2"}},
+                               PullListEntry{
+                                       Locator: locator2,
+                                       Servers: []string{"f3", "f4"}}}},
+                       "t2": PullList{Entries: []PullListEntry{PullListEntry{
+                               Locator: locator2,
+                               Servers: []string{"f3", "f4"}}}},
+               })
+
+       locator3 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xDeadBeef)}
+       locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
+       c.Check(
+               BuildPullLists(map[Locator]PullServers{
+                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}},
+                       locator2: PullServers{To: []string{"t2", "t1"}, From: []string{"f3", "f4"}},
+                       locator3: PullServers{To: []string{"t3", "t2", "t1"}, From: []string{"f4", "f5"}},
+                       locator4: PullServers{To: []string{"t4", "t3", "t2", "t1"}, From: []string{"f1", "f5"}},
+               }),
+               PullListMapEquals,
+               map[string]PullList{
+                       "t1": PullList{Entries: []PullListEntry{
+                               PullListEntry{
+                                       Locator: locator1,
+                                       Servers: []string{"f1", "f2"}},
+                               PullListEntry{
+                                       Locator: locator2,
+                                       Servers: []string{"f3", "f4"}},
+                               PullListEntry{
+                                       Locator: locator3,
+                                       Servers: []string{"f4", "f5"}},
+                               PullListEntry{
+                                       Locator: locator4,
+                                       Servers: []string{"f1", "f5"}},
+                       }},
+                       "t2": PullList{Entries: []PullListEntry{
+                               PullListEntry{
+                                       Locator: locator2,
+                                       Servers: []string{"f3", "f4"}},
+                               PullListEntry{
+                                       Locator: locator3,
+                                       Servers: []string{"f4", "f5"}},
+                               PullListEntry{
+                                       Locator: locator4,
+                                       Servers: []string{"f1", "f5"}},
+                       }},
+                       "t3": PullList{Entries: []PullListEntry{
+                               PullListEntry{
+                                       Locator: locator3,
+                                       Servers: []string{"f4", "f5"}},
+                               PullListEntry{
+                                       Locator: locator4,
+                                       Servers: []string{"f1", "f5"}},
+                       }},
+                       "t4": PullList{Entries: []PullListEntry{
+                               PullListEntry{
+                                       Locator: locator4,
+                                       Servers: []string{"f1", "f5"}},
+                       }},
+               })
+}
+
+func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
+       c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
+       c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
+       c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
+       c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
+}