Closes #6221, #6673.
+// main parses the command-line flags and then either performs one run
+// (minutesBetweenRuns == 0) or loops forever, sleeping minutesBetweenRuns
+// minutes after each run.
func main() {
flag.Parse()
if minutesBetweenRuns == 0 {
- singlerun()
+ // One-shot mode: a failed run terminates the process.
+ if err := singlerun(); err != nil {
+ log.Fatalf("Got an error: %v", err)
+ }
} else {
waitTime := time.Minute * time.Duration(minutesBetweenRuns)
for {
log.Println("Beginning Run")
- singlerun()
+ // Periodic mode: log the failure and carry on with the next cycle.
+ if err := singlerun(); err != nil {
+ log.Printf("Got an error: %v", err)
+ }
log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
time.Sleep(waitTime)
}
}
}
-func singlerun() {
+// singlerun performs a single data manager run. It returns an error when the
+// trash lists cannot be built; otherwise the trash lists are sent to the keep
+// servers and nil is returned.
+func singlerun() error {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatalf("Error setting up arvados client %s", err.Error())
fmt.Sprintf("Error setting up keep client %s", err.Error()))
}
- pullServers := summary.ComputePullServers(kc,
- &keepServerInfo,
- readCollections.BlockToDesiredReplication,
- replicationSummary.UnderReplicatedBlocks)
-
- pullLists := summary.BuildPullLists(pullServers)
-
- summary.WritePullLists(arvLogger, pullLists)
-
// Log that we're finished. We force the recording, since go will
// not wait for the write timer before exiting.
if arvLogger != nil {
- arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+ defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
summaryInfo := logger.GetOrCreateMap(p, "summary_info")
summaryInfo["block_replication_counts"] = bucketCounts
summaryInfo["replication_summary"] = replicationCounts
p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
})
}
+
+ pullServers := summary.ComputePullServers(kc,
+ &keepServerInfo,
+ readCollections.BlockToDesiredReplication,
+ replicationSummary.UnderReplicatedBlocks)
+
+ pullLists := summary.BuildPullLists(pullServers)
+
+ trashLists, trashErr := summary.BuildTrashLists(kc,
+ &keepServerInfo,
+ replicationSummary.KeepBlocksNotInCollections)
+
+ summary.WritePullLists(arvLogger, pullLists)
+
+ // The pull lists are written before this check, so a trash list failure
+ // does not prevent them from being recorded.
+ if trashErr != nil {
+ // Return the error BuildTrashLists produced, not the unrelated err
+ // left over from earlier calls.
+ return trashErr
+ }
+
+ // Failures for individual keep servers are logged inside SendTrashLists
+ // and do not fail the run.
+ keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
+
+ return nil
}
// Returns a data fetcher that fetches data from remote servers.
import (
"bufio"
"encoding/json"
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"io"
)
type ServerAddress struct {
+ // SSL selects the https scheme in URL(). The JSON key must be quoted
+ // inside the tag; an unquoted key makes the whole tag malformed and
+ // encoding/json then ignores it.
+ SSL bool `json:"service_ssl_flag"`
Host string `json:"service_host"`
Port int `json:"service_port"`
Uuid string `json:"uuid"`
// TODO(misha): Change this to include the UUID as well.
+// String implements fmt.Stringer by returning s.URL().
func (s ServerAddress) String() string {
- return s.HostPort()
+ return s.URL()
}
-func (s ServerAddress) HostPort() string {
- return fmt.Sprintf("%s:%d", s.Host, s.Port)
+// URL returns the server's base URL as "<scheme>://<host>:<port>", where the
+// scheme is https when s.SSL is set and http otherwise.
+func (s ServerAddress) URL() string {
+ if !s.SSL {
+ return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
+ }
+ return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
}
-func getDataManagerToken(arvLogger *logger.Logger) string {
+func GetDataManagerToken(arvLogger *logger.Logger) string {
readDataManagerToken := func() {
if dataManagerTokenFile == "" {
flag.Usage()
}
req.Header.Add("Authorization",
- fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+ fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
return
}
}
}
+
+// TrashRequest is one entry of a trash list. It is serialized to JSON with
+// the keys "locator" and "block_mtime".
+type TrashRequest struct {
+ // Locator names the block; the trash list builder fills it with the
+ // block's digest string.
+ Locator string `json:"locator"`
+ // BlockMtime is the mtime recorded for the block on the target server.
+ // NOTE(review): presumably Unix seconds, since the builder compares it
+ // against a time.Now().Unix()-based expiry — confirm.
+ BlockMtime int64 `json:"block_mtime"`
+}
+
+// TrashList is the set of trash requests destined for a single keep server.
+type TrashList []TrashRequest
+
+// SendTrashLists sends every trash list in spl to the keep server whose base
+// URL is its map key, issuing "PUT <url>/trash" with the list encoded as JSON
+// and dataManagerToken as the OAuth2 authorization token. The requests run
+// concurrently on kc.Client.
+//
+// The returned slice holds one error for each server whose request could not
+// be created or sent, or that answered with a status other than 200. It is
+// nil when every server succeeded.
+func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
+	// Every goroutine started below sends exactly one value on barrier:
+	// nil on success, the failure otherwise.
+	barrier := make(chan error)
+
+	client := kc.Client
+
+	for url, v := range spl {
+		log.Printf("Sending trash list to %v", url)
+
+		go (func(url string, v TrashList) {
+			// Stream the JSON encoding of the list into the request body.
+			pipeReader, pipeWriter := io.Pipe()
+			go (func() {
+				// CloseWithError(nil) is a plain close. A non-nil encoding
+				// error is handed to whoever reads the body, instead of
+				// ending the body early as if it were complete.
+				pipeWriter.CloseWithError(json.NewEncoder(pipeWriter).Encode(v))
+			})()
+
+			req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+			if err != nil {
+				// No request will ever read the pipe, so close the read side
+				// to release the encoder goroutine blocked on its write.
+				pipeReader.Close()
+				log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+				barrier <- err
+				return
+			}
+
+			// Add api token header
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
+
+			// Make the request
+			resp, err := client.Do(req)
+			if err != nil {
+				log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+				barrier <- err
+				return
+			}
+
+			log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
+
+			// Drain and close the body so the connection can be reused.
+			io.Copy(ioutil.Discard, resp.Body)
+			resp.Body.Close()
+
+			if resp.StatusCode != http.StatusOK {
+				barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
+			} else {
+				barrier <- nil
+			}
+		})(url, v)
+	}
+
+	// Collect exactly one result per request started above.
+	for i := 0; i < len(spl); i++ {
+		if err := <-barrier; err != nil {
+			errs = append(errs, err)
+		}
+	}
+
+	return errs
+}
--- /dev/null
+package keep
+
+import (
+ "encoding/json"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ . "gopkg.in/check.v1"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+// Gocheck boilerplate: Test is the "go test" entry point that hands control
+// to gocheck so the suites registered with Suite get run.
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// KeepSuite is the gocheck suite holding the trash list sending tests.
+type KeepSuite struct{}
+
+// Register the suite with gocheck.
+var _ = Suite(&KeepSuite{})
+
+// TestHandler is an http.Handler that records the trash list decoded from
+// the body of the most recent request it served.
+type TestHandler struct {
+	request TrashList
+}
+
+// ServeHTTP decodes the request body as a JSON trash list into h.request.
+// A body that cannot be decoded is answered with 400 Bad Request, so the
+// sender sees the failure rather than an implicit 200.
+func (h *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+	if err := json.NewDecoder(req.Body).Decode(&h.request); err != nil {
+		http.Error(writer, err.Error(), http.StatusBadRequest)
+	}
+}
+
+// TestSendTrashLists sends a one-entry trash list to a local test server and
+// checks that no errors come back and that the server decoded exactly the
+// list that was sent.
+func (s *KeepSuite) TestSendTrashLists(c *C) {
+	handler := TestHandler{}
+	server := httptest.NewServer(&handler)
+
+	sent := TrashList{TrashRequest{Locator: "000000000000000000000000deadbeef", BlockMtime: 99}}
+	lists := map[string]TrashList{server.URL: sent}
+
+	kc := keepclient.KeepClient{Client: &http.Client{}}
+	kc.SetServiceRoots(
+		map[string]string{"xxxx": server.URL},
+		map[string]string{"xxxx": server.URL},
+		map[string]string{})
+
+	errs := SendTrashLists("", &kc, lists)
+	server.Close()
+
+	c.Check(errs, IsNil)
+	c.Check(handler.request, DeepEquals, sent)
+}
+
+// TestHandlerError is an http.Handler that fails every request.
+type TestHandlerError struct{}
+
+// ServeHTTP answers every request with status 418 and the message
+// "I'm a teapot".
+func (h *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+	const status = 418
+	http.Error(writer, "I'm a teapot", status)
+}
+
+// sendTrashListError sends a one-entry trash list addressed to server and
+// checks that SendTrashLists reports a failure for it. The server is closed
+// before the helper returns.
+func sendTrashListError(c *C, server *httptest.Server) {
+	defer server.Close()
+
+	tl := map[string]TrashList{
+		server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+	kc := keepclient.KeepClient{Client: &http.Client{}}
+	kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+		map[string]string{"xxxx": server.URL},
+		map[string]string{})
+
+	err := SendTrashLists("", &kc, tl)
+
+	// One server was targeted, so exactly one error is expected. Assert
+	// aborts the test on failure, so err[0] below can never index an empty
+	// slice.
+	c.Assert(err, HasLen, 1)
+	c.Check(err[0], NotNil)
+}
+
+// TestSendTrashListErrorResponse checks that an error is reported when the
+// server answers the trash request with a failure status.
+func (s *KeepSuite) TestSendTrashListErrorResponse(c *C) {
+	failing := httptest.NewServer(&TestHandlerError{})
+	sendTrashListError(c, failing)
+}
+
+// TestSendTrashListUnreachable checks that an error is reported when the
+// target server was never started.
+func (s *KeepSuite) TestSendTrashListUnreachable(c *C) {
+	unstarted := httptest.NewUnstartedServer(&TestHandler{})
+	sendTrashListError(c, unstarted)
+}
serverHasBlock := map[string]struct{}{}
for _, info := range serversStoringBlock {
sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
- serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
+ serverHasBlock[cs.Get(sa.URL())] = struct{}{}
}
roots := keepclient.NewRootSorter(kc.LocalRoots(),
for _, host := range sortedServers {
// Strip the protocol portion of the url.
// Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
+ server := cs.Get(host)
_, hasBlock := serverHasBlock[server]
if hasBlock {
// The from field should include the protocol.
pullLists map[string]PullList) {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
pullListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
)
// Gocheck boilerplate
-func Test(t *testing.T) {
+// TestPullLists is a "go test" entry point that hands control to gocheck.
+func TestPullLists(t *testing.T) {
TestingT(t)
}
-type MySuite struct{}
+// PullSuite is the gocheck suite holding the pull list tests.
+type PullSuite struct{}
-var _ = Suite(&MySuite{})
+// Register the suite with gocheck.
+var _ = Suite(&PullSuite{})
// Helper method to declare string sets more succinctly
// Could be placed somewhere more general.
return
}
-func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
+func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
pl := PullList{PullRequest{
Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
c.Check(string(b), Equals, expectedOutput)
}
-func (s *MySuite) TestCreatePullServers(c *C) {
+func (s *PullSuite) TestCreatePullServers(c *C) {
var cs CanonicalString
c.Check(
CreatePullServers(cs,
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
+ stringSet("https://keep0:25107", "https://keep1:25108"),
stringSet(),
[]string{},
5),
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("keep0:25107"),
- []string{"keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep0:25107"),
+ []string{"https://keep0:25107"},
5),
DeepEquals,
- PullServers{To: []string{}, From: []string{"keep0:25107"}})
+ PullServers{To: []string{}, From: []string{"https://keep0:25107"}})
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
- []string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
+ []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
5),
DeepEquals,
- PullServers{To: []string{"keep3:25110", "keep2:25109"},
- From: []string{"keep1:25108", "keep0:25107"}})
+ PullServers{To: []string{"https://keep3:25110", "https://keep2:25109"},
+ From: []string{"https://keep1:25108", "https://keep0:25107"}})
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("keep3:25110", "keep1:25108", "keep0:25107"),
- []string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep3:25110", "https://keep1:25108", "https://keep0:25107"),
+ []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
5),
DeepEquals,
- PullServers{To: []string{"keep3:25110"},
- From: []string{"keep1:25108", "keep0:25107"}})
+ PullServers{To: []string{"https://keep3:25110"},
+ From: []string{"https://keep1:25108", "https://keep0:25107"}})
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
- []string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
+ []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
1),
DeepEquals,
- PullServers{To: []string{"keep3:25110"},
- From: []string{"keep1:25108", "keep0:25107"}})
+ PullServers{To: []string{"https://keep3:25110"},
+ From: []string{"https://keep1:25108", "https://keep0:25107"}})
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("https://keep3:25110", "http://keep2:25109",
- "https://keep1:25108", "http://keep0:25107"),
- []string{"https://keep3:25110", "http://keep2:25109",
- "https://keep1:25108", "http://keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep3:25110", "https://keep2:25109",
+ "https://keep1:25108", "https://keep0:25107"),
+ []string{"https://keep3:25110", "https://keep2:25109",
+ "https://keep1:25108", "https://keep0:25107"},
1),
DeepEquals,
- PullServers{To: []string{"keep3:25110"},
- From: []string{"https://keep1:25108", "http://keep0:25107"}})
+ PullServers{To: []string{"https://keep3:25110"},
+ From: []string{"https://keep1:25108", "https://keep0:25107"}})
c.Check(
CreatePullServers(cs,
- stringSet("keep0:25107", "keep1:25108"),
- stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
- []string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
+ stringSet("https://keep0:25107", "https://keep1:25108"),
+ stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
+ []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
0),
DeepEquals,
PullServers{To: []string{},
- From: []string{"keep1:25108", "keep0:25107"}})
+ From: []string{"https://keep1:25108", "https://keep0:25107"}})
}
// Checks whether two pull list maps are equal. Since pull lists are
Params: []string{"obtained", "expected"},
}}
-func (s *MySuite) TestBuildPullLists(c *C) {
+func (s *PullSuite) TestBuildPullLists(c *C) {
c.Check(
BuildPullLists(map[Locator]PullServers{}),
PullListMapEquals,
},
})
}
-
-func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
- c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
- c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
- c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
- c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
-}
--- /dev/null
+// Code for generating trash lists
+package summary
+
+import (
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
+ "time"
+)
+
+// BuildTrashLists builds one trash list per writable keep server. A copy of
+// a block is listed when the block is in keepBlocksNotInCollections, the
+// copy sits on a server returned by kc.WritableLocalRoots(), and its mtime
+// is more than blobSignatureTtl seconds in the past.
+//
+// The TTL is read from the discovery document through kc.Arvados. An error
+// is returned, with a nil map, when the TTL cannot be fetched or is not a
+// number.
+func BuildTrashLists(kc *keepclient.KeepClient,
+	keepServerInfo *keep.ReadServers,
+	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
+	// Servers that are writeable
+	writableServers := map[string]struct{}{}
+	for _, url := range kc.WritableLocalRoots() {
+		writableServers[url] = struct{}{}
+	}
+
+	ttlValue, err := kc.Arvados.Discovery("blobSignatureTtl")
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get blobSignatureTtl, can't build trash lists: %v", err)
+	}
+
+	// Use the checked form of the assertion: an unexpected value in the
+	// discovery document becomes an error instead of a panic.
+	ttlSeconds, ok := ttlValue.(float64)
+	if !ok {
+		return nil, errors.New("blobSignatureTtl is not a number, can't build trash lists")
+	}
+
+	// expire unreferenced blocks more than "ttl" seconds old.
+	expiry := time.Now().UTC().Unix() - int64(ttlSeconds)
+
+	return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
+}
+
+// buildTrashListsInternal does the selection for BuildTrashLists. For every
+// block in keepBlocksNotInCollections it walks the copies recorded in
+// keepServerInfo.BlockToServers and adds a trash request, keyed by the
+// server's String() form, for each copy whose mtime is strictly less than
+// expiry and whose server is in writableServers. Servers with nothing to
+// trash get no entry in the result.
+func buildTrashListsInternal(writableServers map[string]struct{},
+	keepServerInfo *keep.ReadServers,
+	expiry int64,
+	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
+	m = make(map[string]keep.TrashList)
+
+	for blk := range keepBlocksNotInCollections {
+		for _, replica := range keepServerInfo.BlockToServers[blk] {
+			// Only copies older than the expiry cutoff are candidates.
+			if replica.Mtime < expiry {
+				server := keepServerInfo.KeepServerIndexToAddress[replica.ServerIndex].String()
+
+				// Only writable servers receive trash requests.
+				if _, ok := writableServers[server]; ok {
+					request := keep.TrashRequest{
+						Locator:    blk.Digest.String(),
+						BlockMtime: replica.Mtime,
+					}
+					m[server] = append(m[server], request)
+				}
+			}
+		}
+	}
+
+	return m
+}
--- /dev/null
+package summary
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
+ . "gopkg.in/check.v1"
+ "testing"
+)
+
+// Gocheck boilerplate: TestTrash is a "go test" entry point that hands
+// control to gocheck.
+// NOTE(review): gocheck's TestingT runs every suite registered in the
+// package, and TestPullLists appears to call it from this same package —
+// confirm the suites are not being run once per wrapper.
+func TestTrash(t *testing.T) {
+ TestingT(t)
+}
+
+// TrashSuite is the gocheck suite holding the trash list building tests.
+type TrashSuite struct{}
+
+// Register the suite with gocheck.
+var _ = Suite(&TrashSuite{})
+
+// TestBuildTrashLists runs buildTrashListsInternal against two servers that
+// each hold two blocks (mtime 99 on sv0, 101 on sv1), with only block0
+// marked as unreferenced, and checks the result for different writable sets
+// and expiry cutoffs.
+func (s *TrashSuite) TestBuildTrashLists(c *C) {
+	sv0 := keep.ServerAddress{Host: "keep0.example.com", Port: 80}
+	sv1 := keep.ServerAddress{Host: "keep1.example.com", Port: 80}
+
+	block0 := blockdigest.MakeTestDigestWithSize(0xdeadbeef)
+	block1 := blockdigest.MakeTestDigestWithSize(0xfedbeef)
+
+	keepServerInfo := keep.ReadServers{
+		KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
+		BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
+			block0: []keep.BlockServerInfo{
+				keep.BlockServerInfo{ServerIndex: 0, Mtime: 99},
+				keep.BlockServerInfo{ServerIndex: 1, Mtime: 101},
+			},
+			block1: []keep.BlockServerInfo{
+				keep.BlockServerInfo{ServerIndex: 0, Mtime: 99},
+				keep.BlockServerInfo{ServerIndex: 1, Mtime: 101},
+			},
+		},
+	}
+
+	// only block0 is in delete set
+	bs := BlockSet{block0: struct{}{}}
+
+	// Writable server sets used by the cases below.
+	onlySv0 := map[string]struct{}{sv0.URL(): struct{}{}}
+	bothServers := map[string]struct{}{
+		sv0.URL(): struct{}{},
+		sv1.URL(): struct{}{},
+	}
+
+	// Expected per-server trash lists for block0.
+	trashOnSv0 := keep.TrashList{
+		keep.TrashRequest{Locator: "000000000000000000000000deadbeef", BlockMtime: 99}}
+	trashOnSv1 := keep.TrashList{
+		keep.TrashRequest{Locator: "000000000000000000000000deadbeef", BlockMtime: 101}}
+
+	// Test trash list where only sv0 is on writable list.
+	got := buildTrashListsInternal(onlySv0, &keepServerInfo, 110, bs)
+	c.Check(got, DeepEquals, map[string]keep.TrashList{
+		"http://keep0.example.com:80": trashOnSv0})
+
+	// Test trash list where both sv0 and sv1 are on writable list.
+	got = buildTrashListsInternal(bothServers, &keepServerInfo, 110, bs)
+	c.Check(got, DeepEquals, map[string]keep.TrashList{
+		"http://keep0.example.com:80": trashOnSv0,
+		"http://keep1.example.com:80": trashOnSv1})
+
+	// Test trash list where only block on sv0 is expired
+	got = buildTrashListsInternal(bothServers, &keepServerInfo, 100, bs)
+	c.Check(got, DeepEquals, map[string]keep.TrashList{
+		"http://keep0.example.com:80": trashOnSv0})
+}
blob_signature_ttl)
return
}
+
for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
- if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ if err != nil {
+ log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+ continue
+ }
+ if trashRequest.BlockMtime != mtime.Unix() {
+ log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
continue
}