6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // Initialized by the -listen flag.
23 const DEFAULT_ADDR = ":25107"
25 var listener net.Listener
37 flagset := flag.NewFlagSet("default", flag.ExitOnError)
43 "Interface on which to listen for requests, in the format "+
44 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
45 "to listen on all network interfaces.")
51 "If set, disable GET operations")
57 "If set, disable PUT operations")
63 "Default number of replicas to write if not specified by the client.")
69 "Timeout on requests to internal Keep services (default 15 seconds)")
75 "Path to write pid file")
77 flagset.Parse(os.Args[1:])
79 arv, err := arvadosclient.MakeArvadosClient()
81 log.Fatalf("Error setting up arvados client %s", err.Error())
84 kc, err := keepclient.MakeKeepClient(&arv)
86 log.Fatalf("Error setting up keep client %s", err.Error())
90 f, err := os.Create(pidfile)
92 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
94 fmt.Fprint(f, os.Getpid())
96 defer os.Remove(pidfile)
99 kc.Want_replicas = default_replicas
101 kc.Client.Timeout = time.Duration(timeout) * time.Second
103 listener, err = net.Listen("tcp", listen)
105 log.Fatalf("Could not listen on %v", listen)
108 go RefreshServicesList(&kc)
110 // Shut down the server gracefully (by closing the listener)
111 // if SIGTERM is received.
112 term := make(chan os.Signal, 1)
113 go func(sig <-chan os.Signal) {
115 log.Println("caught signal:", s)
118 signal.Notify(term, syscall.SIGTERM)
119 signal.Notify(term, syscall.SIGINT)
121 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
123 // Start listening for requests.
124 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
126 log.Println("shutting down")
129 type ApiTokenCache struct {
130 tokens map[string]int64
135 // Refresh the keep service list every five minutes.
136 func RefreshServicesList(kc *keepclient.KeepClient) {
138 time.Sleep(300 * time.Second)
139 oldservices := kc.ServiceRoots()
140 kc.DiscoverKeepServers()
141 newservices := kc.ServiceRoots()
142 s1 := fmt.Sprint(oldservices)
143 s2 := fmt.Sprint(newservices)
145 log.Printf("Updated server list to %v", s2)
150 // Cache the token and set an expire time. If we already have an expire time
151 // on the token, it is not updated.
152 func (this *ApiTokenCache) RememberToken(token string) {
154 defer this.lock.Unlock()
156 now := time.Now().Unix()
157 if this.tokens[token] == 0 {
158 this.tokens[token] = now + this.expireTime
162 // Check if the cached token is known and still believed to be valid.
163 func (this *ApiTokenCache) RecallToken(token string) bool {
165 defer this.lock.Unlock()
167 now := time.Now().Unix()
168 if this.tokens[token] == 0 {
171 } else if now < this.tokens[token] {
172 // Token is known and still valid
176 this.tokens[token] = 0
181 func GetRemoteAddress(req *http.Request) string {
182 if realip := req.Header.Get("X-Real-IP"); realip != "" {
183 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
184 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
189 return req.RemoteAddr
192 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
194 if auth = req.Header.Get("Authorization"); auth == "" {
198 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
204 if cache.RecallToken(tok) {
205 // Valid in the cache, short circut
211 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
212 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
216 // Success! Update cache
217 cache.RememberToken(tok)
222 type GetBlockHandler struct {
223 *keepclient.KeepClient
227 type PutBlockHandler struct {
228 *keepclient.KeepClient
232 type InvalidPathHandler struct{}
234 type OptionsHandler struct{}
237 // Returns a mux.Router that passes GET and PUT requests to the
238 // appropriate handlers.
243 kc *keepclient.KeepClient) *mux.Router {
245 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
247 rest := mux.NewRouter()
250 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
251 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
252 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
256 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
257 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
258 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
259 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
260 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
263 rest.NotFoundHandler = InvalidPathHandler{}
268 func SetCorsHeaders(resp http.ResponseWriter) {
269 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
270 resp.Header().Set("Access-Control-Allow-Origin", "*")
271 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
272 resp.Header().Set("Access-Control-Max-Age", "86486400")
275 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
276 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
277 http.Error(resp, "Bad request", http.StatusBadRequest)
280 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
281 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
285 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
288 kc := *this.KeepClient
290 hash := mux.Vars(req)["hash"]
291 hints := mux.Vars(req)["hints"]
293 locator := keepclient.MakeLocator2(hash, hints)
295 log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
299 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
300 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
304 // Copy ArvadosClient struct and use the client's API token
305 arvclient := *kc.Arvados
306 arvclient.ApiToken = tok
307 kc.Arvados = &arvclient
309 var reader io.ReadCloser
313 if req.Method == "GET" {
314 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
316 } else if req.Method == "HEAD" {
317 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
321 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
323 log.Printf("%s: %s %s Keep server did not return Content-Length",
324 GetRemoteAddress(req), req.Method, hash)
330 status = http.StatusOK
332 n, err2 := io.Copy(resp, reader)
333 if blocklen > -1 && n != blocklen {
334 log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
335 GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
336 } else if err2 == nil {
337 log.Printf("%s: %s %s %v %v",
338 GetRemoteAddress(req), req.Method, hash, status, n)
340 log.Printf("%s: %s %s %v %v copy error: %v",
341 GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
344 log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
346 case keepclient.BlockNotFound:
347 status = http.StatusNotFound
348 http.Error(resp, "Not found", http.StatusNotFound)
350 status = http.StatusBadGateway
351 http.Error(resp, err.Error(), http.StatusBadGateway)
355 log.Printf("%s: %s %s %v error: %v",
356 GetRemoteAddress(req), req.Method, hash, status, err.Error())
360 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
363 kc := *this.KeepClient
365 hash := mux.Vars(req)["hash"]
366 hints := mux.Vars(req)["hints"]
368 locator := keepclient.MakeLocator2(hash, hints)
370 var contentLength int64 = -1
371 if req.Header.Get("Content-Length") != "" {
372 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
374 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
379 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
381 if contentLength < 0 {
382 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
386 if locator.Size > 0 && int64(locator.Size) != contentLength {
387 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
393 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
394 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
398 // Copy ArvadosClient struct and use the client's API token
399 arvclient := *kc.Arvados
400 arvclient.ApiToken = tok
401 kc.Arvados = &arvclient
403 // Check if the client specified the number of replicas
404 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
406 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
412 // Now try to put the block through
416 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
417 msg := fmt.Sprintf("Error reading request body: %s", err)
419 http.Error(resp, msg, http.StatusInternalServerError)
422 hash, replicas, put_err = kc.PutB(bytes)
425 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
428 // Tell the client how many successful PUTs we accomplished
429 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
433 // Default will return http.StatusOK
434 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
435 n, err2 := io.WriteString(resp, hash)
437 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
440 case keepclient.OversizeBlockError:
442 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
444 case keepclient.InsufficientReplicasError:
446 // At least one write is considered success. The
447 // client can decide if getting less than the number of
448 // replications it asked for is a fatal error.
449 // Default will return http.StatusOK
450 n, err2 := io.WriteString(resp, hash)
452 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
455 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
459 http.Error(resp, put_err.Error(), http.StatusBadGateway)
463 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())