"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/util"
"git.curoverse.com/arvados.git/services/datamanager/collection"
-// "git.curoverse.com/arvados.git/services/datamanager/keep"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
"log"
)
// TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
- readCollections := collection.GetCollections(
- collection.GetCollectionsParams{
- Client: arv, BatchSize: 500})
+ // readCollections := collection.GetCollections(
+ // collection.GetCollectionsParams{
+ // Client: arv, BatchSize: 500})
- //log.Printf("Read Collections: %v", readCollections)
+ // UserUsage := ComputeSizeOfOwnedCollections(readCollections)
+ // log.Printf("Uuid to Size used: %v", UserUsage)
- UserUsage := ComputeSizeOfOwnedCollections(readCollections)
- log.Printf("Uuid to Size used: %v", UserUsage)
+ // // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+ // // lots of behaviors can become warnings (and obviously we can't
+ // // write anything).
+ // // if !readCollections.ReadAllCollections {
+ // // log.Fatalf("Did not read all collections")
+ // // }
- // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
- // lots of behaviors can become warnings (and obviously we can't
- // write anything).
- // if !readCollections.ReadAllCollections {
- // log.Fatalf("Did not read all collections")
- // }
+ // log.Printf("Read and processed %d collections",
+ // len(readCollections.UuidToCollection))
- log.Printf("Read and processed %d collections",
- len(readCollections.UuidToCollection))
+ readServers := keep.GetKeepServers(
+ keep.GetKeepServersParams{Client: arv, Limit: 1000})
- // readServers := keep.GetKeepServers(
- // keep.GetKeepServersParams{Client: arv, Limit: 1000})
+ log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
- // log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
+ blockReplicationCounts := make(map[int]int)
+ for _, infos := range readServers.BlockToServers {
+ replication := len(infos)
+ blockReplicationCounts[replication] += 1
+ }
+
+ log.Printf("Replication level distribution: %v", blockReplicationCounts)
}
func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
"fmt"
//"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/sdk/go/util"
"log"
Port int `json:"service_port"`
}
+// BlockInfo holds info about a particular block as returned by the server.
type BlockInfo struct {
- Digest string
+ Digest blockdigest.BlockDigest
Size int
Mtime int // TODO(misha): Replace this with a timestamp.
}
+// BlockServerInfo holds what one server (identified by ServerIndex) reported about a given block.
+type BlockServerInfo struct {
+ ServerIndex int
+ Size int
+ Mtime int // TODO(misha): Replace this with a timestamp.
+}
+
type ServerContents struct {
- BlockDigestToInfo map[string]BlockInfo
+ BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
}
type ServerResponse struct {
type ReadServers struct {
ReadAllServers bool
- AddressToContents map[ServerAddress]ServerContents
+ KeepServerIndexToAddress []ServerAddress
+ KeepServerAddressToIndex map[ServerAddress]int
+ ServerToContents map[ServerAddress]ServerContents
+ BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo
}
type GetKeepServersParams struct {
numAvailable)
}
+ results.KeepServerIndexToAddress = sdkResponse.KeepServers
+ results.KeepServerAddressToIndex = make(map[ServerAddress]int)
+ for i, address := range results.KeepServerIndexToAddress {
+ results.KeepServerAddressToIndex[address] = i
+ }
+
+ log.Printf("Got Server Addresses: %v", results)
+
// This is safe for concurrent use
client := http.Client{}
go GetServerContents(keepServer, client, responseChan)
}
- results.AddressToContents = make(map[ServerAddress]ServerContents)
+ results.ServerToContents = make(map[ServerAddress]ServerContents)
+ results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
// Read all the responses
for i := range sdkResponse.KeepServers {
log.Printf("Received channel response from %v containing %d files",
response.Address,
len(response.Contents.BlockDigestToInfo))
- results.AddressToContents[response.Address] = response.Contents
+ results.ServerToContents[response.Address] = response.Contents
+ serverIndex := results.KeepServerAddressToIndex[response.Address]
+ for _, blockInfo := range response.Contents.BlockDigestToInfo {
+ results.BlockToServers[blockInfo.Digest] = append(
+ results.BlockToServers[blockInfo.Digest],
+ BlockServerInfo{ServerIndex: serverIndex,
+ Size: blockInfo.Size,
+ Mtime: blockInfo.Mtime})
+ }
}
return
}
response := ServerResponse{}
response.Address = keepServer
- response.Contents.BlockDigestToInfo = make(map[string]BlockInfo)
+ response.Contents.BlockDigestToInfo =
+ make(map[blockdigest.BlockDigest]BlockInfo)
scanner := bufio.NewScanner(resp.Body)
numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
for scanner.Scan() {