}
{
hash2, replicas, err := kc.PutB(content)
- c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
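+	// PutB now returns a full locator that may include hints, e.g. (hypothetical):
+	//   acbd18db4cc2f85cedef654fccc4a4d8+3+A<40-hex-signature>@<8-hex-timestamp>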
+ c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
for arg, val in keep_args.iteritems():
keep_cmd.append("{}={}".format(arg, val))
+ logf = open(os.path.join(TEST_TMPDIR, 'keep{}.log'.format(n)), 'a+')
kp0 = subprocess.Popen(
- keep_cmd, stdin=open('/dev/null'), stdout=sys.stderr)
+ keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
with open(_pidfile('keep{}'.format(n)), 'w') as f:
f.write(str(kp0.pid))
stop_keep()
keep_args = {}
- if blob_signing_key:
- with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
- keep_args['--permission-key-file'] = f.name
- f.write(blob_signing_key)
+ if not blob_signing_key:
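+        # no key supplied by the caller: use a fixed test key so keepstore
+        # always runs with blob signing enabled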
+ blob_signing_key = 'zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc'
+ with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
+ keep_args['-blob-signing-key-file'] = f.name
+ f.write(blob_signing_key)
if enforce_permissions:
- keep_args['--enforce-permissions'] = 'true'
+ keep_args['-enforce-permissions'] = 'true'
+ with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
+ keep_args['-data-manager-token-file'] = f.name
+ f.write(os.environ['ARVADOS_API_TOKEN'])
+ keep_args['-never-delete'] = 'false'
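+    # allow deletion during tests: keepstore accepts -never-delete=false only
+    # when run with the test data manager token (see the keepstore change below)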
api = arvados.api(
version='v1',
c2.save()
c1.update()
- self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3 7ac66c0f148de9519b8bd264312c4d64\+7\+A[a-f0-9]{40}@[a-f0-9]{8} 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
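+        # each locator may now be followed by extra hints (e.g. +A... signatures),
+        # so allow \S* after the size instead of an exact match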
+ self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
if __name__ == '__main__':
maxManifestSize uint64
)
+// Collection stores information about a single collection.
type Collection struct {
- Uuid string
- OwnerUuid string
+ UUID string
+ OwnerUUID string
ReplicationLevel int
BlockDigestToSize map[blockdigest.BlockDigest]int
TotalSize int
}
+// ReadCollections holds information about collections read from the API server.
type ReadCollections struct {
ReadAllCollections bool
- UuidToCollection map[string]Collection
+ UUIDToCollection map[string]Collection
OwnerToCollectionSize map[string]int
BlockToDesiredReplication map[blockdigest.DigestWithSize]int
- CollectionUuidToIndex map[string]int
- CollectionIndexToUuid []string
+ CollectionUUIDToIndex map[string]int
+ CollectionIndexToUUID []string
BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
}
+// GetCollectionsParams holds the parameters for GetCollections.
type GetCollectionsParams struct {
Client arvadosclient.ArvadosClient
Logger *logger.Logger
BatchSize int
}
+// SdkCollectionInfo holds the collection information returned by the API server.
type SdkCollectionInfo struct {
- Uuid string `json:"uuid"`
- OwnerUuid string `json:"owner_uuid"`
+ UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
Redundancy int `json:"redundancy"`
ModifiedAt time.Time `json:"modified_at"`
ManifestText string `json:"manifest_text"`
}
+// SdkCollectionList is the list of collections returned by the API server.
type SdkCollectionList struct {
ItemsAvailable int `json:"items_available"`
Items []SdkCollectionInfo `json:"items"`
"File to write the heap profiles to. Leave blank to skip profiling.")
}
-// Write the heap profile to a file for later review.
+// WriteHeapProfile writes the heap profile to a file for later review.
// Since a file is expected to only contain a single heap profile this
// function overwrites the previously written profile, so it is safe
// to call multiple times in a single run.
func WriteHeapProfile() {
if heapProfileFilename != "" {
- heap_profile, err := os.Create(heapProfileFilename)
+ heapProfile, err := os.Create(heapProfileFilename)
if err != nil {
log.Fatal(err)
}
- defer heap_profile.Close()
+ defer heapProfile.Close()
- err = pprof.WriteHeapProfile(heap_profile)
+ err = pprof.WriteHeapProfile(heapProfile)
if err != nil {
log.Fatal(err)
}
}
}
+// GetCollectionsAndSummarize gets collections from the API server and summarizes them.
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
log.Printf("Read and processed %d collections",
- len(results.UuidToCollection))
+ len(results.UUIDToCollection))
// TODO(misha): Add a "readonly" flag. If we're in readonly mode,
// lots of behaviors can become warnings (and obviously we can't
return
}
+// GetCollections gets collections from the API server.
func GetCollections(params GetCollectionsParams) (results ReadCollections) {
if ¶ms.Client == nil {
log.Fatalf("params.Client passed to GetCollections() should " +
// that we don't have to grow the map in most cases.
maxExpectedCollections := int(
float64(initialNumberOfCollectionsAvailable) * 1.01)
- results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+ results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
ProcessCollections(params.Logger,
collections.Items,
defaultReplicationLevel,
- results.UuidToCollection).Format(time.RFC3339)
+ results.UUIDToCollection).Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
- totalCollections = len(results.UuidToCollection)
+ totalCollections = len(results.UUIDToCollection)
log.Printf("%d collections read, %d new in last batch, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
return string([]byte(s))
}
+// ProcessCollections processes collections read from the API server.
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+ UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
for _, sdkCollection := range receivedCollections {
- collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
- OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
+ collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+ OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
ReplicationLevel: sdkCollection.Redundancy,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
manifest := manifest.Manifest{sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
- if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+ if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
totalManifestSize += manifestSize
}
if manifestSize > maxManifestSize {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
- if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
+ if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
message := fmt.Sprintf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
- collection.Uuid,
- stored_size,
+ collection.UUID,
+ storedSize,
block.Size,
block.Digest)
loggerutil.FatalWithMessage(arvLogger, message)
for _, size := range collection.BlockDigestToSize {
collection.TotalSize += size
}
- uuidToCollection[collection.Uuid] = collection
+ UUIDToCollection[collection.UUID] = collection
// Clear out all the manifest strings that we don't need anymore.
// These hopefully form the bulk of our memory usage.
return
}
+// Summarize builds indexes and summary maps from the collections that were read.
func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
readCollections.OwnerToCollectionSize = make(map[string]int)
readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
- numCollections := len(readCollections.UuidToCollection)
- readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
- readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+ numCollections := len(readCollections.UUIDToCollection)
+ readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+ readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
- for _, coll := range readCollections.UuidToCollection {
- collectionIndex := len(readCollections.CollectionIndexToUuid)
- readCollections.CollectionIndexToUuid =
- append(readCollections.CollectionIndexToUuid, coll.Uuid)
- readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+ for _, coll := range readCollections.UUIDToCollection {
+ collectionIndex := len(readCollections.CollectionIndexToUUID)
+ readCollections.CollectionIndexToUUID =
+ append(readCollections.CollectionIndexToUUID, coll.UUID)
+ readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
- readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
- readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+ readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+ readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
for block, size := range coll.BlockDigestToSize {
locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
var _ = Suite(&MySuite{})
// This captures the result we expect from
-// ReadCollections.Summarize(). Because CollectionUuidToIndex is
+// ReadCollections.Summarize(). Because CollectionUUIDToIndex is
// indeterminate, we replace BlockToCollectionIndices with
// BlockToCollectionUuids.
type ExpectedSummary struct {
uuidSet := make(map[string]struct{})
summarizedBlockToCollectionUuids[digest] = uuidSet
for _, index := range indices {
- uuidSet[summarized.CollectionIndexToUuid[index]] = struct{}{}
+ uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{}
}
}
rc.Summarize(nil)
- c := rc.UuidToCollection["col0"]
+ c := rc.UUIDToCollection["col0"]
blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
expected := ExpectedSummary{
- OwnerToCollectionSize: map[string]int{c.OwnerUuid: c.TotalSize},
+ OwnerToCollectionSize: map[string]int{c.OwnerUUID: c.TotalSize},
BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
- BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.Uuid}, blockDigest2: []string{c.Uuid}},
+ BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.UUID}, blockDigest2: []string{c.UUID}},
}
CompareSummarizedReadCollections(checker, rc, expected)
rc.Summarize(nil)
- c0 := rc.UuidToCollection["col0"]
- c1 := rc.UuidToCollection["col1"]
+ c0 := rc.UUIDToCollection["col0"]
+ c1 := rc.UUIDToCollection["col1"]
blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
expected := ExpectedSummary{
OwnerToCollectionSize: map[string]int{
- c0.OwnerUuid: c0.TotalSize,
- c1.OwnerUuid: c1.TotalSize,
+ c0.OwnerUUID: c0.TotalSize,
+ c1.OwnerUUID: c1.TotalSize,
},
BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{
blockDigest1: 5,
blockDigest3: 8,
},
BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
- blockDigest1: []string{c0.Uuid},
- blockDigest2: []string{c0.Uuid, c1.Uuid},
- blockDigest3: []string{c1.Uuid},
+ blockDigest1: []string{c0.UUID},
+ blockDigest2: []string{c0.UUID, c1.UUID},
+ blockDigest3: []string{c1.UUID},
},
}
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
)
+// TestCollectionSpec describes a test collection: its desired replication level and its blocks.
type TestCollectionSpec struct {
// The desired replication level
ReplicationLevel int
Blocks []int
}
-// Creates a ReadCollections object for testing based on the give
-// specs. Only the ReadAllCollections and UuidToCollection fields are
-// populated. To populate other fields call rc.Summarize().
+// MakeTestReadCollections creates a ReadCollections object for testing
+// based on the given specs. Only the ReadAllCollections and UUIDToCollection
+// fields are populated. To populate other fields call rc.Summarize().
func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
rc = ReadCollections{
ReadAllCollections: true,
- UuidToCollection: map[string]Collection{},
+ UUIDToCollection: map[string]Collection{},
}
for i, spec := range specs {
c := Collection{
- Uuid: fmt.Sprintf("col%d", i),
- OwnerUuid: fmt.Sprintf("owner%d", i),
+ UUID: fmt.Sprintf("col%d", i),
+ OwnerUUID: fmt.Sprintf("owner%d", i),
ReplicationLevel: spec.ReplicationLevel,
BlockDigestToSize: map[blockdigest.BlockDigest]int{},
}
- rc.UuidToCollection[c.Uuid] = c
+ rc.UUIDToCollection[c.UUID] = c
for _, j := range spec.Blocks {
c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
}
return
}
-// Returns a slice giving the collection index of each collection that
-// was passed in to MakeTestReadCollections. rc.Summarize() must be
-// called before this method, since Summarize() assigns an index to
-// each collection.
+// CollectionIndicesForTesting returns a slice giving the collection
+// index of each collection that was passed in to MakeTestReadCollections.
+// rc.Summarize() must be called before this method, since Summarize()
+// assigns an index to each collection.
func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) {
// TODO(misha): Assert that rc.Summarize() has been called.
- numCollections := len(rc.CollectionIndexToUuid)
+ numCollections := len(rc.CollectionIndexToUUID)
indices = make([]int, numCollections)
for i := 0; i < numCollections; i++ {
- indices[i] = rc.CollectionUuidToIndex[fmt.Sprintf("col%d", i)]
+ indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)]
}
return
}
package main
import (
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
func main() {
flag.Parse()
if minutesBetweenRuns == 0 {
- err := singlerun()
+ err := singlerun(makeArvadosClient())
if err != nil {
- log.Fatalf("Got an error: %v", err)
+ log.Fatalf("singlerun: %v", err)
}
} else {
waitTime := time.Minute * time.Duration(minutesBetweenRuns)
for {
log.Println("Beginning Run")
- err := singlerun()
+ err := singlerun(makeArvadosClient())
if err != nil {
- log.Printf("Got an error: %v", err)
+ log.Printf("singlerun: %v", err)
}
log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
time.Sleep(waitTime)
}
}
-func singlerun() error {
+func makeArvadosClient() arvadosclient.ArvadosClient {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("Error setting up arvados client %s", err.Error())
+ log.Fatalf("Error setting up arvados client: %s", err)
}
+ return arv
+}
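+
+// singlerun takes the client as a parameter so tests can inject a client
+// with modified credentials (see the datamanager tests below).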
- if is_admin, err := util.UserIsAdmin(arv); err != nil {
- log.Fatalf("Error querying current arvados user %s", err.Error())
- } else if !is_admin {
- log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
+func singlerun(arv arvadosclient.ArvadosClient) error {
+ var err error
+ if isAdmin, err := util.UserIsAdmin(arv); err != nil {
+ return errors.New("Error verifying admin token: " + err.Error())
+ } else if !isAdmin {
+ return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
}
var arvLogger *logger.Logger
if trashErr != nil {
return err
- } else {
- keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
}
+ keep.SendTrashLists(kc, trashLists)
return nil
}
-// Returns a data fetcher that fetches data from remote servers.
+// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
return func(arvLogger *logger.Logger,
readCollections *collection.ReadCollections,
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "os/exec"
+ "regexp"
+ "strings"
+ "testing"
+ "time"
+)
+
+const (
+ ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
+)
+
+var arv arvadosclient.ArvadosClient
+var keepClient *keepclient.KeepClient
+var keepServers []string
+
+func SetupDataManagerTest(t *testing.T) {
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+
+ // start api and keep servers
+ arvadostest.ResetEnv()
+ arvadostest.StartAPI()
+ arvadostest.StartKeep()
+
+ arv = makeArvadosClient()
+
+ // keep client
+ keepClient = &keepclient.KeepClient{
+ Arvados: &arv,
+ Want_replicas: 2,
+ Using_proxy: true,
+ Client: &http.Client{},
+ }
+
+ // discover keep services
+ if err := keepClient.DiscoverKeepServers(); err != nil {
+ t.Fatalf("Error discovering keep services: %s", err)
+ }
+ keepServers = []string{}
+ for _, host := range keepClient.LocalRoots() {
+ keepServers = append(keepServers, host)
+ }
+}
+
+func TearDownDataManagerTest(t *testing.T) {
+ arvadostest.StopKeep()
+ arvadostest.StopAPI()
+}
+
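+// putBlock stores data in keep and returns its locator trimmed to hash+size.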
+func putBlock(t *testing.T, data string) string {
+ locator, _, err := keepClient.PutB([]byte(data))
+ if err != nil {
+ t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
+ }
+ if locator == "" {
+ t.Fatalf("No locator found after putting test data")
+ }
+
+ splits := strings.Split(locator, "+")
+ return splits[0] + "+" + splits[1]
+}
+
+func getBlock(t *testing.T, locator string, data string) {
+ reader, blocklen, _, err := keepClient.Get(locator)
+ if err != nil {
+ t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
+ }
+ if reader == nil {
+ t.Fatalf("No reader found after putting test data")
+ }
+ if blocklen != int64(len(data)) {
+ t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
+ }
+
+	all, err := ioutil.ReadAll(reader)
+	if err != nil {
+		t.Fatalf("Error reading test data %v", err)
+	}
+	if string(all) != data {
+		t.Fatalf("Data read %s did not match expected data %s", string(all), data)
+	}
+}
+
+// Create a collection using arv-put
+func createCollection(t *testing.T, data string) string {
+	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
+	if err != nil {
+		t.Fatalf("Error creating tempfile %v", err)
+	}
+	defer os.Remove(tempfile.Name())
+
+ _, err = tempfile.Write([]byte(data))
+ if err != nil {
+ t.Fatalf("Error writing to tempfile %v", err)
+ }
+
+ // arv-put
+ output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
+ if err != nil {
+ t.Fatalf("Error running arv-put %s", err)
+ }
+
+	uuid := string(output[0:27]) // the UUID is the first 27 characters; this drops the trailing newline
+ return uuid
+}
+
+// Get collection locator
+var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
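+// For a (hypothetical) locator "0cc175b9c0f1b6a831c399e269772661+1+Axyz",
+// match[1] is the hash, match[2] the size, and match[3] the remaining hints.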
+
+func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
+ manifest := getCollection(t, uuid)["manifest_text"].(string)
+
+ locator := strings.Split(manifest, " ")[1]
+ match := locatorMatcher.FindStringSubmatch(locator)
+ if match == nil {
+ t.Fatalf("No locator found in collection manifest %s", manifest)
+ }
+
+ return match[1] + "+" + match[2]
+}
+
+func getCollection(t *testing.T, uuid string) Dict {
+ getback := make(Dict)
+ err := arv.Get("collections", uuid, nil, &getback)
+ if err != nil {
+ t.Fatalf("Error getting collection %s", err)
+ }
+ if getback["uuid"] != uuid {
+ t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
+ }
+
+ return getback
+}
+
+func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
+ err := arv.Update("collections", uuid, arvadosclient.Dict{
+ "collection": arvadosclient.Dict{
+ paramName: paramValue,
+ },
+ }, &arvadosclient.Dict{})
+
+ if err != nil {
+ t.Fatalf("Error updating collection %s", err)
+ }
+}
+
+type Dict map[string]interface{}
+
+func deleteCollection(t *testing.T, uuid string) {
+ getback := make(Dict)
+ err := arv.Delete("collections", uuid, nil, &getback)
+ if err != nil {
+ t.Fatalf("Error deleting collection %s", err)
+ }
+ if getback["uuid"] != uuid {
+ t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
+ }
+}
+
+func dataManagerSingleRun(t *testing.T) {
+ err := singlerun(arv)
+ if err != nil {
+ t.Fatalf("Error during singlerun %s", err)
+ }
+}
+
+func getBlockIndexesForServer(t *testing.T, i int) []string {
+ var indexes []string
+
+ path := keepServers[i] + "/index"
+ client := http.Client{}
+	req, err := http.NewRequest("GET", path, nil)
+	if err != nil {
+		t.Fatalf("Error building request for %s %s", path, err)
+	}
+	req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+	req.Header.Add("Content-Type", "application/octet-stream")
+	resp, err := client.Do(req)
+	if err != nil {
+		t.Fatalf("Error during %s %s", path, err)
+	}
+	defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatalf("Error reading response from %s %s", path, err)
+ }
+
+ lines := strings.Split(string(body), "\n")
+ for _, line := range lines {
+ indexes = append(indexes, strings.Split(line, " ")...)
+ }
+
+ return indexes
+}
+
+func getBlockIndexes(t *testing.T) [][]string {
+ var indexes [][]string
+
+ for i := 0; i < len(keepServers); i++ {
+ indexes = append(indexes, getBlockIndexesForServer(t, i))
+ }
+ return indexes
+}
+
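+// verifyBlocks checks that no block in notExpected appears in any server index,
+// and that every block in expected appears on at least minReplication servers.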
+func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
+ blocks := getBlockIndexes(t)
+
+ for _, block := range notExpected {
+ for _, idx := range blocks {
+ if valueInArray(block, idx) {
+ t.Fatalf("Found unexpected block %s", block)
+ }
+ }
+ }
+
+ for _, block := range expected {
+ nFound := 0
+ for _, idx := range blocks {
+ if valueInArray(block, idx) {
+ nFound++
+ }
+ }
+ if nFound < minReplication {
+ t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
+ }
+ }
+}
+
+func valueInArray(value string, list []string) bool {
+ for _, v := range list {
+ if value == v {
+ return true
+ }
+ }
+ return false
+}
+
+/*
+Test env uses two keep volumes. The volume names can be found by reading the files
+ ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
+
+The keep volumes are of the dir structure:
+ volumeN/subdir/locator
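+
+For example, a block with (hypothetical) locator
+0cc175b9c0f1b6a831c399e269772661+1 would live at
+ volumeN/0cc/0cc175b9c0f1b6a831c399e269772661
+(the subdir is the first three characters of the block hash).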
+*/
+func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
+ // First get rid of any size hints in the locators
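+	// (e.g. "0cc175b9c0f1b6a831c399e269772661+1" becomes "0cc175b9c0f1b6a831c399e269772661")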
+ var trimmedBlockLocators []string
+ for _, block := range oldUnusedBlockLocators {
+ trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
+ }
+
+ // Get the working dir so that we can read keep{n}.volume files
+ wd, err := os.Getwd()
+ if err != nil {
+ t.Fatalf("Error getting working dir %s", err)
+ }
+
+ // Now cycle through the two keep volumes
+ oldTime := time.Now().AddDate(0, -2, 0)
+ for i := 0; i < 2; i++ {
+ filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
+ volumeDir, err := ioutil.ReadFile(filename)
+ if err != nil {
+ t.Fatalf("Error reading keep volume file %s %s", filename, err)
+ }
+
+ // Read the keep volume dir structure
+ volumeContents, err := ioutil.ReadDir(string(volumeDir))
+ if err != nil {
+ t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
+ }
+
+ // Read each subdir for each of the keep volume dir
+ for _, subdir := range volumeContents {
+ subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
+ subdirContents, err := ioutil.ReadDir(string(subdirName))
+ if err != nil {
+ t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
+ }
+
+			// Now we have reached the files; the file names are the block locators.
+ for _, fileInfo := range subdirContents {
+ blockName := fileInfo.Name()
+ myname := fmt.Sprintf("%s/%s", subdirName, blockName)
+				if valueInArray(blockName, trimmedBlockLocators) {
+					if err = os.Chtimes(myname, oldTime, oldTime); err != nil {
+						t.Fatalf("Error backdating block %s %s", myname, err)
+					}
+				}
+ }
+ }
+ }
+}
+
+func getStatus(t *testing.T, path string) interface{} {
+ client := http.Client{}
+	req, err := http.NewRequest("GET", path, nil)
+	if err != nil {
+		t.Fatalf("Error building request for %s %s", path, err)
+	}
+	req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+	req.Header.Add("Content-Type", "application/octet-stream")
+	resp, err := client.Do(req)
+ if err != nil {
+ t.Fatalf("Error during %s %s", path, err)
+ }
+ defer resp.Body.Close()
+
+ var s interface{}
+	if err = json.NewDecoder(resp.Body).Decode(&s); err != nil {
+		t.Fatalf("Error decoding response from %s %s", path, err)
+	}
+
+ return s
+}
+
+// Wait until PullQueue and TrashQueue are empty on all keepServers.
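+// Each server's status.json reports queue states like (hypothetical excerpt):
+//   {"PullQueue":{"Queued":0,"InProgress":0},"TrashQueue":{"Queued":0,"InProgress":0}}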
+func waitUntilQueuesFinishWork(t *testing.T) {
+ for _, ks := range keepServers {
+		for done := false; !done; {
+			time.Sleep(100 * time.Millisecond)
+			s := getStatus(t, ks+"/status.json")
+			// done only when *both* queues are empty
+			done = true
+			for _, qName := range []string{"PullQueue", "TrashQueue"} {
+				qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
+				if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) != 0 {
+					done = false
+				}
+			}
+		}
+ }
+}
+
+/*
+Create some blocks and backdate some of them.
+Also create some collections and delete some of them.
+Verify block indexes.
+*/
+func TestPutAndGetBlocks(t *testing.T) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+	// Put some blocks that will be backdated later on.
+	// They are not referenced by any collection, so they should be deleted when datamanager runs.
+ var oldUnusedBlockLocators []string
+ oldUnusedBlockData := "this block will have older mtime"
+ for i := 0; i < 5; i++ {
+ oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
+ }
+ for i := 0; i < 5; i++ {
+ getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
+ }
+
+	// Put a block that will also be backdated, but is referenced by a collection below, so it should not be deleted when datamanager runs.
+ oldUsedBlockData := "this collection block will have older mtime"
+ oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
+ getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
+
+	// Put some more blocks that will not be backdated, so they stay new and are not in any collection.
+	// Even though unreferenced, they are too new to delete, so datamanager should leave them alone.
+ var newBlockLocators []string
+ newBlockData := "this block is newer"
+ for i := 0; i < 5; i++ {
+ newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
+ }
+ for i := 0; i < 5; i++ {
+ getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
+ }
+
+	// Create a collection that will be deleted later on
+ toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
+ toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
+
+	// Create another collection that has the same data as one of the old blocks
+ oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
+ oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
+ if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
+ t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
+ }
+
+ // Create another collection whose replication level will be changed
+ replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
+ replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
+
+ // Create two collections with same data; one will be deleted later on
+ dataForTwoCollections := "one of these collections will be deleted"
+ oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
+ oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
+ secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
+ secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
+ if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
+ t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
+ }
+
+ // Verify blocks before doing any backdating / deleting.
+ var expected []string
+ expected = append(expected, oldUnusedBlockLocators...)
+ expected = append(expected, newBlockLocators...)
+ expected = append(expected, toBeDeletedCollectionLocator)
+ expected = append(expected, replicationCollectionLocator)
+ expected = append(expected, oneOfTwoWithSameDataLocator)
+ expected = append(expected, secondOfTwoWithSameDataLocator)
+
+ verifyBlocks(t, nil, expected, 2)
+
+ // Run datamanager in singlerun mode
+ dataManagerSingleRun(t)
+ waitUntilQueuesFinishWork(t)
+
+ verifyBlocks(t, nil, expected, 2)
+
+ // Backdate the to-be old blocks and delete the collections
+ backdateBlocks(t, oldUnusedBlockLocators)
+ deleteCollection(t, toBeDeletedCollectionUUID)
+ deleteCollection(t, secondOfTwoWithSameDataUUID)
+
+ // Run data manager again
+ dataManagerSingleRun(t)
+ waitUntilQueuesFinishWork(t)
+
+	// Get block indexes and verify that the backdated blocks are gone, except the one referenced by a collection.
+ expected = expected[:0]
+ expected = append(expected, oldUsedBlockLocator)
+ expected = append(expected, newBlockLocators...)
+ expected = append(expected, toBeDeletedCollectionLocator)
+ expected = append(expected, oneOfTwoWithSameDataLocator)
+ expected = append(expected, secondOfTwoWithSameDataLocator)
+
+ verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
+
+ // Reduce desired replication on replicationCollectionUUID
+ // collection, and verify that Data Manager does not reduce
+ // actual replication any further than that. (It might not
+ // reduce actual replication at all; that's OK for this test.)
+
+ // Reduce desired replication level.
+ updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
+ collection := getCollection(t, replicationCollectionUUID)
+ if collection["replication_desired"].(interface{}) != float64(1) {
+ t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
+ }
+
+ // Verify data is currently overreplicated.
+ verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
+
+ // Run data manager again
+ dataManagerSingleRun(t)
+ waitUntilQueuesFinishWork(t)
+
+ // Verify data is not underreplicated.
+ verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
+
+ // Verify *other* collections' data is not underreplicated.
+ verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
+}
+
+func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+ for i := 0; i < 10; i++ {
+ err := singlerun(arv)
+ if err != nil {
+ t.Fatalf("Got an error during datamanager singlerun: %v", err)
+ }
+ }
+}
+
+func TestGetStatusRepeatedly(t *testing.T) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+ for i := 0; i < 10; i++ {
+ for j := 0; j < 2; j++ {
+ s := getStatus(t, keepServers[j]+"/status.json")
+
+			pullQueueStatus := s.(map[string]interface{})["PullQueue"]
+			trashQueueStatus := s.(map[string]interface{})["TrashQueue"]
+
+ if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
+ pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
+ trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
+ trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
+ t.Fatalf("PullQueue and TrashQueue status not found")
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+}
+
+func TestRunDatamanagerWithBogusServer(t *testing.T) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+ arv.ApiServer = "bogus-server"
+
+ err := singlerun(arv)
+ if err == nil {
+ t.Fatalf("Expected error during singlerun with bogus server")
+ }
+}
+
+func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+ arv.ApiToken = ActiveUserToken
+
+ err := singlerun(arv)
+ if err == nil {
+ t.Fatalf("Expected error during singlerun as non-admin user")
+ }
+}
"bufio"
"encoding/json"
"errors"
- "flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"net/http"
"strconv"
"strings"
- "sync"
"time"
)
+// ServerAddress holds the network address and UUID of a single keep server.
type ServerAddress struct {
-SSL bool `json:service_ssl_flag`
+SSL bool `json:"service_ssl_flag"`
Host string `json:"service_host"`
Port int `json:"service_port"`
- Uuid string `json:"uuid"`
+ UUID string `json:"uuid"`
}
-// Info about a particular block returned by the server
+// BlockInfo is info about a particular block returned by the server
type BlockInfo struct {
Digest blockdigest.DigestWithSize
Mtime int64 // TODO(misha): Replace this with a timestamp.
}
-// Info about a specified block given by a server
+// BlockServerInfo is info about a block as reported by a particular server
type BlockServerInfo struct {
ServerIndex int
Mtime int64 // TODO(misha): Replace this with a timestamp.
}
+// ServerContents holds the blocks reported by a keep server's index.
type ServerContents struct {
BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
}
+// ServerResponse pairs a keep server's address with its index contents.
type ServerResponse struct {
Address ServerAddress
Contents ServerContents
}
+// ReadServers holds the information read from the keep servers.
type ReadServers struct {
ReadAllServers bool
KeepServerIndexToAddress []ServerAddress
BlockReplicationCounts map[int]int
}
+// GetKeepServersParams holds the parameters for GetKeepServers.
type GetKeepServersParams struct {
Client arvadosclient.ArvadosClient
Logger *logger.Logger
Limit int
}
-type KeepServiceList struct {
+// ServiceList is the list of addresses of all the available keep servers.
+type ServiceList struct {
ItemsAvailable int `json:"items_available"`
KeepServers []ServerAddress `json:"items"`
}
-var (
- // Don't access the token directly, use getDataManagerToken() to
- // make sure it's been read.
- dataManagerToken string
- dataManagerTokenFile string
- dataManagerTokenFileReadOnce sync.Once
-)
-
-func init() {
- flag.StringVar(&dataManagerTokenFile,
- "data-manager-token-file",
- "",
- "File with the API token we should use to contact keep servers.")
-}
-
+// String returns the keep server's address as a URL string.
// TODO(misha): Change this to include the UUID as well.
func (s ServerAddress) String() string {
return s.URL()
}
+// URL returns the URL of the keep server, using HTTPS when SSL is enabled.
func (s ServerAddress) URL() string {
if s.SSL {
return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
- } else {
- return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
}
+ return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
}
-func GetDataManagerToken(arvLogger *logger.Logger) string {
- readDataManagerToken := func() {
- if dataManagerTokenFile == "" {
- flag.Usage()
- loggerutil.FatalWithMessage(arvLogger,
- "Data Manager Token needed, but data manager token file not specified.")
- } else {
- rawRead, err := ioutil.ReadFile(dataManagerTokenFile)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Unexpected error reading token file %s: %v",
- dataManagerTokenFile,
- err))
- }
- dataManagerToken = strings.TrimSpace(string(rawRead))
- }
- }
-
- dataManagerTokenFileReadOnce.Do(readDataManagerToken)
- return dataManagerToken
-}
-
+// GetKeepServersAndSummarize gets the keep servers from the API server and summarizes the blocks they report.
func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
results = GetKeepServers(params)
log.Printf("Returned %d keep disks", len(results.ServerToContents))
return
}
+// GetKeepServers gets the list of available keep servers from the API server.
func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
- if ¶ms.Client == nil {
- log.Fatalf("params.Client passed to GetKeepServers() should " +
- "contain a valid ArvadosClient, but instead it is nil.")
- }
-
sdkParams := arvadosclient.Dict{
"filters": [][]string{[]string{"service_type", "=", "disk"}},
}
sdkParams["limit"] = params.Limit
}
- var sdkResponse KeepServiceList
+ var sdkResponse ServiceList
err := params.Client.List("keep_services", sdkParams, &sdkResponse)
if err != nil {
log.Printf("Got Server Addresses: %v", results)
- // This is safe for concurrent use
- client := http.Client{}
-
// Send off all the index requests concurrently
responseChan := make(chan ServerResponse)
for _, keepServer := range sdkResponse.KeepServers {
go func(keepServer ServerAddress) {
responseChan <- GetServerContents(params.Logger,
keepServer,
- client)
+ params.Client)
}(keepServer)
}
return
}
+// GetServerContents fetches the index of blocks stored on a keep server.
func GetServerContents(arvLogger *logger.Logger,
keepServer ServerAddress,
- client http.Client) (response ServerResponse) {
+ arv arvadosclient.ArvadosClient) (response ServerResponse) {
- GetServerStatus(arvLogger, keepServer, client)
+ GetServerStatus(arvLogger, keepServer, arv)
- req := CreateIndexRequest(arvLogger, keepServer)
- resp, err := client.Do(req)
+ req := CreateIndexRequest(arvLogger, keepServer, arv)
+ resp, err := arv.Client.Do(req)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Error fetching %s: %v. Response was %+v",
return ReadServerResponse(arvLogger, keepServer, resp)
}
+// GetServerStatus gets the keep server status by invoking /status.json.
func GetServerStatus(arvLogger *logger.Logger,
keepServer ServerAddress,
- client http.Client) {
+ arv arvadosclient.ArvadosClient) {
url := fmt.Sprintf("http://%s:%d/status.json",
keepServer.Host,
keepServer.Port)
serverInfo["host"] = keepServer.Host
serverInfo["port"] = keepServer.Port
- keepInfo[keepServer.Uuid] = serverInfo
+ keepInfo[keepServer.UUID] = serverInfo
})
}
- resp, err := client.Get(url)
+ resp, err := arv.Client.Get(url)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Error getting keep status from %s: %v", url, err))
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["status_response_processed_at"] = now
serverInfo["status"] = keepStatus
})
}
}
+// CreateIndexRequest builds an authenticated index request for a keep server.
func CreateIndexRequest(arvLogger *logger.Logger,
- keepServer ServerAddress) (req *http.Request) {
+ keepServer ServerAddress,
+ arv arvadosclient.ArvadosClient) (req *http.Request) {
url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
log.Println("About to fetch keep server contents from " + url)
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["index_request_sent_at"] = now
})
}
fmt.Sprintf("Error building http request for %s: %v", url, err))
}
- req.Header.Add("Authorization",
- fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
+ req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
return
}
+// ReadServerResponse reads the index response from a keep server.
func ReadServerResponse(arvLogger *logger.Logger,
keepServer ServerAddress,
resp *http.Response) (response ServerResponse) {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["index_response_received_at"] = now
})
}
if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
// This server returned multiple lines containing the same block digest.
- numDuplicates += 1
+ numDuplicates++
// Keep the block that's newer.
if storedBlock.Mtime < blockInfo.Mtime {
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["processing_finished_at"] = now
serverInfo["lines_received"] = numLines
return
}
+// Summarize counts the replication level of each block reported by the keep servers.
func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
readServers.BlockReplicationCounts = make(map[int]int)
for _, infos := range readServers.BlockToServers {
replication := len(infos)
- readServers.BlockReplicationCounts[replication] += 1
+ readServers.BlockReplicationCounts[replication]++
}
if arvLogger != nil {
keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
})
}
-
}
+// TrashRequest is a single entry in a trash list: a block locator and its mtime.
type TrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
}
+// TrashList is an array of TrashRequest objects
type TrashList []TrashRequest
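+// A trash list serializes to JSON like (hypothetical values):
+//   [{"locator":"0cc175b9c0f1b6a831c399e269772661","block_mtime":1438948620}]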
-func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
+// SendTrashLists sends a trash list to each keep server.
+func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
count := 0
barrier := make(chan error)
client := kc.Client
for url, v := range spl {
- count += 1
+ count++
log.Printf("Sending trash list to %v", url)
go (func(url string, v TrashList) {
return
}
- // Add api token header
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
+ req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
// Make the request
var resp *http.Response
}
- for i := 0; i < count; i += 1 {
+ for i := 0; i < count; i++ {
b := <-barrier
if b != nil {
errs = append(errs, b)
request TrashList
}
-func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
r := json.NewDecoder(req.Body)
- r.Decode(&this.request)
+ r.Decode(&ts.request)
}
func (s *KeepSuite) TestSendTrashLists(c *C) {
type TestHandlerError struct {
}
-func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
http.Error(writer, "I'm a teapot", 418)
}
-/* Ensures that we only have one copy of each unique string. This is
-/* not designed for concurrent access. */
+/* Ensures that we only have one copy of each unique string. This is
+   not designed for concurrent access. */
+
package summary
// This code should probably be moved somewhere more universal.
+// CanonicalString interns strings so that each unique string is stored only once.
type CanonicalString struct {
m map[string]string
}
+// Get returns the canonical copy of s, adding it to the map if needed; equal strings always yield the same stored string.
func (cs *CanonicalString) Get(s string) (r string) {
if cs.m == nil {
cs.m = make(map[string]string)
readDataFrom string
)
+// DataFetcher fetches collection and keep server data into the given structures.
type DataFetcher func(arvLogger *logger.Logger,
readCollections *collection.ReadCollections,
keepServerInfo *keep.ReadServers)
"Avoid network i/o and read summary data from this file instead. Used for development only.")
}
-// Writes data we've read to a file.
+// MaybeWriteData writes data we've read to a file.
//
// This is useful for development, so that we don't need to read all
// our data from the network every time we tweak something.
keepServerInfo keep.ReadServers) bool {
if writeDataTo == "" {
return false
- } else {
- summaryFile, err := os.Create(writeDataTo)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
- }
- defer summaryFile.Close()
+ }
+ summaryFile, err := os.Create(writeDataTo)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+ }
+ defer summaryFile.Close()
- enc := gob.NewEncoder(summaryFile)
- data := serializedData{
- ReadCollections: readCollections,
- KeepServerInfo: keepServerInfo}
- err = enc.Encode(data)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write summary data: %v", err))
- }
- log.Printf("Wrote summary data to: %s", writeDataTo)
- return true
+ enc := gob.NewEncoder(summaryFile)
+ data := serializedData{
+ ReadCollections: readCollections,
+ KeepServerInfo: keepServerInfo}
+ err = enc.Encode(data)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to write summary data: %v", err))
}
+ log.Printf("Wrote summary data to: %s", writeDataTo)
+ return true
}
+// ShouldReadData reports whether summary data should be read from a local file instead of the network; for development use only.
func ShouldReadData() bool {
return readDataFrom != ""
}
-// Reads data that we've written to a file.
+// ReadData reads data that we've written to a file.
//
// This is useful for development, so that we don't need to read all
// our data from the network every time we tweak something.
// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+
package summary
import (
"strings"
)
+// Locator is a block digest with its size.
type Locator blockdigest.DigestWithSize
+// MarshalJSON encodes a Locator as a quoted JSON string.
func (l Locator) MarshalJSON() ([]byte, error) {
return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
}
-// One entry in the Pull List
+// PullRequest represents one entry in the Pull List
type PullRequest struct {
Locator Locator `json:"locator"`
Servers []string `json:"servers"`
}
-// The Pull List for a particular server
+// PullList for a particular server
type PullList []PullRequest
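+// A pull list serializes to JSON like (hypothetical values):
+//   [{"locator":"0cc175b9c0f1b6a831c399e269772661+1",
+//     "servers":["http://keep0.example.com:25107"]}]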
// PullListByLocator implements sort.Interface for PullList based on
return false
}
+// PullServers represents a pull assignment for one under-replicated block.
// For a given under-replicated block, this structure represents which
// servers should pull the specified block and which servers they can
// pull it from.
From []string // Servers that already contain the specified block
}
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
+// ComputePullServers creates a map from block locator to PullServers
+// with one entry for each under-replicated block.
//
// This method ignores zero-replica blocks since there are no servers
// to pull them from, so callers should feel free to omit them, but
writableServers[cs.Get(url)] = struct{}{}
}
- for block, _ := range underReplicated {
+ for block := range underReplicated {
serversStoringBlock := keepServerInfo.BlockToServers[block]
numCopies := len(serversStoringBlock)
numCopiesMissing := blockToDesiredReplication[block] - numCopies
return m
}
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
+// CreatePullServers creates a pull list in which the To and From
+// fields preserve the ordering of sorted servers and the contents
+// are all canonical strings.
func CreatePullServers(cs CanonicalString,
serverHasBlock map[string]struct{},
writableServers map[string]struct{},
return
}
-// Strips the protocol prefix from a url.
+// RemoveProtocolPrefix strips the protocol prefix from a url.
func RemoveProtocolPrefix(url string) string {
return url[(strings.LastIndex(url, "/") + 1):]
}
-// Produces a PullList for each keep server.
+// BuildPullLists produces a PullList for each keep server.
func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
spl = map[string]PullList{}
// We don't worry about canonicalizing our strings here, because we
return
}
-// Writes each pull list to a file.
+// WritePullLists writes each pull list to a file.
// The filename is based on the hostname.
//
// This is just a hack for prototyping, it is not expected to be used
// Summarizes Collection Data and Keep Server Contents.
+
package summary
// TODO(misha): Check size of blocks as well as their digest.
"sort"
)
+// BlockSet is a set of blocks, keyed by digest with size.
type BlockSet map[blockdigest.DigestWithSize]struct{}
-// Adds a single block to the set.
+// Insert adds a single block to the set.
func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
bs[digest] = struct{}{}
}
-// Adds a set of blocks to the set.
+// Union adds a set of blocks to the set.
func (bs BlockSet) Union(obs BlockSet) {
for k, v := range obs {
bs[k] = v
}
}
-// We use the collection index to save space. To convert to and from
+// CollectionIndexSet is used to save space. To convert to and from
// the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUuid and CollectionUuidToIndex.
+// CollectionIndexToUUID and CollectionUUIDToIndex.
type CollectionIndexSet map[int]struct{}
-// Adds a single collection to the set. The collection is specified by
+// Insert adds a single collection to the set. The collection is specified by
// its index.
func (cis CollectionIndexSet) Insert(collectionIndex int) {
cis[collectionIndex] = struct{}{}
}
+// ToCollectionIndexSet adds to collectionIndexSet the indices of all collections that contain any block in the set.
func (bs BlockSet) ToCollectionIndexSet(
readCollections collection.ReadCollections,
collectionIndexSet *CollectionIndexSet) {
}
}
+// ReplicationLevels is a (requested, actual) replication level pair.
// Keeps track of the requested and actual replication levels.
// Currently this is only used for blocks but could easily be used for
// collections as well.
Actual int
}
-// Maps from replication levels to their blocks.
+// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
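+// For example, a block with desired replication 2 that is currently stored
+// on three servers falls in the bucket {Requested: 2, Actual: 3}.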
-// An individual entry from ReplicationLevelBlockSetMap which only reports the number of blocks, not which blocks.
+// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
+// which only reports the number of blocks, not which blocks.
type ReplicationLevelBlockCount struct {
Levels ReplicationLevels
Count int
}
-// An ordered list of ReplicationLevelBlockCount useful for reporting.
+// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+// ReplicationSummary groups blocks and collections by their replication status.
type ReplicationSummary struct {
CollectionBlocksNotInKeep BlockSet
UnderReplicatedBlocks BlockSet
CorrectlyReplicatedCollections CollectionIndexSet
}
-// This struct counts the elements in each set in ReplicationSummary.
+// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary.
type ReplicationSummaryCounts struct {
CollectionBlocksNotInKeep int
UnderReplicatedBlocks int
CorrectlyReplicatedCollections int
}
-// Gets the BlockSet for a given set of ReplicationLevels, creating it
-// if it doesn't already exist.
+// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
+// creating it if it doesn't already exist.
func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
repLevels ReplicationLevels) (bs BlockSet) {
bs, exists := rlbs[repLevels]
return
}
-// Adds a block to the set for a given replication level.
+// Insert adds a block to the set for a given replication level.
func (rlbs ReplicationLevelBlockSetMap) Insert(
repLevels ReplicationLevels,
block blockdigest.DigestWithSize) {
rlbs.GetOrCreate(repLevels).Insert(block)
}
-// Adds a set of blocks to the set for a given replication level.
+// Union adds a set of blocks to the set for a given replication level.
func (rlbs ReplicationLevelBlockSetMap) Union(
repLevels ReplicationLevels,
bs BlockSet) {
rlbs.GetOrCreate(repLevels).Union(bs)
}
-// Outputs a sorted list of ReplicationLevelBlockCounts.
+// Counts outputs a sorted list of ReplicationLevelBlockCounts.
func (rlbs ReplicationLevelBlockSetMap) Counts() (
sorted ReplicationLevelBlockSetSlice) {
sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
}
+// ComputeCounts counts the elements in each set of a ReplicationSummary.
func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
// TODO(misha): Consider rewriting this method to iterate through
-// the fields using reflection, instead of explictily listing the
+// the fields using reflection, instead of explicitly listing the
return rsc
}
+// PrettyPrint returns a human-readable report of the counts.
func (rsc ReplicationSummaryCounts) PrettyPrint() string {
return fmt.Sprintf("Replication Block Counts:"+
"\n Missing From Keep: %d, "+
rsc.CorrectlyReplicatedCollections)
}
+// BucketReplication buckets blocks by their (requested, actual) replication levels.
func BucketReplication(readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
- rlbsm = make(ReplicationLevelBlockSetMap)
+ keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
+ rlbs = make(ReplicationLevelBlockSetMap)
for block, requestedReplication := range readCollections.BlockToDesiredReplication {
- rlbsm.Insert(
+ rlbs.Insert(
ReplicationLevels{
Requested: requestedReplication,
Actual: len(keepServerInfo.BlockToServers[block])},
for block, servers := range keepServerInfo.BlockToServers {
if 0 == readCollections.BlockToDesiredReplication[block] {
- rlbsm.Insert(
+ rlbs.Insert(
ReplicationLevels{Requested: 0, Actual: len(servers)},
block)
}
return
}
-func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
+// SummarizeBuckets classifies blocks and collections by their replication status.
+func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
readCollections collection.ReadCollections) (
rs ReplicationSummary) {
rs.CollectionBlocksNotInKeep = make(BlockSet)
rs.OverReplicatedCollections = make(CollectionIndexSet)
rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
- for levels, bs := range rlbsm {
+ for levels, bs := range rlbs {
if levels.Actual == 0 {
rs.CollectionBlocksNotInKeep.Union(bs)
} else if levels.Requested == 0 {
rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
&rs.OverReplicatedCollections)
- for i := range readCollections.CollectionIndexToUuid {
+ for i := range readCollections.CollectionIndexToUUID {
if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
} else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
} else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
returnedSummary := SummarizeReplication(rc, keepInfo)
if !reflect.DeepEqual(returnedSummary, expectedSummary) {
- t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUuid, rc.BlockToCollectionIndices)
+ t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices)
}
}
// Code for generating trash lists
+
package summary
import (
"time"
)
+// BuildTrashLists builds, for each writable server, a trash list of the expired unreferenced blocks it stores.
func BuildTrashLists(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
m = make(map[string]keep.TrashList)
for block := range keepBlocksNotInCollections {
- for _, block_on_server := range keepServerInfo.BlockToServers[block] {
- if block_on_server.Mtime >= expiry {
+ for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
+ if blockOnServer.Mtime >= expiry {
continue
}
// block is older than expire cutoff
- srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+ srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
if _, writable := writableServers[srv]; !writable {
continue
}
- m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
}
}
return
keep.BlockServerInfo{1, 101}}}}
// only block0 is in delete set
- var bs BlockSet = make(BlockSet)
+ var bs = make(BlockSet)
bs[block0] = struct{}{}
// Test trash list where only sv0 is on writable list.
// in order to permit writes.
const MinFreeKilobytes = BlockSize / 1024
+// Until #6221 is resolved, never_delete must be true.
+// However, allow it to be false in testing with TestDataManagerToken.
+const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
+
// ProcMounts /proc/mounts
var ProcMounts = "/proc/mounts"
flag.Parse()
- if neverDelete != true {
- log.Fatal("neverDelete must be true, see #6221")
- }
-
if maxBuffers < 0 {
log.Fatal("-max-buffers must be greater than zero.")
}
log.Fatalf("reading data manager token: %s\n", err)
}
}
+
+ if !neverDelete && dataManagerToken != TestDataManagerToken {
+ log.Fatal("never_delete must be true, see #6221")
+ }
+
if blobSigningKeyFile != "" {
if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
PermissionSecret = bytes.TrimSpace(buf)