"time"
)
+// ServerAddress struct
type ServerAddress struct {
- SSL bool `json:service_ssl_flag`
- Host string `json:"service_host"`
- Port int `json:"service_port"`
- Uuid string `json:"uuid"`
+ SSL bool `json:service_ssl_flag`
+ Host string `json:"service_host"`
+ Port int `json:"service_port"`
+ UUID string `json:"uuid"`
+ ServiceType string `json:"service_type"`
}
-// Info about a particular block returned by the server
+// BlockInfo is info about a particular block returned by the server
type BlockInfo struct {
Digest blockdigest.DigestWithSize
Mtime int64 // TODO(misha): Replace this with a timestamp.
}
-// Info about a specified block given by a server
+// BlockServerInfo is info about a specified block given by a server
type BlockServerInfo struct {
ServerIndex int
Mtime int64 // TODO(misha): Replace this with a timestamp.
}
+// ServerContents struct
type ServerContents struct {
BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
}
+// ServerResponse struct
type ServerResponse struct {
Address ServerAddress
Contents ServerContents
}
+// ReadServers struct
type ReadServers struct {
ReadAllServers bool
KeepServerIndexToAddress []ServerAddress
BlockReplicationCounts map[int]int
}
+// GetKeepServersParams struct
type GetKeepServersParams struct {
Client arvadosclient.ArvadosClient
Logger *logger.Logger
Limit int
}
-type KeepServiceList struct {
+// ServiceList consists of the addresses of all the available kee servers
+type ServiceList struct {
ItemsAvailable int `json:"items_available"`
KeepServers []ServerAddress `json:"items"`
}
+// String
// TODO(misha): Change this to include the UUID as well.
func (s ServerAddress) String() string {
return s.URL()
}
+// URL of the keep server
func (s ServerAddress) URL() string {
if s.SSL {
return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
- } else {
- return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
}
+ return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
}
+// GetKeepServersAndSummarize gets keep servers from api
func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
results = GetKeepServers(params)
log.Printf("Returned %d keep disks", len(results.ServerToContents))
return
}
+// GetKeepServers from api server
func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
sdkParams := arvadosclient.Dict{
- "filters": [][]string{[]string{"service_type", "=", "disk"}},
+ "filters": [][]string{[]string{"service_type", "!=", "proxy"}},
}
if params.Limit > 0 {
sdkParams["limit"] = params.Limit
}
- var sdkResponse KeepServiceList
+ var sdkResponse ServiceList
err := params.Client.List("keep_services", sdkParams, &sdkResponse)
if err != nil {
fmt.Sprintf("Error requesting keep disks from API server: %v", err))
}
+ // Currently, only "disk" types are supported. Stop if any other service types are found.
+ for _, server := range sdkResponse.KeepServers {
+ if server.ServiceType != "disk" {
+ loggerutil.FatalWithMessage(params.Logger,
+ fmt.Sprintf("Unsupported service type %q found for: %v", server.ServiceType, server))
+ }
+ }
+
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
return
}
+// GetServerContents of the keep server
func GetServerContents(arvLogger *logger.Logger,
keepServer ServerAddress,
arv arvadosclient.ArvadosClient) (response ServerResponse) {
return ReadServerResponse(arvLogger, keepServer, resp)
}
+// GetServerStatus get keep server status by invoking /status.json
func GetServerStatus(arvLogger *logger.Logger,
keepServer ServerAddress,
arv arvadosclient.ArvadosClient) {
serverInfo["host"] = keepServer.Host
serverInfo["port"] = keepServer.Port
- keepInfo[keepServer.Uuid] = serverInfo
+ keepInfo[keepServer.UUID] = serverInfo
})
}
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["status_response_processed_at"] = now
serverInfo["status"] = keepStatus
})
}
}
+// CreateIndexRequest to the keep server
func CreateIndexRequest(arvLogger *logger.Logger,
keepServer ServerAddress,
arv arvadosclient.ArvadosClient) (req *http.Request) {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["index_request_sent_at"] = now
})
}
fmt.Sprintf("Error building http request for %s: %v", url, err))
}
- req.Header.Add("Authorization", "OAuth2 " + arv.ApiToken)
+ req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
return
}
+// ReadServerResponse reads reasponse from keep server
func ReadServerResponse(arvLogger *logger.Logger,
keepServer ServerAddress,
resp *http.Response) (response ServerResponse) {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["index_response_received_at"] = now
})
}
if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
// This server returned multiple lines containing the same block digest.
- numDuplicates += 1
+ numDuplicates++
// Keep the block that's newer.
if storedBlock.Mtime < blockInfo.Mtime {
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
serverInfo["processing_finished_at"] = now
serverInfo["lines_received"] = numLines
return
}
+// Summarize results from keep server
func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
readServers.BlockReplicationCounts = make(map[int]int)
for _, infos := range readServers.BlockToServers {
replication := len(infos)
- readServers.BlockReplicationCounts[replication] += 1
+ readServers.BlockReplicationCounts[replication]++
}
if arvLogger != nil {
keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
})
}
-
}
+// TrashRequest struct
type TrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
}
+// TrashList is an array of TrashRequest objects
type TrashList []TrashRequest
+// SendTrashLists to trash queue
func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) {
count := 0
barrier := make(chan error)
client := kc.Client
for url, v := range spl {
- count += 1
+ count++
log.Printf("Sending trash list to %v", url)
go (func(url string, v TrashList) {
return
}
- req.Header.Add("Authorization", "OAuth2 " + kc.Arvados.ApiToken)
+ req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
// Make the request
var resp *http.Response
}
- for i := 0; i < count; i += 1 {
+ for i := 0; i < count; i++ {
b := <-barrier
if b != nil {
errs = append(errs, b)