Merge branch 'master' into 6260-test-datamanager
author radhika <radhika@curoverse.com>
Tue, 15 Sep 2015 16:24:30 +0000 (12:24 -0400)
committer radhika <radhika@curoverse.com>
Tue, 15 Sep 2015 16:24:30 +0000 (12:24 -0400)
Conflicts:
services/keepstore/keepstore.go

18 files changed:
sdk/go/keepclient/keepclient_test.go
sdk/python/tests/run_test_server.py
sdk/python/tests/test_collections.py
services/datamanager/collection/collection.go
services/datamanager/collection/collection_test.go
services/datamanager/collection/testing.go
services/datamanager/datamanager.go
services/datamanager/datamanager_test.go [new file with mode: 0644]
services/datamanager/keep/keep.go
services/datamanager/keep/keep_test.go
services/datamanager/summary/canonical_string.go
services/datamanager/summary/file.go
services/datamanager/summary/pull_list.go
services/datamanager/summary/summary.go
services/datamanager/summary/summary_test.go
services/datamanager/summary/trash_list.go
services/datamanager/summary/trash_list_test.go
services/keepstore/keepstore.go

index c1f6a3e6f9a2614fc362985be67c86aeff355624..e4e459e83a51c408141c7127a098f69e955a005e 100644 (file)
@@ -743,7 +743,7 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
        }
        {
                hash2, replicas, err := kc.PutB(content)
-               c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
+               c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
                c.Check(replicas, Equals, 2)
                c.Check(err, Equals, nil)
        }
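
Note: the switch from Equals to Matches above reflects that PutB can now return locators carrying hints after the size (for example a +A permission signature). A minimal sketch of the regexp in play, using illustrative hash and signature values:

    package main

    import (
            "fmt"
            "regexp"
    )

    func main() {
            // Illustrative values; the test derives these from kc.PutB(content).
            hash := "acbd18db4cc2f85cedef654fccc4a4d8"
            contentLen := 3

            // Same pattern as the test: hash, escaped "+", length, then a
            // word boundary followed by any trailing hints.
            re := regexp.MustCompile(fmt.Sprintf(`^%s\+%d\b.*$`, hash, contentLen))

            fmt.Println(re.MatchString("acbd18db4cc2f85cedef654fccc4a4d8+3"))
            // true: a bare locator still matches
            fmt.Println(re.MatchString("acbd18db4cc2f85cedef654fccc4a4d8+3+Aabc123@565a4a5c"))
            // true: hints such as a permission signature are now tolerated
    }
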
index 1c5162b97d87e476b9ff3badc82ffce3afdd4d28..5d0c42ad2109e2d605f5ab45fea5bd64fc26b1e8 100644 (file)
@@ -310,8 +310,9 @@ def _start_keep(n, keep_args):
     for arg, val in keep_args.iteritems():
         keep_cmd.append("{}={}".format(arg, val))
 
+    logf = open(os.path.join(TEST_TMPDIR, 'keep{}.log'.format(n)), 'a+')
     kp0 = subprocess.Popen(
-        keep_cmd, stdin=open('/dev/null'), stdout=sys.stderr)
+        keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
     with open(_pidfile('keep{}'.format(n)), 'w') as f:
         f.write(str(kp0.pid))
 
@@ -326,12 +327,17 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
     stop_keep()
 
     keep_args = {}
-    if blob_signing_key:
-        with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
-            keep_args['--permission-key-file'] = f.name
-            f.write(blob_signing_key)
+    if not blob_signing_key:
+        blob_signing_key = 'zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc'
+    with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
+        keep_args['-blob-signing-key-file'] = f.name
+        f.write(blob_signing_key)
     if enforce_permissions:
-        keep_args['--enforce-permissions'] = 'true'
+        keep_args['-enforce-permissions'] = 'true'
+    with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
+        keep_args['-data-manager-token-file'] = f.name
+        f.write(os.environ['ARVADOS_API_TOKEN'])
+    keep_args['-never-delete'] = 'false'
 
     api = arvados.api(
         version='v1',
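
Note: these test-server changes write the blob signing key and data manager token to files and hand keepstore single-dash flags pointing at them. On the reading side, such a token file needs its trailing newline trimmed; this mirrors the GetDataManagerToken helper removed from services/datamanager/keep/keep.go later in this diff. A sketch, with an illustrative path:

    package main

    import (
            "fmt"
            "io/ioutil"
            "log"
            "strings"
    )

    // readTokenFile loads an API token from a flag-supplied file,
    // trimming the trailing newline that shells and editors often add.
    func readTokenFile(path string) string {
            raw, err := ioutil.ReadFile(path)
            if err != nil {
                    log.Fatalf("Unexpected error reading token file %s: %v", path, err)
            }
            return strings.TrimSpace(string(raw))
    }

    func main() {
            // Illustrative path; run_test_server.py writes
            // keep.data-manager-token-file under TEST_TMPDIR.
            fmt.Println(readTokenFile("/tmp/keep.data-manager-token-file"))
    }
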
index 13fc88def303c28d4161e3e4e3d080b9cb17cce6..ac7dd1b9f678ab6391ad71b13201374d127aaba3 100644 (file)
@@ -1144,7 +1144,7 @@ class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
         c2.save()
 
         c1.update()
-        self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3 7ac66c0f148de9519b8bd264312c4d64\+7\+A[a-f0-9]{40}@[a-f0-9]{8} 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+        self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 
 
 if __name__ == '__main__':
index 5519ad8670ec93611740268d34f845e5c104fffe..ca03627405e0742ad419b1d1dd6daf64fba7a341 100644 (file)
@@ -24,38 +24,43 @@ var (
        maxManifestSize   uint64
 )
 
+// Collection representation
 type Collection struct {
-       Uuid              string
-       OwnerUuid         string
+       UUID              string
+       OwnerUUID         string
        ReplicationLevel  int
        BlockDigestToSize map[blockdigest.BlockDigest]int
        TotalSize         int
 }
 
+// ReadCollections holds information about collections from API server
 type ReadCollections struct {
        ReadAllCollections        bool
-       UuidToCollection          map[string]Collection
+       UUIDToCollection          map[string]Collection
        OwnerToCollectionSize     map[string]int
        BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       CollectionUuidToIndex     map[string]int
-       CollectionIndexToUuid     []string
+       CollectionUUIDToIndex     map[string]int
+       CollectionIndexToUUID     []string
        BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
+// GetCollectionsParams params
 type GetCollectionsParams struct {
        Client    arvadosclient.ArvadosClient
        Logger    *logger.Logger
        BatchSize int
 }
 
+// SdkCollectionInfo holds collection info from api
 type SdkCollectionInfo struct {
-       Uuid         string    `json:"uuid"`
-       OwnerUuid    string    `json:"owner_uuid"`
+       UUID         string    `json:"uuid"`
+       OwnerUUID    string    `json:"owner_uuid"`
        Redundancy   int       `json:"redundancy"`
        ModifiedAt   time.Time `json:"modified_at"`
        ManifestText string    `json:"manifest_text"`
 }
 
+// SdkCollectionList is the API response struct for a page of collections
 type SdkCollectionList struct {
        ItemsAvailable int                 `json:"items_available"`
        Items          []SdkCollectionInfo `json:"items"`
@@ -68,7 +73,7 @@ func init() {
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// Write the heap profile to a file for later review.
+// WriteHeapProfile writes the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
 // to call multiple times in a single run.
@@ -77,27 +82,28 @@ func init() {
 func WriteHeapProfile() {
        if heapProfileFilename != "" {
 
-               heap_profile, err := os.Create(heapProfileFilename)
+               heapProfile, err := os.Create(heapProfileFilename)
                if err != nil {
                        log.Fatal(err)
                }
 
-               defer heap_profile.Close()
+               defer heapProfile.Close()
 
-               err = pprof.WriteHeapProfile(heap_profile)
+               err = pprof.WriteHeapProfile(heapProfile)
                if err != nil {
                        log.Fatal(err)
                }
        }
 }
 
+// GetCollectionsAndSummarize gets collections from api and summarizes
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
        results = GetCollections(params)
        results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
        log.Printf("Read and processed %d collections",
-               len(results.UuidToCollection))
+               len(results.UUIDToCollection))
 
        // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
        // lots of behaviors can become warnings (and obviously we can't
@@ -109,6 +115,7 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
        return
 }
 
+// GetCollections gets collections from api
 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        if &params.Client == nil {
                log.Fatalf("params.Client passed to GetCollections() should " +
@@ -157,7 +164,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
                float64(initialNumberOfCollectionsAvailable) * 1.01)
-       results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
                params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
@@ -191,11 +198,11 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        ProcessCollections(params.Logger,
                                collections.Items,
                                defaultReplicationLevel,
-                               results.UuidToCollection).Format(time.RFC3339)
+                               results.UUIDToCollection).Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
-               totalCollections = len(results.UuidToCollection)
+               totalCollections = len(results.UUIDToCollection)
 
                log.Printf("%d collections read, %d new in last batch, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
@@ -229,13 +236,14 @@ func StrCopy(s string) string {
        return string([]byte(s))
 }
 
+// ProcessCollections reads the collections returned by the API server
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        defaultReplicationLevel int,
-       uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+       UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
-               collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
-                       OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
+               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
                        ReplicationLevel:  sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
@@ -260,7 +268,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                manifest := manifest.Manifest{sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
-               if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
                        totalManifestSize += manifestSize
                }
                if manifestSize > maxManifestSize {
@@ -269,11 +277,11 @@ func ProcessCollections(arvLogger *logger.Logger,
 
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
-                       if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
+                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
                                message := fmt.Sprintf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                       collection.Uuid,
-                                       stored_size,
+                                       collection.UUID,
+                                       storedSize,
                                        block.Size,
                                        block.Digest)
                                loggerutil.FatalWithMessage(arvLogger, message)
@@ -284,7 +292,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size
                }
-               uuidToCollection[collection.Uuid] = collection
+               UUIDToCollection[collection.UUID] = collection
 
                // Clear out all the manifest strings that we don't need anymore.
                // These hopefully form the bulk of our memory usage.
@@ -295,22 +303,23 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
+// Summarize the collections read
 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
        readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
-       numCollections := len(readCollections.UuidToCollection)
-       readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
-       readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+       numCollections := len(readCollections.UUIDToCollection)
+       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
        readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
 
-       for _, coll := range readCollections.UuidToCollection {
-               collectionIndex := len(readCollections.CollectionIndexToUuid)
-               readCollections.CollectionIndexToUuid =
-                       append(readCollections.CollectionIndexToUuid, coll.Uuid)
-               readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+       for _, coll := range readCollections.UUIDToCollection {
+               collectionIndex := len(readCollections.CollectionIndexToUUID)
+               readCollections.CollectionIndexToUUID =
+                       append(readCollections.CollectionIndexToUUID, coll.UUID)
+               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
 
-               readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
-                       readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
 
                for block, size := range coll.BlockDigestToSize {
                        locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
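
Note: StrCopy (unchanged above) is what makes the manifest cleanup effective: a Go substring shares its parent's backing array, so storing uncopied slices of a manifest would pin every multi-megabyte manifest text in memory. A sketch of the distinction, with illustrative sizes:

    package main

    import "fmt"

    // strCopy forces a fresh allocation, as StrCopy does above, so the
    // result no longer references the original string's backing array.
    func strCopy(s string) string {
            return string([]byte(s))
    }

    func main() {
            manifest := string(make([]byte, 1<<20)) // stand-in for 1 MiB of manifest text

            shared := manifest[:27]         // substring: keeps the whole 1 MiB reachable
            owned := strCopy(manifest[:27]) // 27-byte copy: manifest can be collected

            fmt.Println(len(shared), len(owned)) // 27 27
    }
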
index 1669bb778498a1b912c8cd68ebc36042e14405d4..07c82e1abc0b5581c523683b4947e7d094f7c9cd 100644 (file)
@@ -16,7 +16,7 @@ type MySuite struct{}
 var _ = Suite(&MySuite{})
 
 // This captures the result we expect from
-// ReadCollections.Summarize().  Because CollectionUuidToIndex is
+// ReadCollections.Summarize().  Because CollectionUUIDToIndex is
 // indeterminate, we replace BlockToCollectionIndices with
 // BlockToCollectionUuids.
 type ExpectedSummary struct {
@@ -41,7 +41,7 @@ func CompareSummarizedReadCollections(c *C,
                uuidSet := make(map[string]struct{})
                summarizedBlockToCollectionUuids[digest] = uuidSet
                for _, index := range indices {
-                       uuidSet[summarized.CollectionIndexToUuid[index]] = struct{}{}
+                       uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{}
                }
        }
 
@@ -67,15 +67,15 @@ func (s *MySuite) TestSummarizeSimple(checker *C) {
 
        rc.Summarize(nil)
 
-       c := rc.UuidToCollection["col0"]
+       c := rc.UUIDToCollection["col0"]
 
        blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
        blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
 
        expected := ExpectedSummary{
-               OwnerToCollectionSize:     map[string]int{c.OwnerUuid: c.TotalSize},
+               OwnerToCollectionSize:     map[string]int{c.OwnerUUID: c.TotalSize},
                BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
-               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.Uuid}, blockDigest2: []string{c.Uuid}},
+               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.UUID}, blockDigest2: []string{c.UUID}},
        }
 
        CompareSummarizedReadCollections(checker, rc, expected)
@@ -95,8 +95,8 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) {
 
        rc.Summarize(nil)
 
-       c0 := rc.UuidToCollection["col0"]
-       c1 := rc.UuidToCollection["col1"]
+       c0 := rc.UUIDToCollection["col0"]
+       c1 := rc.UUIDToCollection["col1"]
 
        blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
        blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
@@ -104,8 +104,8 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) {
 
        expected := ExpectedSummary{
                OwnerToCollectionSize: map[string]int{
-                       c0.OwnerUuid: c0.TotalSize,
-                       c1.OwnerUuid: c1.TotalSize,
+                       c0.OwnerUUID: c0.TotalSize,
+                       c1.OwnerUUID: c1.TotalSize,
                },
                BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{
                        blockDigest1: 5,
@@ -113,9 +113,9 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) {
                        blockDigest3: 8,
                },
                BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
-                       blockDigest1: []string{c0.Uuid},
-                       blockDigest2: []string{c0.Uuid, c1.Uuid},
-                       blockDigest3: []string{c1.Uuid},
+                       blockDigest1: []string{c0.UUID},
+                       blockDigest2: []string{c0.UUID, c1.UUID},
+                       blockDigest3: []string{c1.UUID},
                },
        }
 
index f3c1f47664a039e6b6771c620c41cef3272a8c44..223843372290f91cd58655f44eb1e404afe2127d 100644 (file)
@@ -7,6 +7,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
 )
 
+// TestCollectionSpec with test blocks and desired replication level
 type TestCollectionSpec struct {
        // The desired replication level
        ReplicationLevel int
@@ -15,23 +16,23 @@ type TestCollectionSpec struct {
        Blocks []int
 }
 
-// Creates a ReadCollections object for testing based on the give
-// specs.  Only the ReadAllCollections and UuidToCollection fields are
-// populated.  To populate other fields call rc.Summarize().
+// MakeTestReadCollections creates a ReadCollections object for testing
+// based on the given specs. Only the ReadAllCollections and UUIDToCollection
+// fields are populated. To populate other fields call rc.Summarize().
 func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
        rc = ReadCollections{
                ReadAllCollections: true,
-               UuidToCollection:   map[string]Collection{},
+               UUIDToCollection:   map[string]Collection{},
        }
 
        for i, spec := range specs {
                c := Collection{
-                       Uuid:              fmt.Sprintf("col%d", i),
-                       OwnerUuid:         fmt.Sprintf("owner%d", i),
+                       UUID:              fmt.Sprintf("col%d", i),
+                       OwnerUUID:         fmt.Sprintf("owner%d", i),
                        ReplicationLevel:  spec.ReplicationLevel,
                        BlockDigestToSize: map[blockdigest.BlockDigest]int{},
                }
-               rc.UuidToCollection[c.Uuid] = c
+               rc.UUIDToCollection[c.UUID] = c
                for _, j := range spec.Blocks {
                        c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
                }
@@ -45,16 +46,16 @@ func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
        return
 }
 
-// Returns a slice giving the collection index of each collection that
-// was passed in to MakeTestReadCollections. rc.Summarize() must be
-// called before this method, since Summarize() assigns an index to
-// each collection.
+// CollectionIndicesForTesting returns a slice giving the collection
+// index of each collection that was passed in to MakeTestReadCollections.
+// rc.Summarize() must be called before this method, since Summarize()
+// assigns an index to each collection.
 func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) {
        // TODO(misha): Assert that rc.Summarize() has been called.
-       numCollections := len(rc.CollectionIndexToUuid)
+       numCollections := len(rc.CollectionIndexToUUID)
        indices = make([]int, numCollections)
        for i := 0; i < numCollections; i++ {
-               indices[i] = rc.CollectionUuidToIndex[fmt.Sprintf("col%d", i)]
+               indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)]
        }
        return
 }
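
Note: a sketch of how these helpers combine in a test (the spec values are illustrative; compare TestSummarizeOverlapping above, and this assumes the package is named collection):

    package collection

    import "testing"

    func TestIndicesSketch(t *testing.T) {
            // col0 holds blocks 1 and 2 at replication 5; col1 holds block 2 at 8.
            rc := MakeTestReadCollections([]TestCollectionSpec{
                    {ReplicationLevel: 5, Blocks: []int{1, 2}},
                    {ReplicationLevel: 8, Blocks: []int{2}},
            })
            rc.Summarize(nil) // assigns the indices read back below

            indices := rc.CollectionIndicesForTesting()
            if len(indices) != 2 {
                    t.Fatalf("expected 2 collection indices, got %d", len(indices))
            }
    }
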
index 70a9ae785956396bab936e73b1a7f6ed04c63731..a9306ce83a6011002cef96b86eb6caf700feda23 100644 (file)
@@ -3,6 +3,7 @@
 package main
 
 import (
+       "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -41,17 +42,17 @@ func init() {
 func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
-               err := singlerun()
+               err := singlerun(makeArvadosClient())
                if err != nil {
-                       log.Fatalf("Got an error: %v", err)
+                       log.Fatalf("singlerun: %v", err)
                }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
-                       err := singlerun()
+                       err := singlerun(makeArvadosClient())
                        if err != nil {
-                               log.Printf("Got an error: %v", err)
+                               log.Printf("singlerun: %v", err)
                        }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
@@ -59,16 +60,20 @@ func main() {
        }
 }
 
-func singlerun() error {
+func makeArvadosClient() arvadosclient.ArvadosClient {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Fatalf("Error setting up arvados client %s", err.Error())
+               log.Fatalf("Error setting up arvados client: %s", err)
        }
+       return arv
+}
 
-       if is_admin, err := util.UserIsAdmin(arv); err != nil {
-               log.Fatalf("Error querying current arvados user %s", err.Error())
-       } else if !is_admin {
-               log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
+func singlerun(arv arvadosclient.ArvadosClient) error {
+       var err error
+       if isAdmin, err := util.UserIsAdmin(arv); err != nil {
+               return errors.New("Error verifying admin token: " + err.Error())
+       } else if !isAdmin {
+               return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
        }
 
        var arvLogger *logger.Logger
@@ -153,14 +158,13 @@ func singlerun() error {
 
        if trashErr != nil {
                return err
-       } else {
-               keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
        }
+       keep.SendTrashLists(kc, trashLists)
 
        return nil
 }
 
-// Returns a data fetcher that fetches data from remote servers.
+// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
        return func(arvLogger *logger.Logger,
                readCollections *collection.ReadCollections,
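
Note: threading the client through singlerun (rather than constructing it inside) is what lets the new tests below exercise failure paths, e.g. pointing arv.ApiServer at a bogus host or swapping in a non-admin token and asserting that an error comes back instead of log.Fatal firing. The pattern, reduced to a sketch with hypothetical names:

    package main

    import "errors"

    // client is a stand-in for arvadosclient.ArvadosClient.
    type client struct{ admin bool }

    // run receives its dependency so a test can hand in a misconfigured
    // client and assert on the returned error.
    func run(c client) error {
            if !c.admin {
                    return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
            }
            return nil
    }

    func main() {
            if err := run(client{admin: false}); err != nil {
                    println(err.Error()) // a test would assert on this instead
            }
    }
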
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
new file mode 100644 (file)
index 0000000..3d9bb3d
--- /dev/null
@@ -0,0 +1,513 @@
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io/ioutil"
+       "net/http"
+       "os"
+       "os/exec"
+       "regexp"
+       "strings"
+       "testing"
+       "time"
+)
+
+const (
+       ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       AdminToken      = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
+)
+
+var arv arvadosclient.ArvadosClient
+var keepClient *keepclient.KeepClient
+var keepServers []string
+
+func SetupDataManagerTest(t *testing.T) {
+       os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+
+       // start api and keep servers
+       arvadostest.ResetEnv()
+       arvadostest.StartAPI()
+       arvadostest.StartKeep()
+
+       arv = makeArvadosClient()
+
+       // keep client
+       keepClient = &keepclient.KeepClient{
+               Arvados:       &arv,
+               Want_replicas: 2,
+               Using_proxy:   true,
+               Client:        &http.Client{},
+       }
+
+       // discover keep services
+       if err := keepClient.DiscoverKeepServers(); err != nil {
+               t.Fatalf("Error discovering keep services: %s", err)
+       }
+       keepServers = []string{}
+       for _, host := range keepClient.LocalRoots() {
+               keepServers = append(keepServers, host)
+       }
+}
+
+func TearDownDataManagerTest(t *testing.T) {
+       arvadostest.StopKeep()
+       arvadostest.StopAPI()
+}
+
+func putBlock(t *testing.T, data string) string {
+       locator, _, err := keepClient.PutB([]byte(data))
+       if err != nil {
+               t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
+       }
+       if locator == "" {
+               t.Fatalf("No locator found after putting test data")
+       }
+
+       splits := strings.Split(locator, "+")
+       return splits[0] + "+" + splits[1]
+}
+
+func getBlock(t *testing.T, locator string, data string) {
+       reader, blocklen, _, err := keepClient.Get(locator)
+       if err != nil {
+               t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
+       }
+       if reader == nil {
+               t.Fatalf("No reader found after putting test data")
+       }
+       if blocklen != int64(len(data)) {
+               t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
+       }
+
+       all, err := ioutil.ReadAll(reader)
+       if err != nil {
+               t.Fatalf("Error reading block data %v", err)
+       }
+       if string(all) != data {
+               t.Fatalf("Data read %s did not match expected data %s", string(all), data)
+       }
+}
+
+// Create a collection using arv-put
+func createCollection(t *testing.T, data string) string {
+       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
+       if err != nil {
+               t.Fatalf("Error creating tempfile %v", err)
+       }
+       defer os.Remove(tempfile.Name())
+
+       _, err = tempfile.Write([]byte(data))
+       if err != nil {
+               t.Fatalf("Error writing to tempfile %v", err)
+       }
+
+       // arv-put
+       output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
+       if err != nil {
+               t.Fatalf("Error running arv-put %s", err)
+       }
+
+       uuid := string(output[0:27]) // take the 27-character UUID, dropping the trailing newline
+       return uuid
+}
+
+// Get collection locator
+var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
+
+func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
+       manifest := getCollection(t, uuid)["manifest_text"].(string)
+
+       locator := strings.Split(manifest, " ")[1]
+       match := locatorMatcher.FindStringSubmatch(locator)
+       if match == nil {
+               t.Fatalf("No locator found in collection manifest %s", manifest)
+       }
+
+       return match[1] + "+" + match[2]
+}
+
+func getCollection(t *testing.T, uuid string) Dict {
+       getback := make(Dict)
+       err := arv.Get("collections", uuid, nil, &getback)
+       if err != nil {
+               t.Fatalf("Error getting collection %s", err)
+       }
+       if getback["uuid"] != uuid {
+               t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
+       }
+
+       return getback
+}
+
+func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
+       err := arv.Update("collections", uuid, arvadosclient.Dict{
+               "collection": arvadosclient.Dict{
+                       paramName: paramValue,
+               },
+       }, &arvadosclient.Dict{})
+
+       if err != nil {
+               t.Fatalf("Error updating collection %s", err)
+       }
+}
+
+type Dict map[string]interface{}
+
+func deleteCollection(t *testing.T, uuid string) {
+       getback := make(Dict)
+       err := arv.Delete("collections", uuid, nil, &getback)
+       if err != nil {
+               t.Fatalf("Error deleting collection %s", err)
+       }
+       if getback["uuid"] != uuid {
+               t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
+       }
+}
+
+func dataManagerSingleRun(t *testing.T) {
+       err := singlerun(arv)
+       if err != nil {
+               t.Fatalf("Error during singlerun %s", err)
+       }
+}
+
+func getBlockIndexesForServer(t *testing.T, i int) []string {
+       var indexes []string
+
+       path := keepServers[i] + "/index"
+       client := http.Client{}
+       req, err := http.NewRequest("GET", path, nil)
+       if err != nil {
+               t.Fatalf("Error creating request for %s %s", path, err)
+       }
+       req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+       req.Header.Add("Content-Type", "application/octet-stream")
+
+       // Check the error before deferring Body.Close; a failed Do
+       // returns a nil response.
+       resp, err := client.Do(req)
+       if err != nil {
+               t.Fatalf("Error during %s %s", path, err)
+       }
+       defer resp.Body.Close()
+
+       body, err := ioutil.ReadAll(resp.Body)
+       if err != nil {
+               t.Fatalf("Error reading response from %s %s", path, err)
+       }
+
+       lines := strings.Split(string(body), "\n")
+       for _, line := range lines {
+               indexes = append(indexes, strings.Split(line, " ")...)
+       }
+
+       return indexes
+}
+
+func getBlockIndexes(t *testing.T) [][]string {
+       var indexes [][]string
+
+       for i := 0; i < len(keepServers); i++ {
+               indexes = append(indexes, getBlockIndexesForServer(t, i))
+       }
+       return indexes
+}
+
+func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
+       blocks := getBlockIndexes(t)
+
+       for _, block := range notExpected {
+               for _, idx := range blocks {
+                       if valueInArray(block, idx) {
+                               t.Fatalf("Found unexpected block %s", block)
+                       }
+               }
+       }
+
+       for _, block := range expected {
+               nFound := 0
+               for _, idx := range blocks {
+                       if valueInArray(block, idx) {
+                               nFound++
+                       }
+               }
+               if nFound < minReplication {
+                       t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
+               }
+       }
+}
+
+func valueInArray(value string, list []string) bool {
+       for _, v := range list {
+               if value == v {
+                       return true
+               }
+       }
+       return false
+}
+
+/*
+The test environment uses two keep volumes. The volume names can be found by
+reading the files
+  ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
+
+Each keep volume has the directory structure:
+  volumeN/subdir/locator
+*/
+func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
+       // First get rid of any size hints in the locators
+       var trimmedBlockLocators []string
+       for _, block := range oldUnusedBlockLocators {
+               trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
+       }
+
+       // Get the working dir so that we can read keep{n}.volume files
+       wd, err := os.Getwd()
+       if err != nil {
+               t.Fatalf("Error getting working dir %s", err)
+       }
+
+       // Now cycle through the two keep volumes
+       oldTime := time.Now().AddDate(0, -2, 0)
+       for i := 0; i < 2; i++ {
+               filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
+               volumeDir, err := ioutil.ReadFile(filename)
+               if err != nil {
+                       t.Fatalf("Error reading keep volume file %s %s", filename, err)
+               }
+
+               // Read the keep volume dir structure
+               volumeContents, err := ioutil.ReadDir(string(volumeDir))
+               if err != nil {
+                       t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
+               }
+
+               // Read each subdir for each of the keep volume dir
+               for _, subdir := range volumeContents {
+                       subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
+                       subdirContents, err := ioutil.ReadDir(string(subdirName))
+                       if err != nil {
+                               t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
+                       }
+
+                       // Now we have reached the files; the file names are the block locators
+                       for _, fileInfo := range subdirContents {
+                               blockName := fileInfo.Name()
+                               myname := fmt.Sprintf("%s/%s", subdirName, blockName)
+                               if valueInArray(blockName, trimmedBlockLocators) {
+                                       err = os.Chtimes(myname, oldTime, oldTime)
+                                       if err != nil {
+                                               t.Fatalf("Error backdating block %s %s", myname, err)
+                                       }
+                               }
+                       }
+               }
+       }
+}
+
+func getStatus(t *testing.T, path string) interface{} {
+       client := http.Client{}
+       req, err := http.NewRequest("GET", path, nil)
+       req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+       req.Header.Add("Content-Type", "application/octet-stream")
+       resp, err := client.Do(req)
+       if err != nil {
+               t.Fatalf("Error during %s %s", path, err)
+       }
+       defer resp.Body.Close()
+
+       var s interface{}
+       json.NewDecoder(resp.Body).Decode(&s)
+
+       return s
+}
+
+// Wait until PullQueue and TrashQueue are empty on all keepServers.
+func waitUntilQueuesFinishWork(t *testing.T) {
+       for _, ks := range keepServers {
+               for done := false; !done; {
+                       time.Sleep(100 * time.Millisecond)
+                       s := getStatus(t, ks+"/status.json")
+                       // Only done when *both* queues are empty.
+                       done = true
+                       for _, qName := range []string{"PullQueue", "TrashQueue"} {
+                               qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
+                               if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) > 0 {
+                                       done = false
+                               }
+                       }
+               }
+       }
+}
+
+/*
+Create some blocks and backdate some of them.
+Also create some collections and delete some of them.
+Verify block indexes.
+*/
+func TestPutAndGetBlocks(t *testing.T) {
+       defer TearDownDataManagerTest(t)
+       SetupDataManagerTest(t)
+
+       // Put some blocks which will be backdated later on. They are not
+       // referenced by any collection, so they should be deleted when
+       // datamanager runs.
+       var oldUnusedBlockLocators []string
+       oldUnusedBlockData := "this block will have older mtime"
+       for i := 0; i < 5; i++ {
+               oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
+       }
+       for i := 0; i < 5; i++ {
+               getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
+       }
+
+       // This block will also be backdated, but it is referenced by a
+       // collection created below, so it should survive when datamanager runs.
+       oldUsedBlockData := "this collection block will have older mtime"
+       oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
+       getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
+
+       // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
+       // Hence, even though unreferenced, these should not be deleted when datamanager runs.
+       var newBlockLocators []string
+       newBlockData := "this block is newer"
+       for i := 0; i < 5; i++ {
+               newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
+       }
+       for i := 0; i < 5; i++ {
+               getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
+       }
+
+       // Create a collection that would be deleted later on
+       toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
+       toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
+
+       // Create another collection that has the same data as the one of the old blocks
+       oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
+       oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
+       if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
+               t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
+       }
+
+       // Create another collection whose replication level will be changed
+       replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
+       replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
+
+       // Create two collections with same data; one will be deleted later on
+       dataForTwoCollections := "one of these collections will be deleted"
+       oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
+       oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
+       secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
+       secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
+       if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
+               t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
+       }
+
+       // Verify blocks before doing any backdating / deleting.
+       var expected []string
+       expected = append(expected, oldUnusedBlockLocators...)
+       expected = append(expected, newBlockLocators...)
+       expected = append(expected, toBeDeletedCollectionLocator)
+       expected = append(expected, replicationCollectionLocator)
+       expected = append(expected, oneOfTwoWithSameDataLocator)
+       expected = append(expected, secondOfTwoWithSameDataLocator)
+
+       verifyBlocks(t, nil, expected, 2)
+
+       // Run datamanager in singlerun mode
+       dataManagerSingleRun(t)
+       waitUntilQueuesFinishWork(t)
+
+       verifyBlocks(t, nil, expected, 2)
+
+       // Backdate the to-be old blocks and delete the collections
+       backdateBlocks(t, oldUnusedBlockLocators)
+       deleteCollection(t, toBeDeletedCollectionUUID)
+       deleteCollection(t, secondOfTwoWithSameDataUUID)
+
+       // Run data manager again
+       dataManagerSingleRun(t)
+       waitUntilQueuesFinishWork(t)
+
+       // Get block indexes and verify that the backdated blocks are gone, except the one referenced by a collection.
+       expected = expected[:0]
+       expected = append(expected, oldUsedBlockLocator)
+       expected = append(expected, newBlockLocators...)
+       expected = append(expected, toBeDeletedCollectionLocator)
+       expected = append(expected, oneOfTwoWithSameDataLocator)
+       expected = append(expected, secondOfTwoWithSameDataLocator)
+
+       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
+
+       // Reduce desired replication on replicationCollectionUUID
+       // collection, and verify that Data Manager does not reduce
+       // actual replication any further than that. (It might not
+       // reduce actual replication at all; that's OK for this test.)
+
+       // Reduce desired replication level.
+       updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
+       collection := getCollection(t, replicationCollectionUUID)
+       if collection["replication_desired"] != float64(1) {
+               t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
+       }
+
+       // Verify data is currently overreplicated.
+       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
+
+       // Run data manager again
+       dataManagerSingleRun(t)
+       waitUntilQueuesFinishWork(t)
+
+       // Verify data is not underreplicated.
+       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
+
+       // Verify *other* collections' data is not underreplicated.
+       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
+}
+
+func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
+       defer TearDownDataManagerTest(t)
+       SetupDataManagerTest(t)
+
+       for i := 0; i < 10; i++ {
+               err := singlerun(arv)
+               if err != nil {
+                       t.Fatalf("Got an error during datamanager singlerun: %v", err)
+               }
+       }
+}
+
+func TestGetStatusRepeatedly(t *testing.T) {
+       defer TearDownDataManagerTest(t)
+       SetupDataManagerTest(t)
+
+       for i := 0; i < 10; i++ {
+               for j := 0; j < 2; j++ {
+                       s := getStatus(t, keepServers[j]+"/status.json")
+
+                       pullQueueStatus := s.(map[string]interface{})["PullQueue"]
+                       trashQueueStatus := s.(map[string]interface{})["TrashQueue"]
+
+                       if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
+                               pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
+                               trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
+                               trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
+                               t.Fatalf("PullQueue and TrashQueue status not found")
+                       }
+
+                       time.Sleep(100 * time.Millisecond)
+               }
+       }
+}
+
+func TestRunDatamanagerWithBogusServer(t *testing.T) {
+       defer TearDownDataManagerTest(t)
+       SetupDataManagerTest(t)
+
+       arv.ApiServer = "bogus-server"
+
+       err := singlerun(arv)
+       if err == nil {
+               t.Fatalf("Expected error during singlerun with bogus server")
+       }
+}
+
+func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
+       defer TearDownDataManagerTest(t)
+       SetupDataManagerTest(t)
+
+       arv.ApiToken = ActiveUserToken
+
+       err := singlerun(arv)
+       if err == nil {
+               t.Fatalf("Expected error during singlerun as non-admin user")
+       }
+}
index 0e3cc1d44e79ef7b533a8cccef77a7c5cf6c6605..5b855dc61ea1e3df5ea45c2567bbbdb57776664c 100644 (file)
@@ -6,7 +6,6 @@ import (
        "bufio"
        "encoding/json"
        "errors"
-       "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
@@ -19,38 +18,41 @@ import (
        "net/http"
        "strconv"
        "strings"
-       "sync"
        "time"
 )
 
+// ServerAddress struct
 type ServerAddress struct {
        SSL  bool   `json:service_ssl_flag`
        Host string `json:"service_host"`
        Port int    `json:"service_port"`
-       Uuid string `json:"uuid"`
+       UUID string `json:"uuid"`
 }
 
-// Info about a particular block returned by the server
+// BlockInfo is info about a particular block returned by the server
 type BlockInfo struct {
        Digest blockdigest.DigestWithSize
        Mtime  int64 // TODO(misha): Replace this with a timestamp.
 }
 
-// Info about a specified block given by a server
+// BlockServerInfo is info about a specified block given by a server
 type BlockServerInfo struct {
        ServerIndex int
        Mtime       int64 // TODO(misha): Replace this with a timestamp.
 }
 
+// ServerContents struct
 type ServerContents struct {
        BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
 }
 
+// ServerResponse struct
 type ServerResponse struct {
        Address  ServerAddress
        Contents ServerContents
 }
 
+// ReadServers struct
 type ReadServers struct {
        ReadAllServers           bool
        KeepServerIndexToAddress []ServerAddress
@@ -60,67 +62,34 @@ type ReadServers struct {
        BlockReplicationCounts   map[int]int
 }
 
+// GetKeepServersParams struct
 type GetKeepServersParams struct {
        Client arvadosclient.ArvadosClient
        Logger *logger.Logger
        Limit  int
 }
 
-type KeepServiceList struct {
+// ServiceList consists of the addresses of all the available keep servers
+type ServiceList struct {
        ItemsAvailable int             `json:"items_available"`
        KeepServers    []ServerAddress `json:"items"`
 }
 
-var (
-       // Don't access the token directly, use getDataManagerToken() to
-       // make sure it's been read.
-       dataManagerToken             string
-       dataManagerTokenFile         string
-       dataManagerTokenFileReadOnce sync.Once
-)
-
-func init() {
-       flag.StringVar(&dataManagerTokenFile,
-               "data-manager-token-file",
-               "",
-               "File with the API token we should use to contact keep servers.")
-}
-
+// String returns the keep server's URL as a string.
 // TODO(misha): Change this to include the UUID as well.
 func (s ServerAddress) String() string {
        return s.URL()
 }
 
+// URL of the keep server
 func (s ServerAddress) URL() string {
        if s.SSL {
                return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
-       } else {
-               return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
        }
+       return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
 }
 
-func GetDataManagerToken(arvLogger *logger.Logger) string {
-       readDataManagerToken := func() {
-               if dataManagerTokenFile == "" {
-                       flag.Usage()
-                       loggerutil.FatalWithMessage(arvLogger,
-                               "Data Manager Token needed, but data manager token file not specified.")
-               } else {
-                       rawRead, err := ioutil.ReadFile(dataManagerTokenFile)
-                       if err != nil {
-                               loggerutil.FatalWithMessage(arvLogger,
-                                       fmt.Sprintf("Unexpected error reading token file %s: %v",
-                                               dataManagerTokenFile,
-                                               err))
-                       }
-                       dataManagerToken = strings.TrimSpace(string(rawRead))
-               }
-       }
-
-       dataManagerTokenFileReadOnce.Do(readDataManagerToken)
-       return dataManagerToken
-}
-
+// GetKeepServersAndSummarize gets keep servers from api
 func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
        results = GetKeepServers(params)
        log.Printf("Returned %d keep disks", len(results.ServerToContents))
@@ -132,12 +101,8 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
        return
 }
 
+// GetKeepServers from api server
 func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
-       if &params.Client == nil {
-               log.Fatalf("params.Client passed to GetKeepServers() should " +
-                       "contain a valid ArvadosClient, but instead it is nil.")
-       }
-
        sdkParams := arvadosclient.Dict{
                "filters": [][]string{[]string{"service_type", "=", "disk"}},
        }
@@ -145,7 +110,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                sdkParams["limit"] = params.Limit
        }
 
-       var sdkResponse KeepServiceList
+       var sdkResponse ServiceList
        err := params.Client.List("keep_services", sdkParams, &sdkResponse)
 
        if err != nil {
@@ -177,9 +142,6 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
 
        log.Printf("Got Server Addresses: %v", results)
 
-       // This is safe for concurrent use
-       client := http.Client{}
-
        // Send off all the index requests concurrently
        responseChan := make(chan ServerResponse)
        for _, keepServer := range sdkResponse.KeepServers {
@@ -192,7 +154,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                go func(keepServer ServerAddress) {
                        responseChan <- GetServerContents(params.Logger,
                                keepServer,
-                               client)
+                               params.Client)
                }(keepServer)
        }
 
@@ -218,14 +180,15 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
        return
 }
 
+// GetServerContents of the keep server
 func GetServerContents(arvLogger *logger.Logger,
        keepServer ServerAddress,
-       client http.Client) (response ServerResponse) {
+       arv arvadosclient.ArvadosClient) (response ServerResponse) {
 
-       GetServerStatus(arvLogger, keepServer, client)
+       GetServerStatus(arvLogger, keepServer, arv)
 
-       req := CreateIndexRequest(arvLogger, keepServer)
-       resp, err := client.Do(req)
+       req := CreateIndexRequest(arvLogger, keepServer, arv)
+       resp, err := arv.Client.Do(req)
        if err != nil {
                loggerutil.FatalWithMessage(arvLogger,
                        fmt.Sprintf("Error fetching %s: %v. Response was %+v",
@@ -237,9 +200,10 @@ func GetServerContents(arvLogger *logger.Logger,
        return ReadServerResponse(arvLogger, keepServer, resp)
 }
 
+// GetServerStatus gets the keep server status by invoking /status.json
 func GetServerStatus(arvLogger *logger.Logger,
        keepServer ServerAddress,
-       client http.Client) {
+       arv arvadosclient.ArvadosClient) {
        url := fmt.Sprintf("http://%s:%d/status.json",
                keepServer.Host,
                keepServer.Port)
@@ -253,11 +217,11 @@ func GetServerStatus(arvLogger *logger.Logger,
                        serverInfo["host"] = keepServer.Host
                        serverInfo["port"] = keepServer.Port
 
-                       keepInfo[keepServer.Uuid] = serverInfo
+                       keepInfo[keepServer.UUID] = serverInfo
                })
        }
 
-       resp, err := client.Get(url)
+       resp, err := arv.Client.Get(url)
        if err != nil {
                loggerutil.FatalWithMessage(arvLogger,
                        fmt.Sprintf("Error getting keep status from %s: %v", url, err))
@@ -281,15 +245,17 @@ func GetServerStatus(arvLogger *logger.Logger,
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
                        serverInfo["status_response_processed_at"] = now
                        serverInfo["status"] = keepStatus
                })
        }
 }
 
+// CreateIndexRequest to the keep server
 func CreateIndexRequest(arvLogger *logger.Logger,
-       keepServer ServerAddress) (req *http.Request) {
+       keepServer ServerAddress,
+       arv arvadosclient.ArvadosClient) (req *http.Request) {
        url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
        log.Println("About to fetch keep server contents from " + url)
 
@@ -297,7 +263,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
                        serverInfo["index_request_sent_at"] = now
                })
        }
@@ -308,11 +274,11 @@ func CreateIndexRequest(arvLogger *logger.Logger,
                        fmt.Sprintf("Error building http request for %s: %v", url, err))
        }
 
-       req.Header.Add("Authorization",
-               fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
+       req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
        return
 }
 
+// ReadServerResponse reads the response from the keep server
 func ReadServerResponse(arvLogger *logger.Logger,
        keepServer ServerAddress,
        resp *http.Response) (response ServerResponse) {
@@ -328,7 +294,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
                        serverInfo["index_response_received_at"] = now
                })
        }
@@ -375,7 +341,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
 
                if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
                        // This server returned multiple lines containing the same block digest.
-                       numDuplicates += 1
+                       numDuplicates++
                        // Keep the block that's newer.
                        if storedBlock.Mtime < blockInfo.Mtime {
                                response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
@@ -396,7 +362,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
 
                        serverInfo["processing_finished_at"] = now
                        serverInfo["lines_received"] = numLines
@@ -439,11 +405,12 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
        return
 }
 
+// Summarize results from keep server
 func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
        readServers.BlockReplicationCounts = make(map[int]int)
        for _, infos := range readServers.BlockToServers {
                replication := len(infos)
-               readServers.BlockReplicationCounts[replication] += 1
+               readServers.BlockReplicationCounts[replication]++
        }
 
        if arvLogger != nil {
@@ -452,24 +419,26 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
                        keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
                })
        }
-
 }
 
+// TrashRequest struct
 type TrashRequest struct {
        Locator    string `json:"locator"`
        BlockMtime int64  `json:"block_mtime"`
 }
 
+// TrashList is an array of TrashRequest objects
 type TrashList []TrashRequest
 
-func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
+// SendTrashLists sends the given trash lists to the keep servers
+func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
        count := 0
        barrier := make(chan error)
 
        client := kc.Client
 
        for url, v := range spl {
-               count += 1
+               count++
                log.Printf("Sending trash list to %v", url)
 
                go (func(url string, v TrashList) {
@@ -487,8 +456,7 @@ func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[
                                return
                        }
 
-                       // Add api token header
-                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
+                       req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
 
                        // Make the request
                        var resp *http.Response
@@ -512,7 +480,7 @@ func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[
 
        }
 
-       for i := 0; i < count; i += 1 {
+       for i := 0; i < count; i++ {
                b := <-barrier
                if b != nil {
                        errs = append(errs, b)
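
The loop above fans out one goroutine per keep server and then collects
exactly one result per goroutine from an unbuffered channel. A minimal,
self-contained sketch of that fan-out/barrier pattern (names and types
here are illustrative, not the datamanager's own):

package main

import (
	"fmt"
	"log"
)

// sendAll launches one goroutine per entry in lists and waits for one
// error (or nil) from each before returning the collected errors.
func sendAll(lists map[string][]string) (errs []error) {
	barrier := make(chan error)
	for url, list := range lists {
		go func(url string, list []string) {
			if len(list) == 0 {
				barrier <- fmt.Errorf("nothing to send to %s", url)
				return
			}
			// ... send the list over HTTP here ...
			barrier <- nil
		}(url, list)
	}
	for range lists {
		if err := <-barrier; err != nil {
			errs = append(errs, err)
		}
	}
	return
}

func main() {
	errs := sendAll(map[string][]string{"http://keep0:25107": {"locator+123"}})
	log.Printf("%d error(s)", len(errs))
}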
index f39463ed6233169a9c9509133a729d7bed1fbeb6..2ccf17d45f78419b63fc5074ab9208c7e2e2c3c0 100644 (file)
@@ -22,9 +22,9 @@ type TestHandler struct {
        request TrashList
 }
 
-func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
        r := json.NewDecoder(req.Body)
-       r.Decode(&this.request)
+       r.Decode(&ts.request)
 }
 
 func (s *KeepSuite) TestSendTrashLists(c *C) {
@@ -53,7 +53,7 @@ func (s *KeepSuite) TestSendTrashLists(c *C) {
 type TestHandlerError struct {
 }
 
-func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
        http.Error(writer, "I'm a teapot", 418)
 }
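
Handlers like TestHandler and TestHandlerError are usually exercised
against a net/http/httptest server. A minimal sketch of that pattern
(illustrative names, not the test suite's own):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// captureHandler decodes and records the JSON body of each request,
// mirroring how TestHandler captures the posted trash list.
type captureHandler struct{ got []map[string]interface{} }

func (h *captureHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	json.NewDecoder(req.Body).Decode(&h.got)
}

func main() {
	h := &captureHandler{}
	srv := httptest.NewServer(h)
	defer srv.Close()

	body, _ := json.Marshal([]map[string]interface{}{
		{"locator": "abc", "block_mtime": 123},
	})
	http.Post(srv.URL, "application/json", bytes.NewReader(body))
	fmt.Println(len(h.got)) // 1
}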
 
index 94f06764a1d5e6c9522050ad44f65423ece9630d..152314cf6f4cb5dadddbdf2e3288e711b4ee8b70 100644 (file)
@@ -1,13 +1,16 @@
 /* Ensures that we only have one copy of each unique string. This is
 /* not designed for concurrent access. */
+
 package summary
 
 // This code should probably be moved somewhere more universal.
 
+// CanonicalString interns strings so that equal values share a single stored copy
 type CanonicalString struct {
        m map[string]string
 }
 
+// Get returns the canonical copy of s, adding it to the set the first time it is seen
 func (cs *CanonicalString) Get(s string) (r string) {
        if cs.m == nil {
                cs.m = make(map[string]string)
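
Interning pays off here because the same server URL string appears in
many pull-list entries. A self-contained sketch of the idea behind
CanonicalString (a reimplementation for illustration, not the
package's own code):

package main

import "fmt"

// canon maps each distinct string to a single stored copy.
type canon struct{ m map[string]string }

// Get returns the canonical copy of s, storing it on first use.
func (c *canon) Get(s string) string {
	if c.m == nil {
		c.m = make(map[string]string)
	}
	if r, ok := c.m[s]; ok {
		return r
	}
	c.m[s] = s
	return s
}

func main() {
	var c canon
	a := c.Get("http://keep0:25107")
	b := c.Get("http://keep0:25107")
	fmt.Println(a == b, len(c.m)) // true 1
}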
index 8c37e99ade723a5c51a1f80f6c203dce504100d3..18b3aec8190408897d2857e3e6b2a212936bc21c 100644 (file)
@@ -26,6 +26,7 @@ var (
        readDataFrom string
 )
 
+// DataFetcher is the type of functions that load collection and keep server data
 type DataFetcher func(arvLogger *logger.Logger,
        readCollections *collection.ReadCollections,
        keepServerInfo *keep.ReadServers)
@@ -41,7 +42,7 @@ func init() {
                "Avoid network i/o and read summary data from this file instead. Used for development only.")
 }
 
-// Writes data we've read to a file.
+// MaybeWriteData writes data we've read to a file.
 //
 // This is useful for development, so that we don't need to read all
 // our data from the network every time we tweak something.
@@ -53,33 +54,33 @@ func MaybeWriteData(arvLogger *logger.Logger,
        keepServerInfo keep.ReadServers) bool {
        if writeDataTo == "" {
                return false
-       } else {
-               summaryFile, err := os.Create(writeDataTo)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
-               }
-               defer summaryFile.Close()
+       }
+       summaryFile, err := os.Create(writeDataTo)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+       }
+       defer summaryFile.Close()
 
-               enc := gob.NewEncoder(summaryFile)
-               data := serializedData{
-                       ReadCollections: readCollections,
-                       KeepServerInfo:  keepServerInfo}
-               err = enc.Encode(data)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to write summary data: %v", err))
-               }
-               log.Printf("Wrote summary data to: %s", writeDataTo)
-               return true
+       enc := gob.NewEncoder(summaryFile)
+       data := serializedData{
+               ReadCollections: readCollections,
+               KeepServerInfo:  keepServerInfo}
+       err = enc.Encode(data)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Failed to write summary data: %v", err))
        }
+       log.Printf("Wrote summary data to: %s", writeDataTo)
+       return true
 }
 
+// ShouldReadData reports whether summary data should be read from a file instead of the network. Used for development only.
 func ShouldReadData() bool {
        return readDataFrom != ""
 }
 
-// Reads data that we've written to a file.
+// ReadData reads data that we've written to a file.
 //
 // This is useful for development, so that we don't need to read all
 // our data from the network every time we tweak something.
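
MaybeWriteData and ReadData are the two halves of a gob round-trip. A
minimal, self-contained sketch of that round-trip, with an
illustrative stand-in for serializedData:

package main

import (
	"encoding/gob"
	"log"
	"os"
)

// snapshot stands in for serializedData; gob needs exported fields.
type snapshot struct {
	Collections map[string]int
	Servers     []string
}

func main() {
	f, err := os.Create("/tmp/summary.gob")
	if err != nil {
		log.Fatal(err)
	}
	in := snapshot{Collections: map[string]int{"uuid": 2}, Servers: []string{"keep0"}}
	if err := gob.NewEncoder(f).Encode(in); err != nil {
		log.Fatal(err)
	}
	f.Close()

	f, err = os.Open("/tmp/summary.gob")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	var out snapshot
	if err := gob.NewDecoder(f).Decode(&out); err != nil {
		log.Fatal(err)
	}
	log.Printf("read back %d collections from %d servers", len(out.Collections), len(out.Servers))
}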
index b326c9521ab0d7b545fd52c340c2b17455ea5aa5..cc01249a624a7f4947cdcfc8dafd73dd7e347377 100644 (file)
@@ -1,4 +1,5 @@
 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+
 package summary
 
 import (
@@ -14,19 +15,21 @@ import (
        "strings"
 )
 
+// Locator is a block digest together with its size
 type Locator blockdigest.DigestWithSize
 
+// MarshalJSON encodes a Locator as a quoted digest string
 func (l Locator) MarshalJSON() ([]byte, error) {
        return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
 }
 
-// One entry in the Pull List
+// PullRequest represents one entry in the Pull List
 type PullRequest struct {
        Locator Locator  `json:"locator"`
        Servers []string `json:"servers"`
 }
 
-// The Pull List for a particular server
+// PullList for a particular server
 type PullList []PullRequest
 
 // PullListByLocator implements sort.Interface for PullList based on
@@ -49,6 +52,7 @@ func (a PullListByLocator) Less(i, j int) bool {
        return false
 }
 
+// PullServers describes the pull destinations and sources for one block.
 // For a given under-replicated block, this structure represents which
 // servers should pull the specified block and which servers they can
 // pull it from.
@@ -57,8 +61,8 @@ type PullServers struct {
        From []string // Servers that already contain the specified block
 }
 
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
+// ComputePullServers creates a map from block locator to PullServers
+// with one entry for each under-replicated block.
 //
 // This method ignores zero-replica blocks since there are no servers
 // to pull them from, so callers should feel free to omit them, but
@@ -78,7 +82,7 @@ func ComputePullServers(kc *keepclient.KeepClient,
                writableServers[cs.Get(url)] = struct{}{}
        }
 
-       for block, _ := range underReplicated {
+       for block := range underReplicated {
                serversStoringBlock := keepServerInfo.BlockToServers[block]
                numCopies := len(serversStoringBlock)
                numCopiesMissing := blockToDesiredReplication[block] - numCopies
@@ -109,9 +113,9 @@ func ComputePullServers(kc *keepclient.KeepClient,
        return m
 }
 
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
+// CreatePullServers creates a pull list in which the To and From
+// fields preserve the ordering of sorted servers and the contents
+// are all canonical strings.
 func CreatePullServers(cs CanonicalString,
        serverHasBlock map[string]struct{},
        writableServers map[string]struct{},
@@ -142,12 +146,12 @@ func CreatePullServers(cs CanonicalString,
        return
 }
 
-// Strips the protocol prefix from a url.
+// RemoveProtocolPrefix strips the protocol prefix from a url.
 func RemoveProtocolPrefix(url string) string {
        return url[(strings.LastIndex(url, "/") + 1):]
 }
 
-// Produces a PullList for each keep server.
+// BuildPullLists produces a PullList for each keep server.
 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
        spl = map[string]PullList{}
        // We don't worry about canonicalizing our strings here, because we
@@ -166,7 +170,7 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
        return
 }
 
-// Writes each pull list to a file.
+// WritePullLists writes each pull list to a file.
 // The filename is based on the hostname.
 //
 // This is just a hack for prototyping, it is not expected to be used
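
On the wire, a PullList serializes to a JSON array of {locator,
servers} objects, with Locator's MarshalJSON producing the quoted
digest. A minimal sketch of that shape, using a plain string in place
of the package's Locator type:

package main

import (
	"encoding/json"
	"fmt"
)

// pullRequest mirrors PullRequest's JSON tags with simplified types.
type pullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}

func main() {
	list := []pullRequest{{
		Locator: "aaaa0000aaaa0000aaaa0000aaaa0000+1234",
		Servers: []string{"keep0.example:25107"},
	}}
	b, _ := json.Marshal(list)
	fmt.Println(string(b))
	// [{"locator":"aaaa0000aaaa0000aaaa0000aaaa0000+1234","servers":["keep0.example:25107"]}]
}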
index edd760b035d066627b51f913799bb2606f4c0141..9fb0316b736c74e789f6e0edea96042c05fd0a91 100644 (file)
@@ -1,4 +1,5 @@
 // Summarizes Collection Data and Keep Server Contents.
+
 package summary
 
 // TODO(misha): Check size of blocks as well as their digest.
@@ -11,31 +12,33 @@ import (
        "sort"
 )
 
+// BlockSet is a set of block digests
 type BlockSet map[blockdigest.DigestWithSize]struct{}
 
-// Adds a single block to the set.
+// Insert adds a single block to the set.
 func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
        bs[digest] = struct{}{}
 }
 
-// Adds a set of blocks to the set.
+// Union adds a set of blocks to the set.
 func (bs BlockSet) Union(obs BlockSet) {
        for k, v := range obs {
                bs[k] = v
        }
 }
 
-// We use the collection index to save space. To convert to and from
+// CollectionIndexSet is used to save space. To convert to and from
 // the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUuid and CollectionUuidToIndex.
+// CollectionIndexToUUID and CollectionUUIDToIndex.
 type CollectionIndexSet map[int]struct{}
 
-// Adds a single collection to the set. The collection is specified by
+// Insert adds a single collection to the set. The collection is specified by
 // its index.
 func (cis CollectionIndexSet) Insert(collectionIndex int) {
        cis[collectionIndex] = struct{}{}
 }
 
+// ToCollectionIndexSet adds to collectionIndexSet the indices of all collections containing the blocks in bs
 func (bs BlockSet) ToCollectionIndexSet(
        readCollections collection.ReadCollections,
        collectionIndexSet *CollectionIndexSet) {
@@ -46,6 +49,7 @@ func (bs BlockSet) ToCollectionIndexSet(
        }
 }
 
+// ReplicationLevels pairs a block's requested replication with its actual replication.
 // Keeps track of the requested and actual replication levels.
 // Currently this is only used for blocks but could easily be used for
 // collections as well.
@@ -59,18 +63,20 @@ type ReplicationLevels struct {
        Actual int
 }
 
-// Maps from replication levels to their blocks.
+// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
 type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
 
-// An individual entry from ReplicationLevelBlockSetMap which only reports the number of blocks, not which blocks.
+// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
+// which only reports the number of blocks, not which blocks.
 type ReplicationLevelBlockCount struct {
        Levels ReplicationLevels
        Count  int
 }
 
-// An ordered list of ReplicationLevelBlockCount useful for reporting.
+// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
 type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
 
+// ReplicationSummary groups blocks and collections by their replication status
 type ReplicationSummary struct {
        CollectionBlocksNotInKeep  BlockSet
        UnderReplicatedBlocks      BlockSet
@@ -84,7 +90,7 @@ type ReplicationSummary struct {
        CorrectlyReplicatedCollections CollectionIndexSet
 }
 
-// This struct counts the elements in each set in ReplicationSummary.
+// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary.
 type ReplicationSummaryCounts struct {
        CollectionBlocksNotInKeep      int
        UnderReplicatedBlocks          int
@@ -97,8 +103,8 @@ type ReplicationSummaryCounts struct {
        CorrectlyReplicatedCollections int
 }
 
-// Gets the BlockSet for a given set of ReplicationLevels, creating it
-// if it doesn't already exist.
+// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
+// creating it if it doesn't already exist.
 func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
        repLevels ReplicationLevels) (bs BlockSet) {
        bs, exists := rlbs[repLevels]
@@ -109,21 +115,21 @@ func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
        return
 }
 
-// Adds a block to the set for a given replication level.
+// Insert adds a block to the set for a given replication level.
 func (rlbs ReplicationLevelBlockSetMap) Insert(
        repLevels ReplicationLevels,
        block blockdigest.DigestWithSize) {
        rlbs.GetOrCreate(repLevels).Insert(block)
 }
 
-// Adds a set of blocks to the set for a given replication level.
+// Union adds a set of blocks to the set for a given replication level.
 func (rlbs ReplicationLevelBlockSetMap) Union(
        repLevels ReplicationLevels,
        bs BlockSet) {
        rlbs.GetOrCreate(repLevels).Union(bs)
 }
 
-// Outputs a sorted list of ReplicationLevelBlockCounts.
+// Counts outputs a sorted list of ReplicationLevelBlockCounts.
 func (rlbs ReplicationLevelBlockSetMap) Counts() (
        sorted ReplicationLevelBlockSetSlice) {
        sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
@@ -153,6 +159,7 @@ func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
        rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
 }
 
+// ComputeCounts returns the size of each set in the ReplicationSummary
 func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
        // TODO(misha): Consider rewriting this method to iterate through
 	// the fields using reflection, instead of explicitly listing the
@@ -169,6 +176,7 @@ func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
        return rsc
 }
 
+// PrettyPrint returns a human-readable report of the summary counts
 func (rsc ReplicationSummaryCounts) PrettyPrint() string {
        return fmt.Sprintf("Replication Block Counts:"+
                "\n Missing From Keep: %d, "+
@@ -192,12 +200,13 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string {
                rsc.CorrectlyReplicatedCollections)
 }
 
+// BucketReplication groups blocks by their requested and actual replication levels
 func BucketReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
-       rlbsm = make(ReplicationLevelBlockSetMap)
+       keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
+       rlbs = make(ReplicationLevelBlockSetMap)
 
        for block, requestedReplication := range readCollections.BlockToDesiredReplication {
-               rlbsm.Insert(
+               rlbs.Insert(
                        ReplicationLevels{
                                Requested: requestedReplication,
                                Actual:    len(keepServerInfo.BlockToServers[block])},
@@ -206,7 +215,7 @@ func BucketReplication(readCollections collection.ReadCollections,
 
        for block, servers := range keepServerInfo.BlockToServers {
                if 0 == readCollections.BlockToDesiredReplication[block] {
-                       rlbsm.Insert(
+                       rlbs.Insert(
                                ReplicationLevels{Requested: 0, Actual: len(servers)},
                                block)
                }
@@ -214,7 +223,8 @@ func BucketReplication(readCollections collection.ReadCollections,
        return
 }
 
-func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
+// SummarizeBuckets converts bucketed replication levels into a ReplicationSummary
+func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
        readCollections collection.ReadCollections) (
        rs ReplicationSummary) {
        rs.CollectionBlocksNotInKeep = make(BlockSet)
@@ -228,7 +238,7 @@ func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
        rs.OverReplicatedCollections = make(CollectionIndexSet)
        rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
 
-       for levels, bs := range rlbsm {
+       for levels, bs := range rlbs {
                if levels.Actual == 0 {
                        rs.CollectionBlocksNotInKeep.Union(bs)
                } else if levels.Requested == 0 {
@@ -254,7 +264,7 @@ func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
        rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
                &rs.OverReplicatedCollections)
 
-       for i := range readCollections.CollectionIndexToUuid {
+       for i := range readCollections.CollectionIndexToUUID {
                if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
                } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
                } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
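
The bucketing in this file leans on Go's comparable struct keys: a
(Requested, Actual) pair indexes a map of block sets directly. A
self-contained sketch of the pattern with simplified types:

package main

import "fmt"

// levels mirrors ReplicationLevels with just the two counters.
type levels struct{ Requested, Actual int }

func main() {
	buckets := map[levels]map[string]struct{}{}
	insert := func(l levels, block string) {
		if buckets[l] == nil {
			buckets[l] = map[string]struct{}{}
		}
		buckets[l][block] = struct{}{}
	}
	insert(levels{Requested: 2, Actual: 1}, "block-a") // under-replicated
	insert(levels{Requested: 2, Actual: 2}, "block-b") // correctly replicated
	insert(levels{Requested: 0, Actual: 1}, "block-c") // not referenced by any collection
	for l, bs := range buckets {
		fmt.Printf("requested=%d actual=%d blocks=%d\n", l.Requested, l.Actual, len(bs))
	}
}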
index ea76df4d34043ea1f706ca89a82b699b52c806ca..cc4eb92560b26b385378ffa6d947abb2bc9f0168 100644 (file)
@@ -215,6 +215,6 @@ func TestMixedReplication(t *testing.T) {
        returnedSummary := SummarizeReplication(rc, keepInfo)
 
        if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUuid, rc.BlockToCollectionIndices)
+               t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices)
        }
 }
index 0bedc9cc3a6cd80854185f99867257e0727448af..b6ceacecde2b8e2ffe810deea9e3777aade06625 100644 (file)
@@ -1,4 +1,5 @@
 // Code for generating trash lists
+
 package summary
 
 import (
@@ -9,6 +10,7 @@ import (
        "time"
 )
 
+// BuildTrashLists builds, for each writable keep server, the list of blocks to be trashed
 func BuildTrashLists(kc *keepclient.KeepClient,
        keepServerInfo *keep.ReadServers,
        keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
@@ -40,19 +42,19 @@ func buildTrashListsInternal(writableServers map[string]struct{},
        m = make(map[string]keep.TrashList)
 
        for block := range keepBlocksNotInCollections {
-               for _, block_on_server := range keepServerInfo.BlockToServers[block] {
-                       if block_on_server.Mtime >= expiry {
+               for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
+                       if blockOnServer.Mtime >= expiry {
                                continue
                        }
 
                        // block is older than expire cutoff
-                       srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+                       srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
 
                        if _, writable := writableServers[srv]; !writable {
                                continue
                        }
 
-                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
                }
        }
        return
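
buildTrashListsInternal keeps a block only when it is older than the
expiry cutoff and stored on a writable server. A minimal sketch of
that filter with simplified stand-in types:

package main

import "fmt"

// blockOnServer pairs a server name with a block's mtime.
type blockOnServer struct {
	Server string
	Mtime  int64
}

func main() {
	writable := map[string]struct{}{"keep0": {}}
	expiry := int64(1000)
	candidates := []blockOnServer{
		{"keep0", 900},  // old enough and writable: trash
		{"keep0", 1100}, // newer than cutoff: keep
		{"keep1", 900},  // server not writable: keep
	}
	trash := map[string][]int64{}
	for _, b := range candidates {
		if b.Mtime >= expiry {
			continue
		}
		if _, ok := writable[b.Server]; !ok {
			continue
		}
		trash[b.Server] = append(trash[b.Server], b.Mtime)
	}
	fmt.Println(trash) // map[keep0:[900]]
}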
index 7620631a157688adbf06f4db90e10edebb7fb4b5..555211fe0275e9a42b49625557f8d505999b9c2d 100644 (file)
@@ -34,7 +34,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
                                keep.BlockServerInfo{1, 101}}}}
 
        // only block0 is in delete set
-       var bs BlockSet = make(BlockSet)
+       var bs = make(BlockSet)
        bs[block0] = struct{}{}
 
        // Test trash list where only sv0 is on writable list.
index c4a12c5a5c44a9af5ca2360e70fe9ccdf961a330..ec11af5cf51c8c1a65bdf4827962a74aa90a03fe 100644 (file)
@@ -36,6 +36,10 @@ const BlockSize = 64 * 1024 * 1024
 // in order to permit writes.
 const MinFreeKilobytes = BlockSize / 1024
 
+// Until #6221 is resolved, never_delete must be true.
+// However, it may be false in test runs that use TEST_DATA_MANAGER_TOKEN.
+const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
+
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
@@ -293,10 +297,6 @@ func main() {
 
        flag.Parse()
 
-       if neverDelete != true {
-               log.Fatal("neverDelete must be true, see #6221")
-       }
-
        if maxBuffers < 0 {
                log.Fatal("-max-buffers must be greater than zero.")
        }
@@ -347,6 +347,11 @@ func main() {
                        log.Fatalf("reading data manager token: %s\n", err)
                }
        }
+
+	if !neverDelete && dataManagerToken != TEST_DATA_MANAGER_TOKEN {
+               log.Fatal("never_delete must be true, see #6221")
+       }
+
        if blobSigningKeyFile != "" {
                if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
                        PermissionSecret = bytes.TrimSpace(buf)