7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "github.com/gorilla/mux"
23 // Default TCP address on which to listen for requests.
24 // Override with -listen.
25 const DefaultAddr = ":25107"
27 var listener net.Listener
39 flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
45 "Interface on which to listen for requests, in the format "+
46 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
47 "to listen on all network interfaces.")
53 "If set, disable GET operations")
59 "If set, disable PUT operations")
65 "Default number of replicas to write if not specified by the client.")
71 "Timeout on requests to internal Keep services (default 15 seconds)")
77 "Path to write pid file")
79 flagset.Parse(os.Args[1:])
81 arv, err := arvadosclient.MakeArvadosClient()
83 log.Fatalf("Error setting up arvados client %s", err.Error())
86 if os.Getenv("ARVADOS_DEBUG") != "" {
87 keepclient.DebugPrintf = log.Printf
89 kc, err := keepclient.MakeKeepClient(&arv)
91 log.Fatalf("Error setting up keep client %s", err.Error())
95 f, err := os.Create(pidfile)
97 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
99 fmt.Fprint(f, os.Getpid())
101 defer os.Remove(pidfile)
104 kc.Want_replicas = default_replicas
105 kc.Client.Timeout = time.Duration(timeout) * time.Second
106 go kc.RefreshServices(5*time.Minute, 3*time.Second)
108 listener, err = net.Listen("tcp", listen)
110 log.Fatalf("Could not listen on %v", listen)
112 log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
114 // Shut down the server gracefully (by closing the listener)
115 // if SIGTERM is received.
116 term := make(chan os.Signal, 1)
117 go func(sig <-chan os.Signal) {
119 log.Println("caught signal:", s)
122 signal.Notify(term, syscall.SIGTERM)
123 signal.Notify(term, syscall.SIGINT)
125 // Start serving requests.
126 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
128 log.Println("shutting down")
131 type ApiTokenCache struct {
132 tokens map[string]int64
137 // Cache the token and set an expire time. If we already have an expire time
138 // on the token, it is not updated.
139 func (this *ApiTokenCache) RememberToken(token string) {
141 defer this.lock.Unlock()
143 now := time.Now().Unix()
144 if this.tokens[token] == 0 {
145 this.tokens[token] = now + this.expireTime
149 // Check if the cached token is known and still believed to be valid.
150 func (this *ApiTokenCache) RecallToken(token string) bool {
152 defer this.lock.Unlock()
154 now := time.Now().Unix()
155 if this.tokens[token] == 0 {
158 } else if now < this.tokens[token] {
159 // Token is known and still valid
163 this.tokens[token] = 0
168 func GetRemoteAddress(req *http.Request) string {
169 if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
170 return xff + "," + req.RemoteAddr
172 return req.RemoteAddr
175 func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
177 if auth = req.Header.Get("Authorization"); auth == "" {
181 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
187 if cache.RecallToken(tok) {
188 // Valid in the cache, short circut
194 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
195 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
199 // Success! Update cache
200 cache.RememberToken(tok)
205 type GetBlockHandler struct {
206 *keepclient.KeepClient
210 type PutBlockHandler struct {
211 *keepclient.KeepClient
215 type IndexHandler struct {
216 *keepclient.KeepClient
220 type InvalidPathHandler struct{}
222 type OptionsHandler struct{}
225 // Returns a mux.Router that passes GET and PUT requests to the
226 // appropriate handlers.
231 kc *keepclient.KeepClient) *mux.Router {
233 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
235 rest := mux.NewRouter()
238 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
239 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
240 rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
243 rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
245 // List blocks whose hash has the given prefix
246 rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
250 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
251 rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
252 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
253 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
254 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
257 rest.NotFoundHandler = InvalidPathHandler{}
262 func SetCorsHeaders(resp http.ResponseWriter) {
263 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
264 resp.Header().Set("Access-Control-Allow-Origin", "*")
265 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
266 resp.Header().Set("Access-Control-Max-Age", "86486400")
269 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
270 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
271 http.Error(resp, "Bad request", http.StatusBadRequest)
274 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
275 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
279 var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
280 var ContentLengthMismatch = errors.New("Actual length != expected content length")
281 var MethodNotSupported = errors.New("Method not supported")
283 var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
285 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
288 locator := mux.Vars(req)["locator"]
291 var expectLength, responseLength int64
295 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
296 if status != http.StatusOK {
297 http.Error(resp, err.Error(), status)
301 kc := *this.KeepClient
305 if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
306 status, err = http.StatusForbidden, BadAuthorizationHeader
310 // Copy ArvadosClient struct and use the client's API token
311 arvclient := *kc.Arvados
312 arvclient.ApiToken = tok
313 kc.Arvados = &arvclient
315 var reader io.ReadCloser
317 locator = removeHint.ReplaceAllString(locator, "$1")
321 expectLength, proxiedURI, err = kc.Ask(locator)
323 reader, expectLength, proxiedURI, err = kc.Get(locator)
328 status, err = http.StatusNotImplemented, MethodNotSupported
332 if expectLength == -1 {
333 log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
336 switch respErr := err.(type) {
338 status = http.StatusOK
339 resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
344 responseLength, err = io.Copy(resp, reader)
345 if err == nil && expectLength > -1 && responseLength != expectLength {
346 err = ContentLengthMismatch
349 case keepclient.Error:
350 if respErr == keepclient.BlockNotFound {
351 status = http.StatusNotFound
352 } else if respErr.Temporary() {
353 status = http.StatusBadGateway
358 status = http.StatusInternalServerError
362 var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
363 var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
365 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
368 kc := *this.KeepClient
370 var expectLength int64 = -1
371 var status = http.StatusInternalServerError
372 var wroteReplicas int
373 var locatorOut string = "-"
376 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
377 if status != http.StatusOK {
378 http.Error(resp, err.Error(), status)
382 locatorIn := mux.Vars(req)["locator"]
384 if req.Header.Get("Content-Length") != "" {
385 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
387 resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
392 if expectLength < 0 {
393 err = LengthRequiredError
394 status = http.StatusLengthRequired
399 var loc *keepclient.Locator
400 if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
401 status = http.StatusBadRequest
403 } else if loc.Size > 0 && int64(loc.Size) != expectLength {
404 err = LengthMismatchError
405 status = http.StatusBadRequest
412 if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
413 err = BadAuthorizationHeader
414 status = http.StatusForbidden
418 // Copy ArvadosClient struct and use the client's API token
419 arvclient := *kc.Arvados
420 arvclient.ApiToken = tok
421 kc.Arvados = &arvclient
423 // Check if the client specified the number of replicas
424 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
426 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
432 // Now try to put the block through
434 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
435 err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
436 status = http.StatusInternalServerError
439 locatorOut, wroteReplicas, err = kc.PutB(bytes)
442 locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
445 // Tell the client how many successful PUTs we accomplished
446 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
450 status = http.StatusOK
451 _, err = io.WriteString(resp, locatorOut)
453 case keepclient.OversizeBlockError:
455 status = http.StatusRequestEntityTooLarge
457 case keepclient.InsufficientReplicasError:
458 if wroteReplicas > 0 {
459 // At least one write is considered success. The
460 // client can decide if getting less than the number of
461 // replications it asked for is a fatal error.
462 status = http.StatusOK
463 _, err = io.WriteString(resp, locatorOut)
465 status = http.StatusServiceUnavailable
469 status = http.StatusBadGateway
473 // ServeHTTP implementation for IndexHandler
474 // Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
475 // For each keep server found in LocalRoots:
476 // Invokes GetIndex using keepclient
477 // Expects "complete" response (terminating with blank new line)
478 // Aborts on any errors
479 // Concatenates responses from all those keep servers and returns
480 func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
483 prefix := mux.Vars(req)["prefix"]
488 if status != http.StatusOK {
489 http.Error(resp, err.Error(), status)
493 kc := *handler.KeepClient
495 ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
497 status, err = http.StatusForbidden, BadAuthorizationHeader
501 // Copy ArvadosClient struct and use the client's API token
502 arvclient := *kc.Arvados
503 arvclient.ApiToken = token
504 kc.Arvados = &arvclient
506 // Only GET method is supported
507 if req.Method != "GET" {
508 status, err = http.StatusNotImplemented, MethodNotSupported
512 // Get index from all LocalRoots and write to resp
514 for uuid := range kc.LocalRoots() {
515 reader, err = kc.GetIndex(uuid, prefix)
517 status = http.StatusBadGateway
521 _, err = io.Copy(resp, reader)
523 status = http.StatusBadGateway
528 // Got index from all the keep servers and wrote to resp
529 status = http.StatusOK
530 resp.Write([]byte("\n"))