4 "git.curoverse.com/arvados.git/sdk/go/keepclient"
5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // Initialized by the -listen flag.
23 const DEFAULT_ADDR = ":25107"
25 var listener net.Listener
36 flagset := flag.NewFlagSet("default", flag.ExitOnError)
42 "Interface on which to listen for requests, in the format "+
43 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
44 "to listen on all network interfaces.")
50 "If set, disable GET operations")
56 "If set, disable PUT operations")
62 "Default number of replicas to write if not specified by the client.")
68 "Path to write pid file")
70 flagset.Parse(os.Args[1:])
72 arv, err := arvadosclient.MakeArvadosClient()
74 log.Fatalf("Error setting up arvados client %s", err.Error())
77 kc, err := keepclient.MakeKeepClient(&arv)
79 log.Fatalf("Error setting up keep client %s", err.Error())
83 f, err := os.Create(pidfile)
85 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
87 fmt.Fprint(f, os.Getpid())
89 defer os.Remove(pidfile)
92 kc.Want_replicas = default_replicas
94 listener, err = net.Listen("tcp", listen)
96 log.Fatalf("Could not listen on %v", listen)
99 go RefreshServicesList(&kc)
101 // Shut down the server gracefully (by closing the listener)
102 // if SIGTERM is received.
103 term := make(chan os.Signal, 1)
104 go func(sig <-chan os.Signal) {
106 log.Println("caught signal:", s)
110 signal.Notify(term, syscall.SIGTERM)
111 signal.Notify(term, syscall.SIGINT)
113 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
115 // Start listening for requests.
116 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
118 log.Println("shutting down")
121 type ApiTokenCache struct {
122 tokens map[string]int64
127 // Refresh the keep service list every five minutes.
128 func RefreshServicesList(kc *keepclient.KeepClient) {
130 time.Sleep(300 * time.Second)
131 oldservices := kc.ServiceRoots()
132 kc.DiscoverKeepServers()
133 newservices := kc.ServiceRoots()
134 s1 := fmt.Sprint(oldservices)
135 s2 := fmt.Sprint(newservices)
137 log.Printf("Updated server list to %v", s2)
142 // Cache the token and set an expire time. If we already have an expire time
143 // on the token, it is not updated.
144 func (this *ApiTokenCache) RememberToken(token string) {
146 defer this.lock.Unlock()
148 now := time.Now().Unix()
149 if this.tokens[token] == 0 {
150 this.tokens[token] = now + this.expireTime
154 // Check if the cached token is known and still believed to be valid.
155 func (this *ApiTokenCache) RecallToken(token string) bool {
157 defer this.lock.Unlock()
159 now := time.Now().Unix()
160 if this.tokens[token] == 0 {
163 } else if now < this.tokens[token] {
164 // Token is known and still valid
168 this.tokens[token] = 0
173 func GetRemoteAddress(req *http.Request) string {
174 if realip := req.Header.Get("X-Real-IP"); realip != "" {
175 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
176 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
181 return req.RemoteAddr
184 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
186 if auth = req.Header.Get("Authorization"); auth == "" {
190 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
196 if cache.RecallToken(tok) {
197 // Valid in the cache, short circut
203 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
204 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
208 // Success! Update cache
209 cache.RememberToken(tok)
214 type GetBlockHandler struct {
215 *keepclient.KeepClient
219 type PutBlockHandler struct {
220 *keepclient.KeepClient
224 type InvalidPathHandler struct{}
226 type OptionsHandler struct{}
229 // Returns a mux.Router that passes GET and PUT requests to the
230 // appropriate handlers.
235 kc *keepclient.KeepClient) *mux.Router {
237 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
239 rest := mux.NewRouter()
242 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
243 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
244 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
248 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
249 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
250 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
251 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
254 rest.NotFoundHandler = InvalidPathHandler{}
259 func SetCorsHeaders(resp http.ResponseWriter) {
260 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
261 resp.Header().Set("Access-Control-Allow-Origin", "*")
262 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
263 resp.Header().Set("Access-Control-Max-Age", "86486400")
266 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
267 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
268 http.Error(resp, "Bad request", http.StatusBadRequest)
271 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
272 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
276 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
279 kc := *this.KeepClient
281 hash := mux.Vars(req)["hash"]
282 hints := mux.Vars(req)["hints"]
284 locator := keepclient.MakeLocator2(hash, hints)
286 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
290 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
291 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
295 // Copy ArvadosClient struct and use the client's API token
296 arvclient := *kc.Arvados
297 arvclient.ApiToken = tok
298 kc.Arvados = &arvclient
300 var reader io.ReadCloser
304 if req.Method == "GET" {
305 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
307 } else if req.Method == "HEAD" {
308 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
312 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
318 n, err2 := io.Copy(resp, reader)
320 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
321 } else if err2 == nil {
322 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
324 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
327 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
329 case keepclient.BlockNotFound:
330 http.Error(resp, "Not found", http.StatusNotFound)
332 http.Error(resp, err.Error(), http.StatusBadGateway)
336 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
340 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
343 kc := *this.KeepClient
345 hash := mux.Vars(req)["hash"]
346 hints := mux.Vars(req)["hints"]
348 locator := keepclient.MakeLocator2(hash, hints)
350 var contentLength int64 = -1
351 if req.Header.Get("Content-Length") != "" {
352 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
354 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
359 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
361 if contentLength < 1 {
362 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
366 if locator.Size > 0 && int64(locator.Size) != contentLength {
367 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
373 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
374 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
378 // Copy ArvadosClient struct and use the client's API token
379 arvclient := *kc.Arvados
380 arvclient.ApiToken = tok
381 kc.Arvados = &arvclient
383 // Check if the client specified the number of replicas
384 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
386 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
392 // Now try to put the block through
396 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
397 msg := fmt.Sprintf("Error reading request body: %s", err)
399 http.Error(resp, msg, http.StatusInternalServerError)
402 hash, replicas, put_err = kc.PutB(bytes)
405 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
408 // Tell the client how many successful PUTs we accomplished
409 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
413 // Default will return http.StatusOK
414 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
415 n, err2 := io.WriteString(resp, hash)
417 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
420 case keepclient.OversizeBlockError:
422 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
424 case keepclient.InsufficientReplicasError:
426 // At least one write is considered success. The
427 // client can decide if getting less than the number of
428 // replications it asked for is a fatal error.
429 // Default will return http.StatusOK
430 n, err2 := io.WriteString(resp, hash)
432 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
435 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
439 http.Error(resp, put_err.Error(), http.StatusBadGateway)
443 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())