4 "git.curoverse.com/arvados.git/sdk/go/keepclient"
5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // Initialized by the -listen flag.
23 const DEFAULT_ADDR = ":25107"
// listener is the package-level listener. Elsewhere in this file it is
// assigned from net.Listen and then handed to http.Serve.
25 var listener net.Listener
// NOTE(review): the enclosing function header and several interior lines are
// not visible in this listing; the use of os.Args and http.Serve suggests
// this is the program entry point — confirm.
//
// Command-line flags are collected in a dedicated FlagSet that exits the
// process on a parse error (flag.ExitOnError).
36 flagset := flag.NewFlagSet("default", flag.ExitOnError)
42 "Interface on which to listen for requests, in the format "+
43 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
44 "to listen on all network interfaces.")
50 "If set, disable GET operations")
56 "If set, disable PUT operations")
62 "Default number of replicas to write if not specified by the client.")
68 "Path to write pid file")
// Parse everything after the program name.
70 flagset.Parse(os.Args[1:])
// Build the Arvados API client first; the Keep client is constructed from it.
// A failure in either step is fatal.
72 arv, err := arvadosclient.MakeArvadosClient()
74 log.Fatalf("Error setting up arvados client %s", err.Error())
77 kc, err := keepclient.MakeKeepClient(&arv)
79 log.Fatalf("Error setting up keep client %s", err.Error())
// Write this process's pid to pidfile and remove the file when the enclosing
// function returns. log.Fatalf exits via os.Exit, so the deferred removal
// does not run on the fatal paths below.
83 f, err := os.Create(pidfile)
85 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
87 fmt.Fprint(f, os.Getpid())
89 defer os.Remove(pidfile)
// Replica count used when a request does not override it.
92 kc.Want_replicas = default_replicas
94 listener, err = net.Listen("tcp", listen)
// NOTE(review): this message reports the address but not err, so the reason
// the listen failed is lost — consider including err.
96 log.Fatalf("Could not listen on %v", listen)
// Keep the service list fresh in the background for the life of the process.
99 go RefreshServicesList(&kc)
101 // Shut down the server gracefully (by closing the listener)
102 // if SIGTERM or SIGINT is received.
// The channel is buffered (capacity 1), as signal.Notify does not block when
// delivering to it.
103 term := make(chan os.Signal, 1)
104 go func(sig <-chan os.Signal) {
106 log.Println("caught signal:", s)
109 signal.Notify(term, syscall.SIGTERM)
110 signal.Notify(term, syscall.SIGINT)
112 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
114 // Start listening for requests.
// http.Serve blocks until the listener fails or is closed.
// NOTE(review): its returned error is not captured on this line.
115 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
117 log.Println("shutting down")
// ApiTokenCache remembers API tokens that have already been validated, so
// that repeated requests with the same token can skip re-validation until
// the cached entry expires.
// NOTE(review): the methods on this type also use this.lock and
// this.expireTime; those field declarations are not visible in this listing.
120 type ApiTokenCache struct {
// tokens maps a token string to its expiry time in Unix seconds. The methods
// treat a value of 0 (including a missing key) as "not cached".
121 tokens map[string]int64
126 // Refresh the keep service list every five minutes.
127 func RefreshServicesList(kc *keepclient.KeepClient) {
129 time.Sleep(300 * time.Second)
130 oldservices := kc.ServiceRoots()
131 kc.DiscoverKeepServers()
132 newservices := kc.ServiceRoots()
133 s1 := fmt.Sprint(oldservices)
134 s2 := fmt.Sprint(newservices)
136 log.Printf("Updated server list to %v", s2)
141 // Cache the token and set an expire time. If we already have an expire time
142 // on the token, it is not updated.
143 func (this *ApiTokenCache) RememberToken(token string) {
145 defer this.lock.Unlock()
147 now := time.Now().Unix()
148 if this.tokens[token] == 0 {
149 this.tokens[token] = now + this.expireTime
153 // Check if the cached token is known and still believed to be valid.
154 func (this *ApiTokenCache) RecallToken(token string) bool {
156 defer this.lock.Unlock()
158 now := time.Now().Unix()
159 if this.tokens[token] == 0 {
162 } else if now < this.tokens[token] {
163 // Token is known and still valid
167 this.tokens[token] = 0
// GetRemoteAddress returns a printable client address for req, intended for
// log lines.
//
// When the request carries an X-Real-IP header, that value is
// used; if X-Forwarded-For is also present and differs from it, the
// forwarding chain is appended in parentheses. Without X-Real-IP the
// connection's RemoteAddr is returned.
func GetRemoteAddress(req *http.Request) string {
	realip := req.Header.Get("X-Real-IP")
	if realip == "" {
		return req.RemoteAddr
	}

	forwarded := req.Header.Get("X-Forwarded-For")
	// An absent X-Forwarded-For adds no information; do not emit an empty
	// "(X-Forwarded-For )" suffix for it.
	if forwarded == "" || forwarded == realip {
		return realip
	}
	return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
}
// CheckAuthorizationHeader inspects the Authorization header of req, which
// is expected to have the form "OAuth2 <token>". It returns the parsed token
// in tok and whether the request may proceed in pass.
// A token found in cache is accepted without contacting the API server;
// otherwise the token is checked with a HEAD call on users/current and, on
// success, stored in cache.
// kc is received by value, so changes made to it here are not seen by the
// caller.
// NOTE(review): the return statements and the construction of arv are not
// visible in this listing; the description above is inferred from the
// visible lines — confirm.
183 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
// An absent (empty) Authorization header is handled first.
185 if auth = req.Header.Get("Authorization"); auth == "" {
// Sscanf's %s verb reads a single space-delimited word, so tok receives the
// text following the "OAuth2 " prefix up to the next whitespace.
189 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
195 if cache.RecallToken(tok) {
196 // Valid in the cache, short-circuit
// Not cached: ask the API server about the current user; any error is logged
// together with the client address.
202 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
203 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
207 // Success! Update cache
208 cache.RememberToken(tok)
// GetBlockHandler serves the GET and HEAD block routes registered by the
// router in this file. It embeds the shared Keep client.
// NOTE(review): its ServeHTTP also uses this.ApiTokenCache; that field is
// not visible in this listing.
213 type GetBlockHandler struct {
214 *keepclient.KeepClient
// PutBlockHandler serves the PUT and POST block routes. It embeds the shared
// Keep client.
// NOTE(review): its ServeHTTP also uses this.ApiTokenCache; that field is
// not visible in this listing.
218 type PutBlockHandler struct {
219 *keepclient.KeepClient
// InvalidPathHandler is installed as the router's NotFoundHandler and answers
// unroutable requests with 400 Bad Request.
223 type InvalidPathHandler struct{}
// OptionsHandler serves the OPTIONS routes.
225 type OptionsHandler struct{}
228 // Returns a mux.Router that passes GET and PUT requests to the
229 // appropriate handlers.
// The visible routes also cover HEAD, POST and OPTIONS, and anything that
// matches no route goes to InvalidPathHandler.
// NOTE(review): the function name and its leading parameters are not visible
// in this listing; a call elsewhere in the file passes two booleans and the
// Keep client to MakeRESTRouter — confirm this is that function and which
// routes each boolean gates.
234 kc *keepclient.KeepClient) *mux.Router {
// One token cache is shared by every handler created below. expireTime is
// added to a Unix-seconds timestamp by the cache, i.e. 300 seconds.
236 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
238 rest := mux.NewRouter()
// Block reads: a 32-hex-digit hash, with or without trailing "+hints".
241 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
242 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
243 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
// Block writes: PUT to a hash (with or without hints), or POST to the root
// path with no hash in the URL.
247 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
248 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
249 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
// OPTIONS on the root or on any single path segment.
250 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
251 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
254 rest.NotFoundHandler = InvalidPathHandler{}
// SetCorsHeaders sets the four Access-Control-* response headers on resp:
// the allowed methods, a wildcard allowed origin, the allowed request
// headers, and the Access-Control-Max-Age value. Any existing value for
// these headers is replaced.
func SetCorsHeaders(resp http.ResponseWriter) {
	header := resp.Header()
	for _, kv := range [...][2]string{
		{"Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS"},
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas"},
		{"Access-Control-Max-Age", "86486400"},
	} {
		header.Set(kv[0], kv[1])
	}
}
266 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
267 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
268 http.Error(resp, "Bad request", http.StatusBadRequest)
// ServeHTTP handles an OPTIONS request; it logs the client address, method
// and path.
// NOTE(review): the remainder of the body is not visible in this listing —
// presumably it sets the CORS headers via SetCorsHeaders; confirm.
271 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
272 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
// ServeHTTP handles GET and HEAD requests for a block. It authenticates the
// caller, fetches (GET) or probes (HEAD) the block through the Keep client
// using the caller's own API token, and relays the result.
// Visible error responses: 403 when authorization fails, 404 for
// keepclient.BlockNotFound, 502 with the error text on another visible path.
// NOTE(review): several control-flow lines (variable declarations, returns,
// the switch header) are not visible in this listing.
276 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the shared client so the per-request token swap below
// does not modify the handler's own KeepClient value.
279 kc := *this.KeepClient
// Route variables captured by the router patterns.
281 hash := mux.Vars(req)["hash"]
282 hints := mux.Vars(req)["hints"]
// The locator supplies the Signature and Timestamp passed to the Keep client.
284 locator := keepclient.MakeLocator2(hash, hints)
286 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
290 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
291 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
295 // Copy ArvadosClient struct and use the client's API token
296 arvclient := *kc.Arvados
297 arvclient.ApiToken = tok
298 kc.Arvados = &arvclient
300 var reader io.ReadCloser
// GET retrieves the block body; HEAD only asks for its length, so reader
// stays nil in that case.
304 if req.Method == "GET" {
305 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
307 } else if req.Method == "HEAD" {
308 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
312 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
// Stream the block to the client; n is the number of bytes actually copied.
318 n, err2 := io.Copy(resp, reader)
320 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
321 } else if err2 == nil {
322 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
// NOTE(review): this branch is reached when err2 != nil, yet it logs
// err.Error(). If err is nil here (the copy only happens after a successful
// fetch, presumably) this call panics with a nil dereference; it should log
// err2 instead.
324 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
327 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
329 case keepclient.BlockNotFound:
330 http.Error(resp, "Not found", http.StatusNotFound)
332 http.Error(resp, err.Error(), http.StatusBadGateway)
336 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
// ServeHTTP handles PUT and POST requests that store a block. It validates
// Content-Length against the locator's size hint, authenticates the caller,
// writes the block through the Keep client using the caller's own API token,
// and reports the number of replicas stored in a response header.
// Visible error responses: 411 when Content-Length is missing or < 1, 400
// when it disagrees with the locator size hint, 403 when authorization
// fails, 500 when the request body cannot be read, 413 for
// keepclient.OversizeBlockError, and 503 / 502 on the remaining error paths.
// NOTE(review): several control-flow lines (conditions, returns, the switch
// header) are not visible in this listing.
340 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the shared client so the per-request token and replica
// settings below do not modify the handler's own KeepClient value.
343 kc := *this.KeepClient
// Route variables captured by the router patterns; hash is empty for the
// POST "/" route, which has no hash variable.
345 hash := mux.Vars(req)["hash"]
346 hints := mux.Vars(req)["hints"]
348 locator := keepclient.MakeLocator2(hash, hints)
// contentLength stays -1 when the request has no Content-Length header.
350 var contentLength int64 = -1
351 if req.Header.Get("Content-Length") != "" {
352 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
// NOTE(review): this sets the *response* Content-Length to the request's
// value, and the condition guarding it is not visible here — verify intent.
354 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
359 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
// Zero-length and unparsable lengths are rejected along with missing ones.
361 if contentLength < 1 {
362 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
// A size hint in the locator must agree with the declared body length.
366 if locator.Size > 0 && int64(locator.Size) != contentLength {
367 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
373 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
374 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
378 // Copy ArvadosClient struct and use the client's API token
379 arvclient := *kc.Arvados
380 arvclient.ApiToken = tok
381 kc.Arvados = &arvclient
383 // Check if the client specified the number of replicas
// NOTE(review): the presence check uses a string literal while the read uses
// keepclient.X_Keep_Desired_Replicas; presumably the same header name —
// confirm, and prefer the constant in both places.
384 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
386 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
392 // Now try to put the block through
// Two write paths are visible: the whole body is read into memory and handed
// to PutB, or the body is streamed to PutHR together with the URL hash and
// declared length. The condition choosing between them is not visible here.
396 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
397 msg := fmt.Sprintf("Error reading request body: %s", err)
399 http.Error(resp, msg, http.StatusInternalServerError)
402 hash, replicas, put_err = kc.PutB(bytes)
405 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
408 // Tell the client how many successful PUTs we accomplished
409 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
413 // Default will return http.StatusOK
414 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
// The response body is the block's hash as returned by the Keep client.
415 n, err2 := io.WriteString(resp, hash)
// NOTE(review): the format string has three verbs but only two arguments are
// supplied, so n is printed under %s and the error verb reports a missing
// argument. The leading %s presumably wants GetRemoteAddress(req), as in the
// other log lines of this function.
417 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
420 case keepclient.OversizeBlockError:
422 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
424 case keepclient.InsufficientReplicasError:
426 // At least one write is considered success. The
427 // client can decide if getting less than the number of
428 // replications it asked for is a fatal error.
429 // Default will return http.StatusOK
430 n, err2 := io.WriteString(resp, hash)
// NOTE(review): same verb/argument mismatch as the earlier log line with
// this format string — a first argument (presumably GetRemoteAddress(req))
// is missing.
432 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
435 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
439 http.Error(resp, put_err.Error(), http.StatusBadGateway)
443 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())