6221: Add SendTrashLists() test. Propagate errors from BuildTrashList. Style fixes.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 16 Jul 2015 20:12:47 +0000 (16:12 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 16 Jul 2015 20:12:47 +0000 (16:12 -0400)
services/datamanager/datamanager.go
services/datamanager/keep/keep.go
services/datamanager/keep/keep_test.go [new file with mode: 0644]
services/datamanager/summary/pull_list_test.go
services/datamanager/summary/trash_list.go
services/datamanager/summary/trash_list_test.go
services/keepstore/trash_worker.go

index d7ac0d2e1fb415bc6e563d6bc744d681aa2717ec..70a9ae785956396bab936e73b1a7f6ed04c63731 100644 (file)
@@ -41,19 +41,25 @@ func init() {
 func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
-               singlerun()
+               err := singlerun()
+               if err != nil {
+                       log.Fatalf("Got an error: %v", err)
+               }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
-                       singlerun()
+                       err := singlerun()
+                       if err != nil {
+                               log.Printf("Got an error: %v", err)
+                       }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
 }
 
-func singlerun() {
+func singlerun() error {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -119,31 +125,39 @@ func singlerun() {
                        fmt.Sprintf("Error setting up keep client %s", err.Error()))
        }
 
+       // Log that we're finished. We force the recording, since go will
+       // not wait for the write timer before exiting.
+       if arvLogger != nil {
+               defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+                       summaryInfo["block_replication_counts"] = bucketCounts
+                       summaryInfo["replication_summary"] = replicationCounts
+                       p["summary_info"] = summaryInfo
+
+                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+               })
+       }
+
        pullServers := summary.ComputePullServers(kc,
                &keepServerInfo,
                readCollections.BlockToDesiredReplication,
                replicationSummary.UnderReplicatedBlocks)
 
        pullLists := summary.BuildPullLists(pullServers)
-       trashLists := summary.BuildTrashLists(kc,
+
+       trashLists, trashErr := summary.BuildTrashLists(kc,
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)
 
        summary.WritePullLists(arvLogger, pullLists)
-       keep.SendTrashLists(arvLogger, kc, trashLists)
 
-       // Log that we're finished. We force the recording, since go will
-       // not wait for the write timer before exiting.
-       if arvLogger != nil {
-               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
-                       summaryInfo["block_replication_counts"] = bucketCounts
-                       summaryInfo["replication_summary"] = replicationCounts
-                       p["summary_info"] = summaryInfo
-
-                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
-               })
+       if trashErr != nil {
+               return err
+       } else {
+               keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
        }
+
+       return nil
 }
 
 // Returns a data fetcher that fetches data from remote servers.
index 871acc8e793929b909b1256b3ff7e14a90059eba..08e54e6ced6079af651c4ca495fe474b9206e256 100644 (file)
@@ -98,7 +98,7 @@ func (s ServerAddress) URL() string {
        }
 }
 
-func getDataManagerToken(arvLogger *logger.Logger) string {
+func GetDataManagerToken(arvLogger *logger.Logger) string {
        readDataManagerToken := func() {
                if dataManagerTokenFile == "" {
                        flag.Usage()
@@ -308,7 +308,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
        }
 
        req.Header.Add("Authorization",
-               fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+               fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
        return
 }
 
@@ -461,9 +461,11 @@ type TrashRequest struct {
 
 type TrashList []TrashRequest
 
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) {
        count := 0
-       rendezvous := make(chan bool)
+       barrier := make(chan bool)
+
+       client := kc.Client
 
        for url, v := range spl {
                count += 1
@@ -471,7 +473,7 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
 
                go (func(url string, v TrashList) {
                        defer (func() {
-                               rendezvous <- true
+                               barrier <- true
                        })()
 
                        pipeReader, pipeWriter := io.Pipe()
@@ -488,16 +490,16 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
                        }
 
                        // Add api token header
-                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
 
                        // Make the request
                        var resp *http.Response
-                       if resp, err = kc.Client.Do(req); err != nil {
+                       if resp, err = client.Do(req); err != nil {
                                log.Printf("Error sending trash list to %v error: %v", url, err.Error())
                                return
                        }
 
-                       log.Printf("Sent trash list to %v: response was HTTP %d", url, resp.Status)
+                       log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
 
                        io.Copy(ioutil.Discard, resp.Body)
                        resp.Body.Close()
@@ -506,6 +508,6 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
        }
 
        for i := 0; i < count; i += 1 {
-               <-rendezvous
+               <-barrier
        }
 }
diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go
new file mode 100644 (file)
index 0000000..85f704c
--- /dev/null
@@ -0,0 +1,92 @@
+package keep
+
+import (
+       "encoding/json"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       . "gopkg.in/check.v1"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+type KeepSuite struct{}
+
+var _ = Suite(&KeepSuite{})
+
+type TestHandler struct {
+       request TrashList
+}
+
+func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+       r := json.NewDecoder(req.Body)
+       r.Decode(&this.request)
+}
+
+func (s *KeepSuite) TestSendTrashLists(c *C) {
+       th := TestHandler{}
+       server := httptest.NewServer(&th)
+
+       tl := map[string]TrashList{
+               server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+       kc := keepclient.KeepClient{Client: &http.Client{}}
+       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+               map[string]string{"xxxx": server.URL},
+               map[string]string{})
+
+       SendTrashLists("", &kc, tl)
+       server.Close()
+
+       c.Check(th.request,
+               DeepEquals,
+               tl[server.URL])
+
+}
+
+type TestHandlerError struct {
+}
+
+func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+       http.Error(writer, "I'm a teapot", 405)
+}
+
+func (s *KeepSuite) TestSendTrashListError(c *C) {
+       // Server responds with an error
+
+       th := TestHandlerError{}
+       server := httptest.NewServer(&th)
+
+       tl := map[string]TrashList{
+               server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+       kc := keepclient.KeepClient{Client: &http.Client{}}
+       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+               map[string]string{"xxxx": server.URL},
+               map[string]string{})
+
+       SendTrashLists("", &kc, tl)
+       server.Close()
+}
+
+func (s *KeepSuite) TestSendTrashListError2(c *C) {
+       // Server is not reachable
+
+       th := TestHandler{}
+       server := httptest.NewServer(&th)
+       server.Close()
+
+       tl := map[string]TrashList{
+               server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+       kc := keepclient.KeepClient{Client: &http.Client{}}
+       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+               map[string]string{"xxxx": server.URL},
+               map[string]string{})
+
+       SendTrashLists("", &kc, tl)
+}
index f22d47d29b1e4560b054d70d1ecd845c8b38fdb4..e2050c2b1ebefbc42bf950fc1ad30121d63b9c84 100644 (file)
@@ -9,13 +9,13 @@ import (
 )
 
 // Gocheck boilerplate
-func Test(t *testing.T) {
+func TestPullLists(t *testing.T) {
        TestingT(t)
 }
 
-type MySuite struct{}
+type PullSuite struct{}
 
-var _ = Suite(&MySuite{})
+var _ = Suite(&PullSuite{})
 
 // Helper method to declare string sets more succinctly
 // Could be placed somewhere more general.
@@ -27,7 +27,7 @@ func stringSet(slice ...string) (m map[string]struct{}) {
        return
 }
 
-func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
+func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
        pl := PullList{PullRequest{
                Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
                Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
@@ -41,7 +41,7 @@ func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
        c.Check(string(b), Equals, expectedOutput)
 }
 
-func (s *MySuite) TestCreatePullServers(c *C) {
+func (s *PullSuite) TestCreatePullServers(c *C) {
        var cs CanonicalString
        c.Check(
                CreatePullServers(cs,
@@ -155,7 +155,7 @@ var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
        Params: []string{"obtained", "expected"},
 }}
 
-func (s *MySuite) TestBuildPullLists(c *C) {
+func (s *PullSuite) TestBuildPullLists(c *C) {
        c.Check(
                BuildPullLists(map[Locator]PullServers{}),
                PullListMapEquals,
index 330cd4e45a33a23ed01a9946c10c76dcbea34d7e..0bedc9cc3a6cd80854185f99867257e0727448af 100644 (file)
@@ -2,15 +2,16 @@
 package summary
 
 import (
+       "errors"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "log"
        "time"
 )
 
 func BuildTrashLists(kc *keepclient.KeepClient,
        keepServerInfo *keep.ReadServers,
-       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
+       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
 
        // Servers that are writeable
        writableServers := map[string]struct{}{}
@@ -20,8 +21,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 
        _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
        if err != nil {
-               log.Printf("Failed to get blobSignatureTtl: %v", err)
-               return map[string]keep.TrashList{}
+               return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
        }
 
        ttl := int64(_ttl.(float64))
@@ -29,28 +29,30 @@ func BuildTrashLists(kc *keepclient.KeepClient,
        // expire unreferenced blocks more than "ttl" seconds old.
        expiry := time.Now().UTC().Unix() - ttl
 
-       return BuildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections)
+       return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
 }
 
-func BuildTrashListsInternal(writableServers map[string]struct{},
+func buildTrashListsInternal(writableServers map[string]struct{},
        keepServerInfo *keep.ReadServers,
        expiry int64,
        keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
 
        m = make(map[string]keep.TrashList)
 
-       for block, _ := range keepBlocksNotInCollections {
+       for block := range keepBlocksNotInCollections {
                for _, block_on_server := range keepServerInfo.BlockToServers[block] {
-                       if block_on_server.Mtime < expiry {
-                               // block is older than expire cutoff
-                               srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+                       if block_on_server.Mtime >= expiry {
+                               continue
+                       }
 
-                               _, writable := writableServers[srv]
+                       // block is older than expire cutoff
+                       srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
 
-                               if writable {
-                                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
-                               }
+                       if _, writable := writableServers[srv]; !writable {
+                               continue
                        }
+
+                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
                }
        }
        return
index b6f9582e7403f8110e64e67f0e3de2d2a8d591ed..7620631a157688adbf06f4db90e10edebb7fb4b5 100644 (file)
@@ -8,7 +8,7 @@ import (
 )
 
 // Gocheck boilerplate
-func TrashTest(t *testing.T) {
+func TestTrash(t *testing.T) {
        TestingT(t)
 }
 
@@ -38,7 +38,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
        bs[block0] = struct{}{}
 
        // Test trash list where only sv0 is on writable list.
-       c.Check(BuildTrashListsInternal(
+       c.Check(buildTrashListsInternal(
                map[string]struct{}{
                        sv0.URL(): struct{}{}},
                &keepServerInfo,
@@ -49,7 +49,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
                        "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
 
        // Test trash list where both sv0 and sv1 are on writable list.
-       c.Check(BuildTrashListsInternal(
+       c.Check(buildTrashListsInternal(
                map[string]struct{}{
                        sv0.URL(): struct{}{},
                        sv1.URL(): struct{}{}},
@@ -62,7 +62,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
                        "http://keep1.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
 
        // Test trash list where only block on sv0 is expired
-       c.Check(BuildTrashListsInternal(
+       c.Check(buildTrashListsInternal(
                map[string]struct{}{
                        sv0.URL(): struct{}{},
                        sv1.URL(): struct{}{}},
index 69fbf7456a12f2bb1a39cae0e7c9760f0a12d102..4fbe4bb59624e22e820cf7eab7c2bb5013212e14 100644 (file)
@@ -41,7 +41,7 @@ func TrashItem(trashRequest TrashRequest) {
                        continue
                }
                if trashRequest.BlockMtime != mtime.Unix() {
-                       log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+                       log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
                        continue
                }