6221: Successfully writes trash lists.
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Jul 2015 15:00:48 +0000 (11:00 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Jul 2015 15:00:48 +0000 (11:00 -0400)
services/datamanager/datamanager.go
services/datamanager/keep/keep.go
services/datamanager/summary/pull_list.go
services/datamanager/summary/pull_list_test.go
services/datamanager/summary/trash_list.go
services/keepstore/trash_worker.go

index 2078edcdaaee5495263cda1cb064c8d540ace5db..28d558bb8d28711563e72eafb37f2aadb00371d0 100644 (file)
@@ -131,7 +131,7 @@ func singlerun() {
 
        summary.WritePullLists(arvLogger, pullLists)
 
-       summary.WriteTrashLists(arvLogger, trashLists)
+       keep.SendTrashLists(arvLogger, kc, trashLists)
 
        // Log that we're finished. We force the recording, since go will
        // not wait for the write timer before exiting.
index c2346cd154fd7fe8ecda1a6a1130061a64bfdb8e..112823ed5fed036b3d29a257086dd41dc93b5582 100644 (file)
@@ -9,6 +9,7 @@ import (
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "io"
@@ -22,6 +23,7 @@ import (
 )
 
 type ServerAddress struct {
+       SSL  bool   `json:service_ssl_flag`
        Host string `json:"service_host"`
        Port int    `json:"service_port"`
        Uuid string `json:"uuid"`
@@ -89,7 +91,11 @@ func (s ServerAddress) String() string {
 }
 
 func (s ServerAddress) HostPort() string {
-       return fmt.Sprintf("%s:%d", s.Host, s.Port)
+       if s.SSL {
+               return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
+       } else {
+               return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
+       }
 }
 
 func getDataManagerToken(arvLogger *logger.Logger) string {
@@ -447,3 +453,61 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
        }
 
 }
+
+// TrashRequest is a single trash list entry: a block locator together with
+// the block's mtime, serialized under the JSON keys "locator" and
+// "block_mtime".
+type TrashRequest struct {
+       Locator    string `json:"locator"`
+       BlockMtime int64  `json:"block_mtime"`
+}
+
+// TrashList is a sequence of TrashRequest entries. SendTrashLists sends one
+// TrashList per server URL.
+type TrashList []TrashRequest
+
+func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+       count := 0
+       rendezvous := make(chan bool)
+
+       for url, v := range spl {
+               count += 1
+               log.Printf("Sending trash list to %v", url)
+
+               go (func(url string, v TrashList) {
+                       defer (func() {
+                               rendezvous <- true
+                       })()
+
+                       pipeReader, pipeWriter := io.Pipe()
+                       go (func() {
+                               enc := json.NewEncoder(pipeWriter)
+                               enc.Encode(v)
+                               pipeWriter.Close()
+                       })()
+
+                       req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+                       if err != nil {
+                               log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+                               return
+                       }
+
+                       // Add api token header
+                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+                       // Make the request
+                       var resp *http.Response
+                       if resp, err = kc.Client.Do(req); err != nil {
+                               log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+                               return
+                       }
+
+                       if resp.StatusCode != http.StatusOK {
+                               log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+                       }
+
+                       io.Copy(ioutil.Discard, resp.Body)
+                       resp.Body.Close()
+               })(url, v)
+
+       }
+
+       for i := 0; i < count; i += 1 {
+               <-rendezvous
+       }
+}
index d2eef9316ba8ab91794bb0f84822a9cac1ac7b7b..3a246a2b757339e9a8cc8a50076ba116f9049db8 100644 (file)
@@ -126,7 +126,7 @@ func CreatePullServers(cs CanonicalString,
        for _, host := range sortedServers {
-               // Strip the protocol portion of the url.
+               // Keep the server url as given (protocol prefix included).
                // Use the canonical copy of the string to avoid memory waste.
-               server := cs.Get(RemoveProtocolPrefix(host))
+               server := cs.Get(host)
                _, hasBlock := serverHasBlock[server]
                if hasBlock {
                        // The from field should include the protocol.
@@ -175,7 +175,7 @@ func WritePullLists(arvLogger *logger.Logger,
        pullLists map[string]PullList) {
        r := strings.NewReplacer(":", ".")
        for host, list := range pullLists {
-               filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+               filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
                pullListFile, err := os.Create(filename)
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger,
index 8f17d28bb55986295a0047b3314a1c2d3c650f0e..fb8631b1feb39df3580b1b70da69d4c3f1e5d160 100644 (file)
@@ -270,10 +270,3 @@ func (s *MySuite) TestBuildPullLists(c *C) {
                        },
                })
 }
-
-func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
-       c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
-       c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
-       c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
-       c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
-}
index fcec557019c23806f6ee51292fc5c968b417347b..efb40e25a7d20e0ef84563d2367b96c7d7bfdd7c 100644 (file)
@@ -14,18 +14,17 @@ import (
        "time"
 )
 
-type TrashRequest struct {
-       Locator    string `json:"locator"`
-       BlockMtime int64  `json:"block_mtime"`
-}
-
-type TrashList []TrashRequest
-
 func BuildTrashLists(kc *keepclient.KeepClient,
        keepServerInfo *keep.ReadServers,
-       keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
 
-       m = make(map[string]TrashList)
+       // Servers that are writeable
+       writableServers := map[string]struct{}{}
+       for _, url := range kc.WritableLocalRoots() {
+               writableServers[url] = struct{}{}
+       }
+
+       m = make(map[string]keep.TrashList)
 
        _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
        if err != nil {
@@ -43,7 +42,12 @@ func BuildTrashLists(kc *keepclient.KeepClient,
                        if block_on_server.Mtime < expiry {
                                // block is older than expire cutoff
                                srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
-                               m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+
+                               _, writable := writableServers[srv]
+
+                               if writable {
+                                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+                               }
                        }
                }
        }
@@ -56,10 +60,10 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 // This is just a hack for prototyping, it is not expected to be used
 // in production.
 func WriteTrashLists(arvLogger *logger.Logger,
-       trashLists map[string]TrashList) {
+       trashLists map[string]keep.TrashList) {
        r := strings.NewReplacer(":", ".")
        for host, list := range trashLists {
-               filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+               filename := fmt.Sprintf("trash_list.%s", r.Replace(RemoveProtocolPrefix(host)))
                trashListFile, err := os.Create(filename)
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger,
index 6257f7b4700dae6c5e3972aca0ba9b7b7332ad54..5e56f030e28c7616d876a80838869e45b2dc10b7 100644 (file)
@@ -33,11 +33,16 @@ func TrashItem(trashRequest TrashRequest) {
                        blob_signature_ttl)
                return
        }
+
        for _, volume := range KeepVM.AllWritable() {
                mtime, err := volume.Mtime(trashRequest.Locator)
-               if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
                        continue
                }
+               if trashRequest.BlockMtime != mtime.Unix() {
+                       log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+               }
 
                if never_delete {
                        err = errors.New("did not delete block because never_delete is true")