summary.WritePullLists(arvLogger, pullLists)
- summary.WriteTrashLists(arvLogger, trashLists)
+ keep.SendTrashLists(arvLogger, kc, trashLists)
// Log that we're finished. We force the recording, since go will
// not wait for the write timer before exiting.
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"io"
)
type ServerAddress struct {
+ SSL bool `json:service_ssl_flag`
Host string `json:"service_host"`
Port int `json:"service_port"`
Uuid string `json:"uuid"`
}
func (s ServerAddress) HostPort() string {
- return fmt.Sprintf("%s:%d", s.Host, s.Port)
+ if s.SSL {
+ return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
+ } else {
+ return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
+ }
}
func getDataManagerToken(arvLogger *logger.Logger) string {
}
}
+
+type TrashRequest struct { // TrashRequest is one element of a TrashList; SendTrashLists JSON-encodes it as part of the request body.
+ Locator string `json:"locator"` // Serialized as "locator". NOTE(review): presumably identifies the block to trash - confirm against the receiving server.
+ BlockMtime int64 `json:"block_mtime"` // Serialized as "block_mtime". NOTE(review): presumably the block's mtime in Unix seconds - confirm.
+}
+
+type TrashList []TrashRequest // TrashList is a slice of TrashRequest values; SendTrashLists sends one TrashList per key of its map argument.
+
+func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+ count := 0
+ rendezvous := make(chan bool)
+
+ for url, v := range spl {
+ count += 1
+ log.Printf("Sending trash list to %v", url)
+
+ go (func(url string, v TrashList) {
+ defer (func() {
+ rendezvous <- true
+ })()
+
+ pipeReader, pipeWriter := io.Pipe()
+ go (func() {
+ enc := json.NewEncoder(pipeWriter)
+ enc.Encode(v)
+ pipeWriter.Close()
+ })()
+
+ req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+ if err != nil {
+ log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+ return
+ }
+
+ // Add api token header
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+ // Make the request
+ var resp *http.Response
+ if resp, err = kc.Client.Do(req); err != nil {
+ log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+ return
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+ }
+
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ })(url, v)
+
+ }
+
+ for i := 0; i < count; i += 1 {
+ <-rendezvous
+ }
+}
for _, host := range sortedServers {
// Strip the protocol portion of the url.
// Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
+ server := cs.Get(host)
_, hasBlock := serverHasBlock[server]
if hasBlock {
// The from field should include the protocol.
pullLists map[string]PullList) {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
pullListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
},
})
}
-
-func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
- c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
- c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
- c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
- c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
-}
"time"
)
-type TrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
-}
-
-type TrashList []TrashRequest
-
func BuildTrashLists(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
- keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+ keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
- m = make(map[string]TrashList)
+ // Servers that are writeable
+ writableServers := map[string]struct{}{}
+ for _, url := range kc.WritableLocalRoots() {
+ writableServers[url] = struct{}{}
+ }
+
+ m = make(map[string]keep.TrashList)
_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
if err != nil {
if block_on_server.Mtime < expiry {
// block is older than expire cutoff
srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
- m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+
+ _, writable := writableServers[srv]
+
+ if writable {
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+ }
}
}
}
// This is just a hack for prototyping, it is not expected to be used
// in production.
func WriteTrashLists(arvLogger *logger.Logger,
- trashLists map[string]TrashList) {
+ trashLists map[string]keep.TrashList) {
r := strings.NewReplacer(":", ".")
for host, list := range trashLists {
- filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("trash_list.%s", r.Replace(RemoveProtocolPrefix(host)))
trashListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
blob_signature_ttl)
return
}
+
for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
- if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ if err != nil {
+ log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
continue
}
+ if trashRequest.BlockMtime != mtime.Unix() {
+ log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+ }
if never_delete {
err = errors.New("did not delete block because never_delete is true")