"net/http"
"os"
"os/signal"
+ "reflect"
+ "regexp"
"sync"
"syscall"
"time"
// Refresh the keep service list every five minutes.
func RefreshServicesList(kc *keepclient.KeepClient) {
- previousRoots := ""
+ var previousRoots = []map[string]string{}
+ var delay time.Duration = 0
for {
+ time.Sleep(delay * time.Second)
+ delay = 300
if err := kc.DiscoverKeepServers(); err != nil {
log.Println("Error retrieving services list:", err)
- time.Sleep(3*time.Second)
- previousRoots = ""
- } else if len(kc.LocalRoots()) == 0 {
- log.Println("Received empty services list")
- time.Sleep(3*time.Second)
- previousRoots = ""
- } else {
- newRoots := fmt.Sprint("Locals ", kc.LocalRoots(), ", gateways ", kc.GatewayRoots())
- if newRoots != previousRoots {
- log.Println("Updated services list:", newRoots)
- previousRoots = newRoots
- }
- time.Sleep(300*time.Second)
+ delay = 3
+ continue
+ }
+ newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+ if !reflect.DeepEqual(previousRoots, newRoots) {
+ log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
+ }
+ if len(newRoots[0]) == 0 {
+ log.Print("WARNING: No local services. Retrying in 3 seconds.")
+ delay = 3
}
+ previousRoots = newRoots
}
}
*ApiTokenCache
}
+type IndexHandler struct {
+ *keepclient.KeepClient
+ *ApiTokenCache
+}
+
type InvalidPathHandler struct{}
type OptionsHandler struct{}
rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
GetBlockHandler{kc, t}).Methods("GET", "HEAD")
rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+ // List all blocks
+ rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+ // List blocks whose hash has the given prefix
+ rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
}
if enable_put {
var ContentLengthMismatch = errors.New("Actual length != expected content length")
var MethodNotSupported = errors.New("Method not supported")
+var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
var reader io.ReadCloser
+ locator = removeHint.ReplaceAllString(locator, "$1")
+
switch req.Method {
case "HEAD":
expectLength, proxiedURI, err = kc.Ask(locator)
log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
- switch err {
+ switch respErr := err.(type) {
case nil:
status = http.StatusOK
resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
err = ContentLengthMismatch
}
}
- case keepclient.BlockNotFound:
- status = http.StatusNotFound
+ case keepclient.Error:
+ if respErr == keepclient.BlockNotFound {
+ status = http.StatusNotFound
+ } else if respErr.Temporary() {
+ status = http.StatusBadGateway
+ } else {
+ status = 422
+ }
default:
- status = http.StatusBadGateway
+ status = http.StatusInternalServerError
}
}
return
}
- if loc := keepclient.MakeLocator(locatorIn); loc.Size > 0 && int64(loc.Size) != expectLength {
- err = LengthMismatchError
- status = http.StatusBadRequest
- return
+ if locatorIn != "" {
+ var loc *keepclient.Locator
+ if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
+ status = http.StatusBadRequest
+ return
+ } else if loc.Size > 0 && int64(loc.Size) != expectLength {
+ err = LengthMismatchError
+ status = http.StatusBadRequest
+ return
+ }
}
var pass bool
status = http.StatusBadGateway
}
}
+
+// ServeHTTP implementation for IndexHandler
+// Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
+// For each keep server found in LocalRoots:
+// Invokes GetIndex using keepclient
+// Expects "complete" response (terminating with blank new line)
+// Aborts on any errors
+// Concatenates responses from all those keep servers and returns
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ SetCorsHeaders(resp)
+
+ prefix := mux.Vars(req)["prefix"]
+ var err error
+ var status int
+
+ defer func() {
+ if status != http.StatusOK {
+ http.Error(resp, err.Error(), status)
+ }
+ }()
+
+ kc := *handler.KeepClient
+
+ ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+ if !ok {
+ status, err = http.StatusForbidden, BadAuthorizationHeader
+ return
+ }
+
+ // Copy ArvadosClient struct and use the client's API token
+ arvclient := *kc.Arvados
+ arvclient.ApiToken = token
+ kc.Arvados = &arvclient
+
+ // Only GET method is supported
+ if req.Method != "GET" {
+ status, err = http.StatusNotImplemented, MethodNotSupported
+ return
+ }
+
+ // Get index from all LocalRoots and write to resp
+ var reader io.Reader
+ for uuid := range kc.LocalRoots() {
+ reader, err = kc.GetIndex(uuid, prefix)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
+
+ _, err = io.Copy(resp, reader)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
+ }
+
+ // Got index from all the keep servers and wrote to resp
+ status = http.StatusOK
+ resp.Write([]byte("\n"))
+}