7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "github.com/gorilla/mux"
24 // Default TCP address on which to listen for requests.
25 // Can be overridden with the -listen flag.
26 const DEFAULT_ADDR = ":25107"
// listener is the TCP listener that requests are served on. It is assigned
// from net.Listen and handed to http.Serve.
28 var listener net.Listener
40 flagset := flag.NewFlagSet("default", flag.ExitOnError)
46 "Interface on which to listen for requests, in the format "+
47 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
48 "to listen on all network interfaces.")
54 "If set, disable GET operations")
60 "If set, disable PUT operations")
66 "Default number of replicas to write if not specified by the client.")
72 "Timeout on requests to internal Keep services (default 15 seconds)")
78 "Path to write pid file")
80 flagset.Parse(os.Args[1:])
82 arv, err := arvadosclient.MakeArvadosClient()
84 log.Fatalf("Error setting up arvados client %s", err.Error())
87 kc, err := keepclient.MakeKeepClient(&arv)
89 log.Fatalf("Error setting up keep client %s", err.Error())
93 f, err := os.Create(pidfile)
95 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
97 fmt.Fprint(f, os.Getpid())
99 defer os.Remove(pidfile)
102 kc.Want_replicas = default_replicas
104 kc.Client.Timeout = time.Duration(timeout) * time.Second
106 listener, err = net.Listen("tcp", listen)
108 log.Fatalf("Could not listen on %v", listen)
111 go RefreshServicesList(kc)
113 // Shut down the server gracefully (by closing the listener)
114 // if SIGTERM or SIGINT is received.
115 term := make(chan os.Signal, 1)
116 go func(sig <-chan os.Signal) {
118 log.Println("caught signal:", s)
121 signal.Notify(term, syscall.SIGTERM)
122 signal.Notify(term, syscall.SIGINT)
124 log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
126 // Start serving requests on the listener.
127 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
129 log.Println("shutting down")
// ApiTokenCache remembers API tokens that have already been accepted, so
// that a recently seen token can be approved without validating it again.
// tokens maps each token to the Unix time (in seconds) until which the
// cached entry is trusted; a zero value means the token is not cached.
132 type ApiTokenCache struct {
133 tokens map[string]int64
138 // RefreshServicesList refreshes the keep service list every five minutes,
// logging the local and gateway roots whenever they differ from the ones
// seen on the previous refresh.
139 func RefreshServicesList(kc *keepclient.KeepClient) {
140 var previousRoots = []map[string]string{}
// delay holds a plain count of seconds; it is scaled by time.Second when
// it is passed to time.Sleep.
141 var delay time.Duration = 0
143 time.Sleep(delay * time.Second)
145 if err := kc.DiscoverKeepServers(); err != nil {
146 log.Println("Error retrieving services list:", err)
// newRoots[0] holds the local roots, newRoots[1] the gateway roots.
150 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
// Only log when the service list actually changed.
151 if !reflect.DeepEqual(previousRoots, newRoots) {
152 log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
// Without any local services, warn and retry sooner than usual.
154 if len(newRoots[0]) == 0 {
155 log.Print("WARNING: No local services. Retrying in 3 seconds.")
158 previousRoots = newRoots
162 // RememberToken caches the token and sets its expiry time. If the token
163 // already has an expiry time, that time is left unchanged.
164 func (this *ApiTokenCache) RememberToken(token string) {
// Release the cache lock when the function returns.
166 defer this.lock.Unlock()
168 now := time.Now().Unix()
// A zero entry means the token is not currently cached.
169 if this.tokens[token] == 0 {
// Expiry is stored as an absolute Unix time in seconds.
170 this.tokens[token] = now + this.expireTime
174 // RecallToken checks if the token is cached and still believed to be valid.
175 func (this *ApiTokenCache) RecallToken(token string) bool {
// Release the cache lock when the function returns.
177 defer this.lock.Unlock()
179 now := time.Now().Unix()
// A zero entry means the token is not in the cache.
180 if this.tokens[token] == 0 {
183 } else if now < this.tokens[token] {
184 // Token is known and still valid
// Reset the entry to zero, the same value used for a token that is not
// cached, so RememberToken can assign a fresh expiry time later.
188 this.tokens[token] = 0
// GetRemoteAddress returns a string identifying the client that sent req,
// as used in this file's log messages. When the X-Real-IP header is set and
// differs from X-Forwarded-For, both values are included. When no X-Real-IP
// header is present, req.RemoteAddr is returned.
193 func GetRemoteAddress(req *http.Request) string {
194 if realip := req.Header.Get("X-Real-IP"); realip != "" {
195 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
196 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
// No X-Real-IP header: fall back to the address recorded on the request.
201 return req.RemoteAddr
// CheckAuthorizationHeader reads the API token from the request's
// Authorization header, which must have the form "OAuth2 <token>". It
// returns whether the token is accepted (pass) and the token itself (tok).
// A token that is still valid in cache is accepted without further checks;
// otherwise the token is checked with a HEAD call on users/current and,
// if that succeeds, remembered in cache.
204 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
206 if auth = req.Header.Get("Authorization"); auth == "" {
// Extract the token that follows the "OAuth2 " prefix.
210 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
216 if cache.RecallToken(tok) {
217 // Valid in the cache, short circuit
// Validate the token with a HEAD call on users/current (presumably issued
// with tok as the API token -- confirm). Failures are logged together with
// the client address.
223 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
224 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
228 // Success! Update cache
229 cache.RememberToken(tok)
// GetBlockHandler serves GET and HEAD requests for blocks.
234 type GetBlockHandler struct {
235 *keepclient.KeepClient
// PutBlockHandler serves PUT and POST requests that store blocks.
239 type PutBlockHandler struct {
240 *keepclient.KeepClient
// IndexHandler serves GET requests for /index and /index/{prefix}.
244 type IndexHandler struct {
245 *keepclient.KeepClient
// InvalidPathHandler answers requests that match none of the routes.
249 type InvalidPathHandler struct{}
// OptionsHandler answers OPTIONS requests.
251 type OptionsHandler struct{}
254 // Returns a mux.Router that passes GET, HEAD, PUT, POST and OPTIONS
255 // requests to the appropriate handlers.
260 kc *keepclient.KeepClient) *mux.Router {
// Token cache shared by the block and index handlers. expireTime is added
// to a Unix timestamp in RememberToken, so 300 means 300 seconds.
262 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
264 rest := mux.NewRouter()
// A locator is a 32-hex-digit hash, optionally followed by "+" and
// further text.
267 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
268 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
269 rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
272 rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
274 // List blocks whose hash has the given prefix
275 rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
// Blocks are stored with PUT to a locator path or with POST to "/".
279 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
280 rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
281 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
// OPTIONS requests are answered by OptionsHandler.
282 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
283 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
// Requests that match no route are answered by InvalidPathHandler.
286 rest.NotFoundHandler = InvalidPathHandler{}
// SetCorsHeaders sets the Access-Control-* headers on resp: every origin is
// allowed, together with the listed methods and request headers.
291 func SetCorsHeaders(resp http.ResponseWriter) {
292 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
293 resp.Header().Set("Access-Control-Allow-Origin", "*")
294 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
// NOTE(review): Access-Control-Max-Age is given in seconds, and 86486400
// seconds is 1001 days -- confirm that this value is intended.
295 resp.Header().Set("Access-Control-Max-Age", "86486400")
// ServeHTTP logs the unroutable request (client address, method and path)
// and replies with 400 Bad Request.
298 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
299 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
300 http.Error(resp, "Bad request", http.StatusBadRequest)
// ServeHTTP handles a request routed to OptionsHandler. It logs the client
// address, method and path of the request.
303 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
304 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
// BadAuthorizationHeader is the error used when CheckAuthorizationHeader
// does not accept a request.
308 var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
// ContentLengthMismatch is the error used when the number of bytes copied
// to a response differs from the expected content length.
309 var ContentLengthMismatch = errors.New("Actual length != expected content length")
// MethodNotSupported is the error used for request methods a handler does
// not implement.
310 var MethodNotSupported = errors.New("Method not supported")
// removeHint matches a "+K@xxxxx" hint (five lowercase letters or digits)
// that is followed by "+" or by the end of the string. The trailing "+" is
// captured so that it can be kept when the hint is removed.
// NOTE(review): the error returned by regexp.Compile is discarded;
// regexp.MustCompile is the usual choice for a constant pattern.
312 var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
// ServeHTTP handles block read requests. It checks the Authorization
// header, looks the block up using the client's own API token, and copies
// the block to the response.
314 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
317 locator := mux.Vars(req)["locator"]
320 var expectLength, responseLength int64
// Log the outcome of the request and, if status is not 200 OK, send the
// error text to the client with that status.
324 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
325 if status != http.StatusOK {
326 http.Error(resp, err.Error(), status)
// Work on a copy of the handler's KeepClient, so that replacing kc.Arvados
// below does not modify the shared client.
330 kc := *this.KeepClient
334 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
335 status, err = http.StatusForbidden, BadAuthorizationHeader
339 // Copy ArvadosClient struct and use the client's API token
340 arvclient := *kc.Arvados
341 arvclient.ApiToken = tok
342 kc.Arvados = &arvclient
344 var reader io.ReadCloser
// Strip any "+K@xxxxx" hint from the locator before looking the block up.
346 locator = removeHint.ReplaceAllString(locator, "$1")
// kc.Ask yields only the length and URI; kc.Get additionally yields a
// reader for the block content.
350 expectLength, proxiedURI, err = kc.Ask(locator)
352 reader, expectLength, proxiedURI, err = kc.Get(locator)
357 status, err = http.StatusNotImplemented, MethodNotSupported
// A length of -1 is logged as "Content-Length not provided".
361 if expectLength == -1 {
362 log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
367 status = http.StatusOK
368 resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
373 responseLength, err = io.Copy(resp, reader)
// When a length was provided, the number of bytes copied must match it.
374 if err == nil && expectLength > -1 && responseLength != expectLength {
375 err = ContentLengthMismatch
// Translate errors into HTTP statuses: a missing block becomes 404.
378 case keepclient.BlockNotFound:
379 status = http.StatusNotFound
381 status = http.StatusBadGateway
// LengthRequiredError is the error used when a request does not carry a
// usable Content-Length; its text is the standard 411 status text.
385 var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
// LengthMismatchError is the error used when the size in the locator
// disagrees with the Content-Length header.
386 var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
// ServeHTTP handles block write requests. It validates the Content-Length
// and the locator (if any), checks the Authorization header, writes the
// block using the client's own API token, and reports the resulting locator
// and the number of replicas written.
388 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the handler's KeepClient, so that per-request changes
// (kc.Arvados below) do not modify the shared client.
391 kc := *this.KeepClient
// -1 marks the length as unknown until a Content-Length header is parsed.
393 var expectLength int64 = -1
394 var status = http.StatusInternalServerError
395 var wroteReplicas int
// "-" is what gets logged when no locator has been produced.
396 var locatorOut string = "-"
// Log the outcome of the request and, if status is not 200 OK, send the
// error text to the client with that status.
399 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
400 if status != http.StatusOK {
401 http.Error(resp, err.Error(), status)
405 locatorIn := mux.Vars(req)["locator"]
// Parse the Content-Length header into expectLength.
407 if req.Header.Get("Content-Length") != "" {
408 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
410 resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
// Without a non-negative length the request is rejected with 411.
415 if expectLength < 0 {
416 err = LengthRequiredError
417 status = http.StatusLengthRequired
// The locator must parse, and a size given in it must agree with the
// Content-Length; otherwise the request is rejected with 400.
422 var loc *keepclient.Locator
423 if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
424 status = http.StatusBadRequest
426 } else if loc.Size > 0 && int64(loc.Size) != expectLength {
427 err = LengthMismatchError
428 status = http.StatusBadRequest
435 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
436 err = BadAuthorizationHeader
437 status = http.StatusForbidden
441 // Copy ArvadosClient struct and use the client's API token
442 arvclient := *kc.Arvados
443 arvclient.ApiToken = tok
444 kc.Arvados = &arvclient
446 // Check if the client specified the number of replicas
// NOTE(review): the header is tested via a string literal but read via
// keepclient.X_Keep_Desired_Replicas; presumably both name the same
// header -- confirm.
447 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
449 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
455 // Now try to put the block through
// Read the whole request body into memory and store it with PutB.
// NOTE(review): err here is the variable declared by this if statement,
// so the assignment on the next line does not reach the function-level
// err that is logged and sent to the client.
457 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
458 err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
459 status = http.StatusInternalServerError
462 locatorOut, wroteReplicas, err = kc.PutB(bytes)
// Store the body under the requested locator, passing req.Body as a reader.
465 locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
468 // Tell the client how many successful PUTs we accomplished
469 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
// On success the response body is the locator of the stored block.
473 status = http.StatusOK
474 _, err = io.WriteString(resp, locatorOut)
476 case keepclient.OversizeBlockError:
478 status = http.StatusRequestEntityTooLarge
480 case keepclient.InsufficientReplicasError:
481 if wroteReplicas > 0 {
482 // At least one write is considered success. The
483 // client can decide if getting less than the number of
484 // replications it asked for is a fatal error.
485 status = http.StatusOK
486 _, err = io.WriteString(resp, locatorOut)
488 status = http.StatusServiceUnavailable
492 status = http.StatusBadGateway
496 // ServeHTTP implementation for IndexHandler
497 // Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
498 // For each keep server found in LocalRoots:
499 // Invokes GetIndex using keepclient
500 // Expects "complete" response (terminated by a blank line)
501 // Aborts on any errors
502 // Concatenates responses from all those keep servers and returns
503 func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
506 prefix := mux.Vars(req)["prefix"]
// If status is not 200 OK, send the error text to the client with that
// status.
511 if status != http.StatusOK {
512 http.Error(resp, err.Error(), status)
// Work on a copy of the handler's KeepClient, so that replacing kc.Arvados
// below does not modify the shared client.
516 kc := *handler.KeepClient
518 ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
520 status, err = http.StatusForbidden, BadAuthorizationHeader
524 // Copy ArvadosClient struct and use the client's API token
525 arvclient := *kc.Arvados
526 arvclient.ApiToken = token
527 kc.Arvados = &arvclient
529 // Only GET method is supported
530 if req.Method != "GET" {
531 status, err = http.StatusNotImplemented, MethodNotSupported
535 // Get index from all LocalRoots and write to resp
// The keys of LocalRoots are the service UUIDs passed to GetIndex.
537 for uuid := range kc.LocalRoots() {
538 reader, err = kc.GetIndex(uuid, prefix)
540 status = http.StatusBadGateway
// Append this server's index to the response.
544 _, err = io.Copy(resp, reader)
546 status = http.StatusBadGateway
551 // Got index from all the keep servers and wrote to resp
552 status = http.StatusOK
// Finish the combined index with a newline.
553 resp.Write([]byte("\n"))