7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "github.com/gorilla/mux"
23 // DefaultAddr is the default TCP address on which to listen for requests
24 // (port 25107, no host part). Override with -listen.
25 const DefaultAddr = ":25107"
// listener is the package-level TCP listener the proxy serves requests on.
// It is assigned from net.Listen during start-up and handed to http.Serve.
27 var listener net.Listener
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt. The statements below form the start-up sequence: parse flags, build
// the Arvados and Keep clients, write a pid file, open the listener, install
// signal handling and serve HTTP.
39 flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
45 "Interface on which to listen for requests, in the format "+
46 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
47 "to listen on all network interfaces.")
53 "If set, disable GET operations")
59 "If set, disable PUT operations")
65 "Default number of replicas to write if not specified by the client.")
71 "Timeout on requests to internal Keep services (default 15 seconds)")
77 "Path to write pid file")
// Parse everything after the program name. The flag set was created with
// flag.ExitOnError, so a parse failure terminates the process.
79 flagset.Parse(os.Args[1:])
// Build the Arvados API client; a failure here is fatal.
81 arv, err := arvadosclient.MakeArvadosClient()
83 log.Fatalf("Error setting up arvados client %s", err.Error())
// Any non-empty ARVADOS_DEBUG value routes keepclient debug output to the
// standard logger.
86 if os.Getenv("ARVADOS_DEBUG") != "" {
87 keepclient.DebugPrintf = log.Printf
// Build the Keep client on top of the Arvados client; a failure is fatal.
89 kc, err := keepclient.MakeKeepClient(&arv)
91 log.Fatalf("Error setting up keep client %s", err.Error())
// Record this process's pid in pidfile.
// NOTE(review): the guard around this section is not visible; presumably it
// only runs when a pid file path was supplied — confirm.
95 f, err := os.Create(pidfile)
97 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
99 fmt.Fprint(f, os.Getpid())
// NOTE(review): log.Fatalf exits through os.Exit, which skips deferred calls,
// so the pid file is left behind on the fatal path further down.
101 defer os.Remove(pidfile)
// Apply the command-line settings to the Keep client; timeout is in seconds.
104 kc.Want_replicas = default_replicas
105 kc.Client.Timeout = time.Duration(timeout) * time.Second
// Refresh the Keep service list in a background goroutine.
// NOTE(review): the meaning of the two durations is defined by
// keepclient.RefreshServices — confirm there.
106 go kc.RefreshServices(5*time.Minute, 3*time.Second)
// Open the TCP listener on the configured address and keep it in the
// package-level listener variable.
108 listener, err = net.Listen("tcp", listen)
// NOTE(review): the underlying err is not included in this message.
110 log.Fatalf("Could not listen on %v", listen)
112 log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
114 // Shut down the server gracefully (by closing the listener)
115 // if SIGTERM or SIGINT is received.
// The channel has a buffer of one so a signal that arrives before the
// goroutine is receiving is not dropped.
116 term := make(chan os.Signal, 1)
117 go func(sig <-chan os.Signal) {
119 log.Println("caught signal:", s)
// Both signals are delivered to the same channel.
122 signal.Notify(term, syscall.SIGTERM)
123 signal.Notify(term, syscall.SIGINT)
125 // Start serving requests.
// http.Serve blocks until the listener returns an error (for example once it
// has been closed). NOTE(review): the error it returns is discarded.
126 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
128 log.Println("shutting down")
// ApiTokenCache remembers API tokens together with the time at which each
// cached entry expires.
// NOTE(review): its methods also use fields named lock and expireTime, whose
// declarations are not visible in this excerpt.
131 type ApiTokenCache struct {
// tokens maps an API token to its expiry time in Unix seconds; the methods
// treat a value of 0 as "not cached".
132 tokens map[string]int64
137 // RememberToken caches the token with an expiry of now + expireTime
138 // (Unix seconds). If the token already has an expiry recorded, it is not updated.
139 func (this *ApiTokenCache) RememberToken(token string) {
// Release the cache lock when the method returns.
// NOTE(review): the matching Lock call is not visible in this excerpt.
141 defer this.lock.Unlock()
143 now := time.Now().Unix()
// A zero entry means the token is not cached (a missing map key reads as 0).
144 if this.tokens[token] == 0 {
145 this.tokens[token] = now + this.expireTime
149 // RecallToken checks if the cached token is known and still believed to be valid.
// NOTE(review): the return statements of this method are not visible in this
// excerpt; the comments below describe only the visible conditions.
150 func (this *ApiTokenCache) RecallToken(token string) bool {
// Release the cache lock when the method returns.
// NOTE(review): the matching Lock call is not visible in this excerpt.
152 defer this.lock.Unlock()
154 now := time.Now().Unix()
// Zero means there is no cached expiry for this token.
155 if this.tokens[token] == 0 {
158 } else if now < this.tokens[token] {
159 // Token is known and still valid
// Presumably the remaining branch (expiry reached): the entry is reset to 0 so
// that the token reads as unknown from now on — confirm against the full source.
163 this.tokens[token] = 0
// GetRemoteAddress returns the client address to use in log lines.
//
// When the request carries a non-empty X-Forwarded-For header, the result is
// that header value, a comma, and req.RemoteAddr, so the forwarding chain
// reported by upstream proxies stays visible. Otherwise it is req.RemoteAddr
// alone. Only the first X-Forwarded-For header line is consulted, because
// http.Header.Get returns just the first value for a key.
func GetRemoteAddress(req *http.Request) string {
	xff := req.Header.Get("X-Forwarded-For")
	if xff == "" {
		return req.RemoteAddr
	}
	return xff + "," + req.RemoteAddr
}
// CheckAuthorizationHeader validates the API token carried in the request's
// Authorization header. It returns pass (whether the token was accepted) and
// tok (the token parsed from the header).
// NOTE(review): several lines of this function (early returns, the setup of
// arv) are not visible in this excerpt; the comments below describe only the
// visible statements.
175 func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
// A missing or empty Authorization header is handled first.
177 if auth = req.Header.Get("Authorization"); auth == "" {
// The header is expected to look like "OAuth2 <token>"; %s reads the token up
// to the next whitespace.
181 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
187 if cache.RecallToken(tok) {
188 // Valid in the cache, short circuit
// Cache miss: check the token with a HEAD call for "users" / "current".
// NOTE(review): arv is set up on a line not shown here; presumably it is a
// copy of kc.Arvados carrying tok as its API token — confirm.
194 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
195 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
199 // Success! Update cache
200 cache.RememberToken(tok)
// GetBlockHandler is the handler registered for the GET and HEAD block routes.
// It embeds the shared Keep client.
// NOTE(review): the handler is constructed with a second value (the token
// cache) whose field declaration is not visible in this excerpt.
205 type GetBlockHandler struct {
206 *keepclient.KeepClient
// PutBlockHandler is the handler registered for the PUT block routes and for
// POST on "/". It embeds the shared Keep client.
// NOTE(review): the handler is constructed with a second value (the token
// cache) whose field declaration is not visible in this excerpt.
210 type PutBlockHandler struct {
211 *keepclient.KeepClient
// IndexHandler is the handler registered for GET on /index and
// /index/{prefix}. It embeds the shared Keep client.
// NOTE(review): the handler is constructed with a second value (the token
// cache) whose field declaration is not visible in this excerpt.
215 type IndexHandler struct {
216 *keepclient.KeepClient
// InvalidPathHandler is installed as the router's NotFoundHandler; it carries
// no state.
220 type InvalidPathHandler struct{}
// OptionsHandler is the handler registered for OPTIONS requests; it carries no
// state.
222 type OptionsHandler struct{}
225 // Returns a mux.Router that passes GET, HEAD, PUT, POST and OPTIONS requests
226 // to the appropriate handlers; anything unroutable goes to InvalidPathHandler.
// NOTE(review): only the tail of the signature is visible in this excerpt. The
// call site passes two booleans (!no_get, !no_put) ahead of kc; the
// conditionals that presumably gate the routes below on them are not visible.
231 kc *keepclient.KeepClient) *mux.Router {
// One token cache is shared by every handler; entries expire after 300
// seconds (expiry times are kept in Unix seconds).
233 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
235 rest := mux.NewRouter()
// Block reads: a 32-hex-digit hash, optionally followed by "+" and hints.
238 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
239 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
240 rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
// List all blocks.
243 rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
245 // List blocks whose hash has the given prefix
246 rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
// Block writes: PUT with a locator, or POST to "/" without one.
250 rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
251 rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
252 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
// OPTIONS on "/" or on any single path segment.
253 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
254 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
// Everything that matched no route above.
257 rest.NotFoundHandler = InvalidPathHandler{}
// SetCorsHeaders sets the CORS response headers on resp: the allowed methods
// (GET, HEAD, POST, PUT, OPTIONS), any origin ("*"), the request headers a
// cross-origin caller may send, and how long a preflight result may be cached.
//
// It must be called before the response header is written, since it only
// mutates resp.Header().
func SetCorsHeaders(resp http.ResponseWriter) {
	h := resp.Header()
	h.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
	// NOTE(review): 86486400 seconds is roughly 1001 days; kept as-is, but
	// confirm this value is intentional.
	h.Set("Access-Control-Max-Age", "86486400")
}
269 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
270 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
271 http.Error(resp, "Bad request", http.StatusBadRequest)
// ServeHTTP handles OPTIONS requests; the visible part logs the client
// address, method and path.
// NOTE(review): the rest of the body is not visible in this excerpt;
// presumably it also sets the CORS headers — confirm.
274 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
275 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
// Sentinel errors used by the handlers. Their text is sent to clients through
// http.Error, so changing a message changes the response body.

// BadAuthorizationHeader is reported with 403 when CheckAuthorizationHeader
// rejects the request.
279 var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
// ContentLengthMismatch is set when the number of bytes copied to the client
// differs from the expected length.
280 var ContentLengthMismatch = errors.New("Actual length != expected content length")
// MethodNotSupported is reported with 501 for request methods a handler does
// not implement.
281 var MethodNotSupported = errors.New("Method not supported")
// removeHint matches a "+K@xxxxx" hint inside a locator, where xxxxx is five
// lowercase letters or digits, followed either by another "+" or by the end of
// the string. The trailing "+" (or nothing) is captured as group 1 so that
// ReplaceAllString(locator, "$1") removes the hint and keeps the separator for
// whatever follows.
//
// MustCompile is used instead of discarding the error from Compile: the
// pattern is a constant, so a mistake in it should fail loudly at start-up
// rather than leave a nil *Regexp that panics on first use.
var removeHint = regexp.MustCompile(`\+K@[a-z0-9]{5}(\+|$)`)
// ServeHTTP handles block reads: it authenticates the caller, strips "+K@"
// hints from the locator, asks the Keep client for the block and relays the
// result, mapping Keep client errors to HTTP status codes.
// NOTE(review): many short lines of this method (declarations, returns, case
// labels, closing braces) are not visible in this excerpt; the comments below
// describe only the visible statements.
285 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
288 locator := mux.Vars(req)["locator"]
291 var expectLength, responseLength int64
// One log line per request with the final status, lengths, proxied URI and
// error; any non-200 status is then sent to the client with err's text.
// NOTE(review): presumably this runs in a deferred function whose opening line
// is not visible; err must be non-nil whenever status is not 200.
295 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
296 if status != http.StatusOK {
297 http.Error(resp, err.Error(), status)
// Work on a private copy of the shared KeepClient so that the per-request
// token set below stays local to this request.
301 kc := *this.KeepClient
305 if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
306 status, err = http.StatusForbidden, BadAuthorizationHeader
310 // Copy ArvadosClient struct and use the client's API token
311 arvclient := *kc.Arvados
312 arvclient.ApiToken = tok
313 kc.Arvados = &arvclient
315 var reader io.ReadCloser
// Drop "+K@xxxxx" hints from the locator before the lookup.
317 locator = removeHint.ReplaceAllString(locator, "$1")
// NOTE(review): presumably Ask serves HEAD and Get serves GET (the route
// accepts both); the selecting switch is not visible — confirm.
321 expectLength, proxiedURI, err = kc.Ask(locator)
323 reader, expectLength, proxiedURI, err = kc.Get(locator)
// Any other method is answered with 501.
328 status, err = http.StatusNotImplemented, MethodNotSupported
// A length of -1 means the Keep client could not report a content length.
332 if expectLength == -1 {
333 log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
// Translate the lookup outcome into an HTTP status.
336 switch respErr := err.(type) {
338 status = http.StatusOK
// NOTE(review): unless guarded by a line not shown here, an unknown length
// would be sent as "Content-Length: -1" — confirm.
339 resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
// Stream the block to the client and verify the byte count when the expected
// length is known.
344 responseLength, err = io.Copy(resp, reader)
345 if err == nil && expectLength > -1 && responseLength != expectLength {
346 err = ContentLengthMismatch
349 case keepclient.Error:
// Not found maps to 404; temporary Keep errors map to 502.
350 if respErr == keepclient.BlockNotFound {
351 status = http.StatusNotFound
352 } else if respErr.Temporary() {
353 status = http.StatusBadGateway
// NOTE(review): which branch or case this 500 belongs to is not visible here.
358 status = http.StatusInternalServerError
// LengthRequiredError carries the standard reason phrase for HTTP 411; it is
// used when the Content-Length header is missing, unparsable or negative.
362 var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
// LengthMismatchError is used when the size in the locator differs from the
// Content-Length header.
363 var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
// ServeHTTP handles block writes (the PUT and POST routes): it checks the
// Content-Length header, validates the locator when one is given,
// authenticates the caller, reads an optional desired-replica count, and
// stores the block through the Keep client.
// NOTE(review): many short lines of this method (declarations, returns, case
// labels, closing braces) are not visible in this excerpt; the comments below
// describe only the visible statements.
365 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a private copy of the shared KeepClient so that the per-request
// token set below stays local to this request.
368 kc := *this.KeepClient
370 var expectLength int64
// status starts as 500, so a path that never sets it is reported as a server
// error.
371 var status = http.StatusInternalServerError
372 var wroteReplicas int
373 var locatorOut string = "-"
// One log line per request; any non-200 status is then sent to the client
// with err's text.
// NOTE(review): presumably this runs in a deferred function whose opening line
// is not visible; err must be non-nil whenever status is not 200.
376 log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
377 if status != http.StatusOK {
378 http.Error(resp, err.Error(), status)
// Empty for the POST "/" route, which has no {locator} variable.
382 locatorIn := mux.Vars(req)["locator"]
// Content-Length must parse as a non-negative integer; otherwise reply 411.
384 _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
385 if err != nil || expectLength < 0 {
386 err = LengthRequiredError
387 status = http.StatusLengthRequired
// The locator must parse, and a positive size in it must equal Content-Length.
// NOTE(review): presumably skipped when locatorIn is empty; the guard is not
// visible — confirm.
392 var loc *keepclient.Locator
393 if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
394 status = http.StatusBadRequest
396 } else if loc.Size > 0 && int64(loc.Size) != expectLength {
397 err = LengthMismatchError
398 status = http.StatusBadRequest
405 if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
406 err = BadAuthorizationHeader
407 status = http.StatusForbidden
411 // Copy ArvadosClient struct and use the client's API token
412 arvclient := *kc.Arvados
413 arvclient.ApiToken = tok
414 kc.Arvados = &arvclient
416 // Check if the client specified the number of replicas
// NOTE(review): the header name is a string literal here but
// keepclient.X_Keep_Desired_Replicas on the next visible line — confirm both
// are the same name. `err :=` below declares a new err that shadows the
// method-level one.
417 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
419 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
425 // Now try to put the block through
// NOTE(review): `bytes, err :=` declares a new err scoped to this if/else
// statement. The assignment on the next line and the PutB call that uses bytes
// both write to that inner err, not to the method-level err that is logged and
// examined afterwards — confirm against the full source and fix the shadowing.
427 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
428 err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
429 status = http.StatusInternalServerError
// No locator given: store the fully-read body.
432 locatorOut, wroteReplicas, err = kc.PutB(bytes)
// Locator given: pass the body reader through with the expected length.
435 locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
438 // Tell the client how many successful PUTs we accomplished
439 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
// Success: reply 200 with the resulting locator as the body.
443 status = http.StatusOK
444 _, err = io.WriteString(resp, locatorOut)
446 case keepclient.OversizeBlockError:
448 status = http.StatusRequestEntityTooLarge
450 case keepclient.InsufficientReplicasError:
451 if wroteReplicas > 0 {
452 // At least one write is considered success. The
453 // client can decide if getting less than the number of
454 // replications it asked for is a fatal error.
455 status = http.StatusOK
456 _, err = io.WriteString(resp, locatorOut)
// No replica was written at all.
458 status = http.StatusServiceUnavailable
// NOTE(review): presumably the default case for any other error — confirm.
462 status = http.StatusBadGateway
466 // ServeHTTP implementation for IndexHandler
467 // Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
468 // For each keep server found in LocalRoots:
469 // Invokes GetIndex using keepclient
470 // Expects "complete" response (terminating with blank new line)
471 // Aborts on any errors
472 // Concatenates responses from all those keep servers and returns
// NOTE(review): several short lines of this method, including its end, are not
// visible in this excerpt; the comments below describe only the visible
// statements.
473 func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Empty for the plain /index route, which has no {prefix} variable.
476 prefix := mux.Vars(req)["prefix"]
// Any non-200 status is sent to the client with err's text.
// NOTE(review): presumably inside a deferred function whose opening line is
// not visible; err must be non-nil whenever status is not 200.
481 if status != http.StatusOK {
482 http.Error(resp, err.Error(), status)
// Work on a private copy of the shared KeepClient so that the per-request
// token set below stays local to this request.
486 kc := *handler.KeepClient
488 ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
490 status, err = http.StatusForbidden, BadAuthorizationHeader
494 // Copy ArvadosClient struct and use the client's API token
495 arvclient := *kc.Arvados
496 arvclient.ApiToken = token
497 kc.Arvados = &arvclient
499 // Only GET method is supported
500 if req.Method != "GET" {
501 status, err = http.StatusNotImplemented, MethodNotSupported
505 // Get index from all LocalRoots and write to resp
// NOTE(review): once any index data has been written to resp the response has
// started, so a 502 set for a later server can no longer replace the status
// already sent — confirm how partial output should be handled.
507 for uuid := range kc.LocalRoots() {
508 reader, err = kc.GetIndex(uuid, prefix)
510 status = http.StatusBadGateway
514 _, err = io.Copy(resp, reader)
516 status = http.StatusBadGateway
521 // Got index from all the keep servers and wrote to resp
522 status = http.StatusOK
// Terminate the combined index with a blank line.
523 resp.Write([]byte("\n"))