4 "git.curoverse.com/arvados.git/sdk/go/keepclient"
5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // This is a compile-time constant, so the -listen flag cannot modify it;
// presumably it is supplied as that flag's default value (the flag
// registration that would confirm this is not shown here).
23 const DEFAULT_ADDR = ":25107"
// listener is the package-level TCP listener. Startup code assigns it from
// net.Listen("tcp", listen) and then passes it to http.Serve.
25 var listener net.Listener
// Command-line flags. Only the help strings of the individual flag
// registrations appear below.
36 flagset := flag.NewFlagSet("default", flag.ExitOnError)
42 "Interface on which to listen for requests, in the format "+
43 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
44 "to listen on all network interfaces.")
50 "If set, disable GET operations")
56 "If set, disable PUT operations")
62 "Default number of replicas to write if not specified by the client.")
68 "Path to write pid file")
70 flagset.Parse(os.Args[1:])
// Build the Arvados API client and, from it, the Keep client. A failure in
// either step is fatal.
72 arv, err := arvadosclient.MakeArvadosClient()
74 log.Fatalf("Error setting up arvados client %s", err.Error())
77 kc, err := keepclient.MakeKeepClient(&arv)
79 log.Fatalf("Error setting up keep client %s", err.Error())
// Write the current process id to the pid file.
83 f, err := os.Create(pidfile)
85 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
87 fmt.Fprint(f, os.Getpid())
// The pid file is removed when the enclosing function returns. log.Fatalf
// exits through os.Exit, which does not run deferred calls, so the fatal
// path below leaves the file behind.
89 defer os.Remove(pidfile)
// Use the configured default replica count for writes.
92 kc.Want_replicas = default_replicas
94 listener, err = net.Listen("tcp", listen)
// Include the underlying error so a bind failure can be diagnosed.
96 log.Fatalf("Could not listen on %v: %v", listen, err)
// Keep the list of Keep services fresh in a background goroutine.
99 go RefreshServicesList(&kc)
101 // Shut down the server gracefully (by closing the listener)
102 // if SIGTERM or SIGINT is received.
103 term := make(chan os.Signal, 1)
104 go func(sig <-chan os.Signal) {
106 log.Println("caught signal:", s)
110 signal.Notify(term, syscall.SIGTERM)
111 signal.Notify(term, syscall.SIGINT)
113 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
115 // Start listening for requests.
// http.Serve blocks until the listener stops accepting connections; the
// error it returns is discarded. The negated flags enable GET and PUT.
116 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
118 log.Println("shutting down")
// ApiTokenCache remembers API tokens that have passed an authorization check
// so they need not be re-verified on every request. Its methods also use
// lock and expireTime fields whose declarations are not shown here.
121 type ApiTokenCache struct {
// tokens maps an API token to its expiry time in Unix seconds; a zero value
// is treated as "not cached".
122 tokens map[string]int64
127 // Refresh the keep service list every five minutes.
// Each pass sleeps 300 seconds, calls kc.DiscoverKeepServers(), and renders
// the service roots from before and after the call as strings (s1, s2); the
// new list is then logged. The loop construct and any s1/s2 comparison are
// presumably on lines not shown here.
128 func RefreshServicesList(kc *keepclient.KeepClient) {
130 time.Sleep(300 * time.Second)
// Snapshot the roots on both sides of the rediscovery.
131 oldservices := kc.ServiceRoots()
132 kc.DiscoverKeepServers()
133 newservices := kc.ServiceRoots()
134 s1 := fmt.Sprint(oldservices)
135 s2 := fmt.Sprint(newservices)
137 log.Printf("Updated server list to %v", s2)
142 // Cache the token and set an expire time. If we already have an expire time
143 // on the token, it is not updated.
// The stored expiry is the current Unix time plus this.expireTime.
144 func (this *ApiTokenCache) RememberToken(token string) {
// Release the lock on return (the matching Lock call is not shown here).
146 defer this.lock.Unlock()
148 now := time.Now().Unix()
// A zero entry means the token is absent or was reset by RecallToken.
149 if this.tokens[token] == 0 {
150 this.tokens[token] = now + this.expireTime
154 // Check if the cached token is known and still believed to be valid.
// Entries are compared with the current Unix time. An entry that is neither
// zero nor still in the future is reset to 0, so it reads as unknown on
// later calls. The return statements are not shown here.
155 func (this *ApiTokenCache) RecallToken(token string) bool {
// Release the lock on return (the matching Lock call is not shown here).
157 defer this.lock.Unlock()
159 now := time.Now().Unix()
// Zero: the token was never cached, or its entry was reset.
160 if this.tokens[token] == 0 {
163 } else if now < this.tokens[token] {
164 // Token is known and still valid
// Remaining case (presumably the final else): the expiry has passed.
168 this.tokens[token] = 0
// GetRemoteAddress returns a printable client address for log messages. It
// consults the X-Real-IP header first and otherwise returns req.RemoteAddr.
173 func GetRemoteAddress(req *http.Request) string {
174 if realip := req.Header.Get("X-Real-IP"); realip != "" {
// NOTE(review): when X-Forwarded-For is absent, forwarded is "" and therefore
// differs from realip, so the result ends with an empty "(X-Forwarded-For )".
175 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
176 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
// No X-Real-IP header: use the address of the network peer.
181 return req.RemoteAddr
// CheckAuthorizationHeader reads the API token from the request's
// Authorization header (format "OAuth2 <token>") and reports whether it is
// accepted. A token that cache.RecallToken recognises is accepted without
// further work; otherwise a HEAD users/current API call is issued and, when
// it succeeds, the token is stored with cache.RememberToken.
//
// Returns pass (whether the request is authorized) and tok (the parsed
// token). kc is received by value, i.e. as a copy of the caller's KeepClient.
184 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
// A missing or empty Authorization header is checked first.
186 if auth = req.Header.Get("Authorization"); auth == "" {
// %s reads one space-delimited word, so tok is the word after "OAuth2 ".
190 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
196 if cache.RecallToken(tok) {
197 // Valid in the cache, short circuit
// arv is defined on a line not shown here; presumably an Arvados client
// configured with tok (TODO confirm).
203 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
204 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
208 // Success! Update cache
209 cache.RememberToken(tok)
// GetBlockHandler is the http.Handler registered for GET and HEAD block
// requests. It embeds the shared Keep client. Values are constructed with a
// second element (an *ApiTokenCache) whose field declaration is not shown here.
214 type GetBlockHandler struct {
215 *keepclient.KeepClient
// PutBlockHandler is the http.Handler registered for PUT and POST block
// uploads. It embeds the shared Keep client. Values are constructed with a
// second element (an *ApiTokenCache) whose field declaration is not shown here.
219 type PutBlockHandler struct {
220 *keepclient.KeepClient
// InvalidPathHandler answers requests that match no route; it is installed
// as the router's NotFoundHandler.
224 type InvalidPathHandler struct{}
// OptionsHandler is the http.Handler registered for OPTIONS requests.
226 type OptionsHandler struct{}
229 // Returns a mux.Router that passes GET and PUT requests to the
230 // appropriate handlers.
// POST to "/" is also routed to the PUT handler, OPTIONS requests go to
// OptionsHandler, and anything unmatched goes to InvalidPathHandler. The
// start of the signature, and any conditions guarding the GET/PUT
// registrations, are on lines not shown here.
235 kc *keepclient.KeepClient) *mux.Router {
// One token cache, created with expireTime 300, is shared by every handler.
237 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
239 rest := mux.NewRouter()
// Block reads: a 32-hex-digit hash, optionally followed by "+hints".
242 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
243 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
244 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
// Block writes: PUT to a hash path, or POST to the root path.
248 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
249 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
250 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
251 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
252 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
// Everything else is reported as a bad request.
255 rest.NotFoundHandler = InvalidPathHandler{}
// SetCorsHeaders sets permissive CORS headers on the response: any origin,
// the GET/HEAD/POST/PUT/OPTIONS methods, and the request headers listed below.
260 func SetCorsHeaders(resp http.ResponseWriter) {
261 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
262 resp.Header().Set("Access-Control-Allow-Origin", "*")
263 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
// NOTE(review): 86486400 seconds is exactly 1001 days; confirm this is the
// intended value (86400 would be one day).
264 resp.Header().Set("Access-Control-Max-Age", "86486400")
// ServeHTTP logs the unroutable request (client address, method, path) and
// replies with 400 Bad Request.
267 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
268 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
269 http.Error(resp, "Bad request", http.StatusBadRequest)
// ServeHTTP logs the OPTIONS request (client address, method, path). Any
// response headers it sets are on lines not shown here; presumably the CORS
// headers (TODO confirm).
272 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
273 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
// ServeHTTP handles GET and HEAD requests for a block. It authorizes the
// caller, fetches (GET) or probes (HEAD) the block through the Keep client
// using the caller's own API token, and relays the result. A
// keepclient.BlockNotFound error becomes 404; another error is reported as
// 502 Bad Gateway.
277 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the shared client so the per-request token set below
// stays local to this request.
280 kc := *this.KeepClient
282 hash := mux.Vars(req)["hash"]
283 hints := mux.Vars(req)["hints"]
285 locator := keepclient.MakeLocator2(hash, hints)
287 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
291 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
292 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
296 // Copy ArvadosClient struct and use the client's API token
297 arvclient := *kc.Arvados
298 arvclient.ApiToken = tok
299 kc.Arvados = &arvclient
301 var reader io.ReadCloser
// GET retrieves the block contents; HEAD only asks for its length.
305 if req.Method == "GET" {
306 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
308 } else if req.Method == "HEAD" {
309 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
313 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
// Stream the block to the client and log how the copy went.
319 n, err2 := io.Copy(resp, reader)
321 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
322 } else if err2 == nil {
323 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
// The copy itself failed, so report err2. The fetch error err is nil on this
// path and calling err.Error() would panic.
325 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err2.Error())
328 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
330 case keepclient.BlockNotFound:
331 http.Error(resp, "Not found", http.StatusNotFound)
// Any other upstream failure is surfaced as a gateway error.
333 http.Error(resp, err.Error(), http.StatusBadGateway)
337 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
// ServeHTTP handles block uploads. It validates Content-Length against the
// locator's size hint, authorizes the caller, stores the block through the
// Keep client using the caller's own API token, and reports the number of
// replicas written in the X_Keep_Replicas_Stored response header. On success
// the response body is the block's hash/locator string.
341 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the shared client so the per-request token and replica
// settings stay local to this request.
344 kc := *this.KeepClient
346 hash := mux.Vars(req)["hash"]
347 hints := mux.Vars(req)["hints"]
349 locator := keepclient.MakeLocator2(hash, hints)
// -1 marks "no usable Content-Length supplied".
351 var contentLength int64 = -1
352 if req.Header.Get("Content-Length") != "" {
353 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
355 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
360 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
// A declared length of 0 is rejected here as well as a missing header.
362 if contentLength < 1 {
363 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
// When the locator carries a size hint it must match the declared length.
367 if locator.Size > 0 && int64(locator.Size) != contentLength {
368 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
374 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
375 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
379 // Copy ArvadosClient struct and use the client's API token
380 arvclient := *kc.Arvados
381 arvclient.ApiToken = tok
382 kc.Arvados = &arvclient
384 // Check if the client specified the number of replicas
// NOTE(review): the header name is a literal here but a keepclient constant
// on the next line; presumably both name the same header (confirm).
385 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
387 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
393 // Now try to put the block through
// Two upload paths follow. One reads the whole body into memory and calls
// PutB; the other hands the body and known hash to PutHR. The condition that
// selects between them is not shown here (presumably POST versus PUT).
397 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
398 msg := fmt.Sprintf("Error reading request body: %s", err)
400 http.Error(resp, msg, http.StatusInternalServerError)
403 hash, replicas, put_err = kc.PutB(bytes)
406 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
409 // Tell the client how many successful PUTs we accomplished
410 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
414 // Default will return http.StatusOK
415 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
416 n, err2 := io.WriteString(resp, hash)
// The format has three verbs; the client address is the first argument.
418 log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
421 case keepclient.OversizeBlockError:
423 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
425 case keepclient.InsufficientReplicasError:
427 // At least one write is considered success. The
428 // client can decide if getting less than the number of
429 // replications it asked for is a fatal error.
430 // Default will return http.StatusOK
431 n, err2 := io.WriteString(resp, hash)
// The format has three verbs; the client address is the first argument.
433 log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
436 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
// Any other storage failure is surfaced as a gateway error.
440 http.Error(resp, put_err.Error(), http.StatusBadGateway)
444 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())