Started focusing on Keep Server responses again. Switched to using blockdigest instead of strings for block digests.
author: mishaz <misha@curoverse.com>
Wed, 7 Jan 2015 04:16:40 +0000 (04:16 +0000)
committer: Tom Clegg <tom@curoverse.com>
Fri, 13 Feb 2015 21:23:53 +0000 (16:23 -0500)
services/datamanager/datamanager.go
services/datamanager/keep/keep.go

index 6393787e01748d2cffbc5897fbb67c248dcfb229..7b79f0f5bc03903e6e6f46250d8a83c91ff7ca45 100644 (file)
@@ -7,7 +7,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
-//     "git.curoverse.com/arvados.git/services/datamanager/keep"
+       "git.curoverse.com/arvados.git/services/datamanager/keep"
        "log"
 )
 
@@ -27,29 +27,35 @@ func main() {
 
        // TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
 
-       readCollections := collection.GetCollections(
-               collection.GetCollectionsParams{
-                       Client: arv, BatchSize: 500})
+       // readCollections := collection.GetCollections(
+       //      collection.GetCollectionsParams{
+       //              Client: arv, BatchSize: 500})
 
-       //log.Printf("Read Collections: %v", readCollections)
+       // UserUsage := ComputeSizeOfOwnedCollections(readCollections)
+       // log.Printf("Uuid to Size used: %v", UserUsage)
 
-       UserUsage := ComputeSizeOfOwnedCollections(readCollections)
-       log.Printf("Uuid to Size used: %v", UserUsage)
+       // // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+       // // lots of behaviors can become warnings (and obviously we can't
+       // // write anything).
+       // // if !readCollections.ReadAllCollections {
+       // //   log.Fatalf("Did not read all collections")
+       // // }
 
-       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-       // lots of behaviors can become warnings (and obviously we can't
-       // write anything).
-       // if !readCollections.ReadAllCollections {
-       //      log.Fatalf("Did not read all collections")
-       // }
+       // log.Printf("Read and processed %d collections",
+       //      len(readCollections.UuidToCollection))
 
-       log.Printf("Read and processed %d collections",
-               len(readCollections.UuidToCollection))
+       readServers := keep.GetKeepServers(
+               keep.GetKeepServersParams{Client: arv, Limit: 1000})
 
-       // readServers := keep.GetKeepServers(
-       //      keep.GetKeepServersParams{Client: arv, Limit: 1000})
+       log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
 
-       // log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
+       blockReplicationCounts := make(map[int]int)
+       for _, infos := range readServers.BlockToServers {
+               replication := len(infos)
+               blockReplicationCounts[replication] += 1
+       }
+
+       log.Printf("Replication level distribution: %v", blockReplicationCounts)
 }
 
 func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
index e600c4ac071fb9b67ee9c3f861ade9594ee48177..91af2014aee83272608b8653e0f861a3880262ce 100644 (file)
@@ -8,6 +8,7 @@ import (
        "fmt"
        //"git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "log"
@@ -23,14 +24,22 @@ type ServerAddress struct {
        Port int `json:"service_port"`
 }
 
+// BlockInfo describes one block (digest, size, mtime) as reported by a single Keep server.
 type BlockInfo struct {
-       Digest     string
+       Digest     blockdigest.BlockDigest
        Size       int
        Mtime      int  // TODO(misha): Replace this with a timestamp.
 }
 
+// BlockServerInfo records one Keep server's copy of a block, as collected into ReadServers.BlockToServers.
+type BlockServerInfo struct {
+       ServerIndex int // index into ReadServers.KeepServerIndexToAddress
+       Size        int
+       Mtime       int  // TODO(misha): Replace this with a timestamp.
+}
+
 type ServerContents struct {
-       BlockDigestToInfo map[string]BlockInfo
+       BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
 }
 
 type ServerResponse struct {
@@ -40,7 +49,10 @@ type ServerResponse struct {
 
 type ReadServers struct {
        ReadAllServers bool
-       AddressToContents map[ServerAddress]ServerContents
+       KeepServerIndexToAddress []ServerAddress
+       KeepServerAddressToIndex map[ServerAddress]int
+       ServerToContents map[ServerAddress]ServerContents
+       BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo
 }
 
 type GetKeepServersParams struct {
@@ -130,6 +142,14 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                        numAvailable)
        }
 
+       results.KeepServerIndexToAddress = sdkResponse.KeepServers
+       results.KeepServerAddressToIndex = make(map[ServerAddress]int)
+       for i, address := range results.KeepServerIndexToAddress {
+               results.KeepServerAddressToIndex[address] = i
+       }
+
+       log.Printf("Got Server Addresses: %v", results)
+
        // This is safe for concurrent use
        client := http.Client{}
 
@@ -139,7 +159,8 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                go GetServerContents(keepServer, client, responseChan)
        }
 
-       results.AddressToContents = make(map[ServerAddress]ServerContents)
+       results.ServerToContents = make(map[ServerAddress]ServerContents)
+       results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
 
        // Read all the responses
        for i := range sdkResponse.KeepServers {
@@ -148,7 +169,15 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                log.Printf("Received channel response from %v containing %d files",
                        response.Address,
                        len(response.Contents.BlockDigestToInfo))
-               results.AddressToContents[response.Address] = response.Contents
+               results.ServerToContents[response.Address] = response.Contents
+               serverIndex := results.KeepServerAddressToIndex[response.Address]
+               for _, blockInfo := range response.Contents.BlockDigestToInfo {
+                       results.BlockToServers[blockInfo.Digest] = append(
+                               results.BlockToServers[blockInfo.Digest],
+                               BlockServerInfo{ServerIndex: serverIndex,
+                                       Size: blockInfo.Size,
+                                       Mtime: blockInfo.Mtime})
+               }
        }
        return
 }
@@ -183,7 +212,8 @@ func GetServerContents(keepServer ServerAddress,
 
        response := ServerResponse{}
        response.Address = keepServer
-       response.Contents.BlockDigestToInfo = make(map[string]BlockInfo)
+       response.Contents.BlockDigestToInfo =
+               make(map[blockdigest.BlockDigest]BlockInfo)
        scanner := bufio.NewScanner(resp.Body)
        numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
        for scanner.Scan() {