"net/http"
"os"
"os/signal"
- "reflect"
"regexp"
- "strings"
"sync"
"syscall"
"time"
)
// Default TCP address on which to listen for requests.
-// Initialized by the -listen flag.
-const DEFAULT_ADDR = ":25107"
+// Override with -listen.
+const DefaultAddr = ":25107"
var listener net.Listener
pidfile string
)
- flagset := flag.NewFlagSet("default", flag.ExitOnError)
+ flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
flagset.StringVar(
&listen,
"listen",
- DEFAULT_ADDR,
+ DefaultAddr,
"Interface on which to listen for requests, in the format "+
"ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
"to listen on all network interfaces.")
log.Fatalf("Error setting up arvados client %s", err.Error())
}
+ if os.Getenv("ARVADOS_DEBUG") != "" {
+ keepclient.DebugPrintf = log.Printf
+ }
kc, err := keepclient.MakeKeepClient(&arv)
if err != nil {
log.Fatalf("Error setting up keep client %s", err.Error())
}
kc.Want_replicas = default_replicas
-
kc.Client.Timeout = time.Duration(timeout) * time.Second
+ go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", listen)
if err != nil {
log.Fatalf("Could not listen on %v", listen)
}
-
- go RefreshServicesList(kc)
+ log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
-
- // Start listening for requests.
+ // Start serving requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
log.Println("shutting down")
expireTime int64
}
-// Refresh the keep service list every five minutes.
-func RefreshServicesList(kc *keepclient.KeepClient) {
- var previousRoots = []map[string]string{}
- var delay time.Duration = 0
- for {
- time.Sleep(delay * time.Second)
- delay = 300
- if err := kc.DiscoverKeepServers(); err != nil {
- log.Println("Error retrieving services list:", err)
- delay = 3
- continue
- }
- newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
- if !reflect.DeepEqual(previousRoots, newRoots) {
- log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
- }
- if len(newRoots[0]) == 0 {
- log.Print("WARNING: No local services. Retrying in 3 seconds.")
- delay = 3
- }
- previousRoots = newRoots
- }
-}
-
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
}
func GetRemoteAddress(req *http.Request) string {
- if realip := req.Header.Get("X-Real-IP"); realip != "" {
- if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
- return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
- } else {
- return realip
- }
+ if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
+ return xff + "," + req.RemoteAddr
}
return req.RemoteAddr
}
-func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
+func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
var auth string
if auth = req.Header.Get("Authorization"); auth == "" {
return false, ""
}
if cache.RecallToken(tok) {
- // Valid in the cache, short circut
+ // Valid in the cache, short circuit
return true, tok
}
rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
// List blocks whose hash has the given prefix
- rest.Handle(`/index/{prefix}`, IndexHandler{kc, t}).Methods("GET")
+ rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
}
if enable_put {
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
- switch err {
+ switch respErr := err.(type) {
case nil:
status = http.StatusOK
resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
err = ContentLengthMismatch
}
}
- case keepclient.BlockNotFound:
- status = http.StatusNotFound
+ case keepclient.Error:
+ if respErr == keepclient.BlockNotFound {
+ status = http.StatusNotFound
+ } else if respErr.Temporary() {
+ status = http.StatusBadGateway
+ } else {
+ status = 422
+ }
default:
- status = http.StatusBadGateway
+ status = http.StatusInternalServerError
}
}
kc := *this.KeepClient
var err error
- var expectLength int64 = -1
+ var expectLength int64
var status = http.StatusInternalServerError
var wroteReplicas int
var locatorOut string = "-"
locatorIn := mux.Vars(req)["locator"]
- if req.Header.Get("Content-Length") != "" {
- _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
- if err != nil {
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
- }
-
- }
-
- if expectLength < 0 {
+ _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
+ if err != nil || expectLength < 0 {
err = LengthRequiredError
status = http.StatusLengthRequired
return
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
err = BadAuthorizationHeader
status = http.StatusForbidden
return
if req.Header.Get("X-Keep-Desired-Replicas") != "" {
var r int
_, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
- if err != nil {
+ if err == nil {
kc.Want_replicas = r
}
}
}
}
-func (this IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+// ServeHTTP implementation for IndexHandler
+// Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
+// For each keep server found in LocalRoots:
+// Invokes GetIndex using keepclient
+// Expects "complete" response (terminating with blank new line)
+// Aborts on any errors
+// Concatenates responses from all those keep servers and returns
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
prefix := mux.Vars(req)["prefix"]
}
}()
- kc := *this.KeepClient
+ kc := *handler.KeepClient
- var pass bool
- var tok string
- if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+ ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
+ if !ok {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
// Copy ArvadosClient struct and use the client's API token
arvclient := *kc.Arvados
- arvclient.ApiToken = tok
+ arvclient.ApiToken = token
kc.Arvados = &arvclient
- var indexResp []byte
- var reader io.Reader
-
- switch req.Method {
- case "GET":
- for uuid, _ := range kc.LocalRoots() {
- reader, err = kc.GetIndex(uuid, prefix)
- if err != nil {
- break
- }
-
- var readBytes []byte
- readBytes, err = ioutil.ReadAll(reader)
- if err != nil {
- break
- }
-
- // Got index; verify that it is complete
- if !strings.HasSuffix(string(readBytes), "\n\n") {
- err = errors.New("Got incomplete index")
- }
-
- indexResp = append(indexResp, (readBytes[0 : len(readBytes)-1])...)
- }
- indexResp = append(indexResp, ([]byte("\n"))...)
- default:
+ // Only GET method is supported
+ if req.Method != "GET" {
status, err = http.StatusNotImplemented, MethodNotSupported
return
}
- switch err {
- case nil:
- status = http.StatusOK
- resp.Header().Set("Content-Length", fmt.Sprint(len(indexResp)))
- _, err = resp.Write(indexResp)
- default:
- status = http.StatusBadGateway
+ // Get index from all LocalRoots and write to resp
+ var reader io.Reader
+ for uuid := range kc.LocalRoots() {
+ reader, err = kc.GetIndex(uuid, prefix)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
+
+ _, err = io.Copy(resp, reader)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
}
+
+ // Got index from all the keep servers and wrote to resp
+ status = http.StatusOK
+ resp.Write([]byte("\n"))
}