6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // Initialized by the -listen flag.
23 const DEFAULT_ADDR = ":25107"
25 var listener net.Listener
37 flagset := flag.NewFlagSet("default", flag.ExitOnError)
43 "Interface on which to listen for requests, in the format "+
44 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
45 "to listen on all network interfaces.")
51 "If set, disable GET operations")
57 "If set, disable PUT operations")
63 "Default number of replicas to write if not specified by the client.")
69 "Timeout on requests to internal Keep services (default 15 seconds)")
75 "Path to write pid file")
77 flagset.Parse(os.Args[1:])
79 arv, err := arvadosclient.MakeArvadosClient()
81 log.Fatalf("Error setting up arvados client %s", err.Error())
84 kc, err := keepclient.MakeKeepClient(&arv)
86 log.Fatalf("Error setting up keep client %s", err.Error())
90 f, err := os.Create(pidfile)
92 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
94 fmt.Fprint(f, os.Getpid())
96 defer os.Remove(pidfile)
99 kc.Want_replicas = default_replicas
101 kc.Client.Timeout = time.Duration(timeout) * time.Second
103 listener, err = net.Listen("tcp", listen)
105 log.Fatalf("Could not listen on %v", listen)
108 go RefreshServicesList(&kc)
110 // Shut down the server gracefully (by closing the listener)
111 // if SIGTERM is received.
112 term := make(chan os.Signal, 1)
113 go func(sig <-chan os.Signal) {
115 log.Println("caught signal:", s)
118 signal.Notify(term, syscall.SIGTERM)
119 signal.Notify(term, syscall.SIGINT)
121 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
123 // Start listening for requests.
124 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
126 log.Println("shutting down")
129 type ApiTokenCache struct {
130 tokens map[string]int64
135 // Refresh the keep service list every five minutes.
136 func RefreshServicesList(kc *keepclient.KeepClient) {
138 time.Sleep(300 * time.Second)
139 oldservices := kc.ServiceRoots()
140 kc.DiscoverKeepServers()
141 newservices := kc.ServiceRoots()
142 s1 := fmt.Sprint(oldservices)
143 s2 := fmt.Sprint(newservices)
145 log.Printf("Updated server list to %v", s2)
150 // Cache the token and set an expire time. If we already have an expire time
151 // on the token, it is not updated.
152 func (this *ApiTokenCache) RememberToken(token string) {
154 defer this.lock.Unlock()
156 now := time.Now().Unix()
157 if this.tokens[token] == 0 {
158 this.tokens[token] = now + this.expireTime
162 // Check if the cached token is known and still believed to be valid.
163 func (this *ApiTokenCache) RecallToken(token string) bool {
165 defer this.lock.Unlock()
167 now := time.Now().Unix()
168 if this.tokens[token] == 0 {
171 } else if now < this.tokens[token] {
172 // Token is known and still valid
176 this.tokens[token] = 0
181 func GetRemoteAddress(req *http.Request) string {
182 if realip := req.Header.Get("X-Real-IP"); realip != "" {
183 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
184 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
189 return req.RemoteAddr
192 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
194 if auth = req.Header.Get("Authorization"); auth == "" {
198 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
204 if cache.RecallToken(tok) {
205 // Valid in the cache, short circut
211 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
212 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
216 // Success! Update cache
217 cache.RememberToken(tok)
222 type GetBlockHandler struct {
223 *keepclient.KeepClient
227 type PutBlockHandler struct {
228 *keepclient.KeepClient
232 type InvalidPathHandler struct{}
234 type OptionsHandler struct{}
237 // Returns a mux.Router that passes GET and PUT requests to the
238 // appropriate handlers.
243 kc *keepclient.KeepClient) *mux.Router {
245 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
247 rest := mux.NewRouter()
250 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
251 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
252 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
256 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
257 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
258 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
259 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
260 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
263 rest.NotFoundHandler = InvalidPathHandler{}
268 func SetCorsHeaders(resp http.ResponseWriter) {
269 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
270 resp.Header().Set("Access-Control-Allow-Origin", "*")
271 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
272 resp.Header().Set("Access-Control-Max-Age", "86486400")
275 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
276 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
277 http.Error(resp, "Bad request", http.StatusBadRequest)
280 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
281 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
285 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
288 kc := *this.KeepClient
290 hash := mux.Vars(req)["hash"]
291 hints := mux.Vars(req)["hints"]
293 locator := keepclient.MakeLocator2(hash, hints)
295 log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
299 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
300 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
304 // Copy ArvadosClient struct and use the client's API token
305 arvclient := *kc.Arvados
306 arvclient.ApiToken = tok
307 kc.Arvados = &arvclient
309 var reader io.ReadCloser
313 if req.Method == "GET" {
314 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
318 } else if req.Method == "HEAD" {
319 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
323 log.Printf("%s: %s %s Keep server did not return Content-Length",
324 GetRemoteAddress(req), req.Method, hash)
330 status = http.StatusOK
331 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
333 n, err2 := io.Copy(resp, reader)
334 if blocklen > -1 && n != blocklen {
335 log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
336 GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
337 } else if err2 == nil {
338 log.Printf("%s: %s %s %v %v",
339 GetRemoteAddress(req), req.Method, hash, status, n)
341 log.Printf("%s: %s %s %v %v copy error: %v",
342 GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
345 log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
347 case keepclient.BlockNotFound:
348 status = http.StatusNotFound
349 http.Error(resp, "Not Found", http.StatusNotFound)
351 status = http.StatusBadGateway
352 http.Error(resp, err.Error(), http.StatusBadGateway)
356 log.Printf("%s: %s %s %v error: %v",
357 GetRemoteAddress(req), req.Method, hash, status, err.Error())
361 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
364 kc := *this.KeepClient
366 hash := mux.Vars(req)["hash"]
367 hints := mux.Vars(req)["hints"]
369 locator := keepclient.MakeLocator2(hash, hints)
371 var contentLength int64 = -1
372 if req.Header.Get("Content-Length") != "" {
373 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
375 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
380 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
382 if contentLength < 0 {
383 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
387 if locator.Size > 0 && int64(locator.Size) != contentLength {
388 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
394 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
395 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
399 // Copy ArvadosClient struct and use the client's API token
400 arvclient := *kc.Arvados
401 arvclient.ApiToken = tok
402 kc.Arvados = &arvclient
404 // Check if the client specified the number of replicas
405 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
407 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
413 // Now try to put the block through
417 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
418 msg := fmt.Sprintf("Error reading request body: %s", err)
420 http.Error(resp, msg, http.StatusInternalServerError)
423 hash, replicas, put_err = kc.PutB(bytes)
426 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
429 // Tell the client how many successful PUTs we accomplished
430 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
434 // Default will return http.StatusOK
435 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
436 n, err2 := io.WriteString(resp, hash)
438 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
441 case keepclient.OversizeBlockError:
443 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
445 case keepclient.InsufficientReplicasError:
447 // At least one write is considered success. The
448 // client can decide if getting less than the number of
449 // replications it asked for is a fatal error.
450 // Default will return http.StatusOK
451 n, err2 := io.WriteString(resp, hash)
453 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
456 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
460 http.Error(resp, put_err.Error(), http.StatusBadGateway)
464 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())