// Code for generating trash lists
+
package summary
import (
- "encoding/json"
+ "errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
- "log"
- "os"
- "strings"
"time"
)
-type TrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
-}
-
-type TrashList []TrashRequest
-
+// BuildTrashLists builds list of blocks to be sent to trash queue
func BuildTrashLists(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
- keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+ keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
- m = make(map[string]TrashList)
+ // Servers that are writeable
+ writableServers := map[string]struct{}{}
+ for _, url := range kc.WritableLocalRoots() {
+ writableServers[url] = struct{}{}
+ }
_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
if err != nil {
- log.Printf("Failed to get blobSignatureTtl: %v", err)
- return
+ return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
}
ttl := int64(_ttl.(float64))
// expire unreferenced blocks more than "ttl" seconds old.
expiry := time.Now().UTC().Unix() - ttl
- for block, _ := range keepBlocksNotInCollections {
- for _, block_on_server := range keepServerInfo.BlockToServers[block] {
- if block_on_server.Mtime < expiry {
- // block is older than expire cutoff
- srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
- m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
- }
- }
- }
- return
+ return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
}
-// Writes each pull list to a file.
-// The filename is based on the hostname.
-//
-// This is just a hack for prototyping, it is not expected to be used
-// in production.
-func WriteTrashLists(arvLogger *logger.Logger,
- trashLists map[string]TrashList) {
- r := strings.NewReplacer(":", ".")
- for host, list := range trashLists {
- filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
- trashListFile, err := os.Create(filename)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", filename, err))
- }
- defer trashListFile.Close()
+func buildTrashListsInternal(writableServers map[string]struct{},
+ keepServerInfo *keep.ReadServers,
+ expiry int64,
+ keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
+
+ m = make(map[string]keep.TrashList)
+
+ for block := range keepBlocksNotInCollections {
+ for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
+ if blockOnServer.Mtime >= expiry {
+ continue
+ }
+
+ // block is older than expire cutoff
+ srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
- enc := json.NewEncoder(trashListFile)
- err = enc.Encode(list)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write trash list to %s: %v", filename, err))
+ if _, writable := writableServers[srv]; !writable {
+ continue
+ }
+
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
}
- log.Printf("Wrote trash list to %s.", filename)
}
+ return
+
}