7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "github.com/gorilla/mux"
24 // Default TCP address on which to listen for requests.
25 // Initialized by the -listen flag.
26 const DEFAULT_ADDR = ":25107"
28 var listener net.Listener
40 flagset := flag.NewFlagSet("default", flag.ExitOnError)
46 "Interface on which to listen for requests, in the format "+
47 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
48 "to listen on all network interfaces.")
54 "If set, disable GET operations")
60 "If set, disable PUT operations")
66 "Default number of replicas to write if not specified by the client.")
72 "Timeout on requests to internal Keep services (default 15 seconds)")
78 "Path to write pid file")
80 flagset.Parse(os.Args[1:])
82 arv, err := arvadosclient.MakeArvadosClient()
84 log.Fatalf("Error setting up arvados client %s", err.Error())
87 kc, err := keepclient.MakeKeepClient(&arv)
89 log.Fatalf("Error setting up keep client %s", err.Error())
93 f, err := os.Create(pidfile)
95 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
97 fmt.Fprint(f, os.Getpid())
99 defer os.Remove(pidfile)
102 kc.Want_replicas = default_replicas
104 kc.Client.Timeout = time.Duration(timeout) * time.Second
106 listener, err = net.Listen("tcp", listen)
108 log.Fatalf("Could not listen on %v", listen)
111 go RefreshServicesList(kc)
113 // Shut down the server gracefully (by closing the listener)
114 // if SIGTERM is received.
115 term := make(chan os.Signal, 1)
116 go func(sig <-chan os.Signal) {
118 log.Println("caught signal:", s)
121 signal.Notify(term, syscall.SIGTERM)
122 signal.Notify(term, syscall.SIGINT)
124 log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
126 // Start listening for requests.
127 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
129 log.Println("shutting down")
132 type ApiTokenCache struct {
133 tokens map[string]int64
138 // Refresh the keep service list every five minutes.
139 func RefreshServicesList(kc *keepclient.KeepClient) {
140 var previousRoots = []map[string]string{}
141 var delay time.Duration = 0
143 time.Sleep(delay * time.Second)
145 if err := kc.DiscoverKeepServers(); err != nil {
146 log.Println("Error retrieving services list:", err)
150 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
151 if !reflect.DeepEqual(previousRoots, newRoots) {
152 log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
154 if len(newRoots[0]) == 0 {
155 log.Print("WARNING: No local services. Retrying in 3 seconds.")
158 previousRoots = newRoots
162 // Cache the token and set an expire time. If we already have an expire time
163 // on the token, it is not updated.
164 func (this *ApiTokenCache) RememberToken(token string) {
166 defer this.lock.Unlock()
168 now := time.Now().Unix()
169 if this.tokens[token] == 0 {
170 this.tokens[token] = now + this.expireTime
174 // Check if the cached token is known and still believed to be valid.
175 func (this *ApiTokenCache) RecallToken(token string) bool {
177 defer this.lock.Unlock()
179 now := time.Now().Unix()
180 if this.tokens[token] == 0 {
183 } else if now < this.tokens[token] {
184 // Token is known and still valid
188 this.tokens[token] = 0
193 func GetRemoteAddress(req *http.Request) string {
194 if realip := req.Header.Get("X-Real-IP"); realip != "" {
195 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
196 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
201 return req.RemoteAddr
204 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
206 if auth = req.Header.Get("Authorization"); auth == "" {
210 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
216 if cache.RecallToken(tok) {
217 // Valid in the cache, short circut
223 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
224 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
228 // Success! Update cache
229 cache.RememberToken(tok)
234 type GetBlockHandler struct {
235 *keepclient.KeepClient
239 type PutBlockHandler struct {
240 *keepclient.KeepClient
244 type IndexHandler struct {
245 *keepclient.KeepClient
249 type InvalidPathHandler struct{}
251 type OptionsHandler struct{}
254 // Returns a mux.Router that passes GET and PUT requests to the
255 // appropriate handlers.
260 kc *keepclient.KeepClient) *mux.Router {
262 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
264 rest := mux.NewRouter()
267 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
268 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
269 rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
272 rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
274 // List blocks whose hash has the given prefix
275 rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
279 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
280 rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
281 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
282 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
283 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
286 rest.NotFoundHandler = InvalidPathHandler{}
291 func SetCorsHeaders(resp http.ResponseWriter) {
292 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
293 resp.Header().Set("Access-Control-Allow-Origin", "*")
294 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
295 resp.Header().Set("Access-Control-Max-Age", "86486400")
298 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
299 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
300 http.Error(resp, "Bad request", http.StatusBadRequest)
303 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
304 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
308 var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
309 var ContentLengthMismatch = errors.New("Actual length != expected content length")
310 var MethodNotSupported = errors.New("Method not supported")
312 var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
314 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
317 locator := mux.Vars(req)["locator"]
320 var expectLength, responseLength int64
324 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
325 if status != http.StatusOK {
326 http.Error(resp, err.Error(), status)
330 kc := *this.KeepClient
334 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
335 status, err = http.StatusForbidden, BadAuthorizationHeader
339 // Copy ArvadosClient struct and use the client's API token
340 arvclient := *kc.Arvados
341 arvclient.ApiToken = tok
342 kc.Arvados = &arvclient
344 var reader io.ReadCloser
346 locator = removeHint.ReplaceAllString(locator, "$1")
350 expectLength, proxiedURI, err = kc.Ask(locator)
352 reader, expectLength, proxiedURI, err = kc.Get(locator)
357 status, err = http.StatusNotImplemented, MethodNotSupported
361 if expectLength == -1 {
362 log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
365 switch respErr := err.(type) {
367 status = http.StatusOK
368 resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
373 responseLength, err = io.Copy(resp, reader)
374 if err == nil && expectLength > -1 && responseLength != expectLength {
375 err = ContentLengthMismatch
378 case keepclient.Error:
379 if respErr == keepclient.BlockNotFound {
380 status = http.StatusNotFound
381 } else if respErr.Temporary() {
382 status = http.StatusBadGateway
387 status = http.StatusInternalServerError
391 var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
392 var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
394 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
397 kc := *this.KeepClient
399 var expectLength int64 = -1
400 var status = http.StatusInternalServerError
401 var wroteReplicas int
402 var locatorOut string = "-"
405 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
406 if status != http.StatusOK {
407 http.Error(resp, err.Error(), status)
411 locatorIn := mux.Vars(req)["locator"]
413 if req.Header.Get("Content-Length") != "" {
414 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
416 resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
421 if expectLength < 0 {
422 err = LengthRequiredError
423 status = http.StatusLengthRequired
428 var loc *keepclient.Locator
429 if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
430 status = http.StatusBadRequest
432 } else if loc.Size > 0 && int64(loc.Size) != expectLength {
433 err = LengthMismatchError
434 status = http.StatusBadRequest
441 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
442 err = BadAuthorizationHeader
443 status = http.StatusForbidden
447 // Copy ArvadosClient struct and use the client's API token
448 arvclient := *kc.Arvados
449 arvclient.ApiToken = tok
450 kc.Arvados = &arvclient
452 // Check if the client specified the number of replicas
453 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
455 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
461 // Now try to put the block through
463 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
464 err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
465 status = http.StatusInternalServerError
468 locatorOut, wroteReplicas, err = kc.PutB(bytes)
471 locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
474 // Tell the client how many successful PUTs we accomplished
475 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
479 status = http.StatusOK
480 _, err = io.WriteString(resp, locatorOut)
482 case keepclient.OversizeBlockError:
484 status = http.StatusRequestEntityTooLarge
486 case keepclient.InsufficientReplicasError:
487 if wroteReplicas > 0 {
488 // At least one write is considered success. The
489 // client can decide if getting less than the number of
490 // replications it asked for is a fatal error.
491 status = http.StatusOK
492 _, err = io.WriteString(resp, locatorOut)
494 status = http.StatusServiceUnavailable
498 status = http.StatusBadGateway
502 // ServeHTTP implementation for IndexHandler
503 // Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
504 // For each keep server found in LocalRoots:
505 // Invokes GetIndex using keepclient
506 // Expects "complete" response (terminating with blank new line)
507 // Aborts on any errors
508 // Concatenates responses from all those keep servers and returns
509 func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
512 prefix := mux.Vars(req)["prefix"]
517 if status != http.StatusOK {
518 http.Error(resp, err.Error(), status)
522 kc := *handler.KeepClient
524 ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
526 status, err = http.StatusForbidden, BadAuthorizationHeader
530 // Copy ArvadosClient struct and use the client's API token
531 arvclient := *kc.Arvados
532 arvclient.ApiToken = token
533 kc.Arvados = &arvclient
535 // Only GET method is supported
536 if req.Method != "GET" {
537 status, err = http.StatusNotImplemented, MethodNotSupported
541 // Get index from all LocalRoots and write to resp
543 for uuid := range kc.LocalRoots() {
544 reader, err = kc.GetIndex(uuid, prefix)
546 status = http.StatusBadGateway
550 _, err = io.Copy(resp, reader)
552 status = http.StatusBadGateway
557 // Got index from all the keep servers and wrote to resp
558 status = http.StatusOK
559 resp.Write([]byte("\n"))