6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
8 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 // Used as the default for the -listen flag, which can override it.
23 const DEFAULT_ADDR = ":25107"
// listener is the socket main() serves HTTP on. main() assigns it from
// net.Listen, and main's signal handler is documented as closing it to shut
// the server down gracefully.
25 var listener net.Listener
// Command-line flags.
37 flagset := flag.NewFlagSet("default", flag.ExitOnError)
43 "Interface on which to listen for requests, in the format "+
44 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
45 "to listen on all network interfaces.")
51 "If set, disable GET operations")
57 "If set, disable PUT operations")
63 "Default number of replicas to write if not specified by the client.")
69 "Timeout on requests to internal Keep services (default 15 seconds)")
75 "Path to write pid file")
77 flagset.Parse(os.Args[1:])
79 arv, err := arvadosclient.MakeArvadosClient()
81 log.Fatalf("Error setting up arvados client %s", err.Error())
84 kc, err := keepclient.MakeKeepClient(&arv)
86 log.Fatalf("Error setting up keep client %s", err.Error())
90 f, err := os.Create(pidfile)
92 log.Fatalf("Error writing pid file (%s): %s", pidfile, err.Error())
94 fmt.Fprint(f, os.Getpid())
// NOTE(review): log.Fatalf exits via os.Exit, which skips deferred calls. The
// pid file is therefore removed only when main returns normally (after
// http.Serve returns). It is NOT removed on the Fatalf paths below.
96 defer os.Remove(pidfile)
99 kc.Want_replicas = default_replicas
// The timeout flag value is interpreted in seconds.
101 kc.Client.Timeout = time.Duration(timeout) * time.Second
103 listener, err = net.Listen("tcp", listen)
// NOTE(review): the error returned by net.Listen is not included in this
// message, which hides the cause (e.g. address in use, permission denied).
105 log.Fatalf("Could not listen on %v", listen)
// Keep the Keep service list fresh in the background.
// NOTE(review): this goroutine updates kc while request handlers make shallow
// copies of the same KeepClient (kc := *this.KeepClient in the ServeHTTP
// methods). Confirm this is free of data races.
108 go RefreshServicesList(&kc)
110 // Shut down the server gracefully (by closing the listener)
111 // if SIGTERM or SIGINT is received.
112 term := make(chan os.Signal, 1)
113 go func(sig <-chan os.Signal) {
115 log.Println("caught signal:", s)
118 signal.Notify(term, syscall.SIGTERM)
119 signal.Notify(term, syscall.SIGINT)
121 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
123 // Serve requests until the listener is closed.
// NOTE(review): Serve's error is discarded. After a signal that is expected
// (closed listener), but any other failure also falls through to the
// "shutting down" message with no cause logged.
124 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
126 log.Println("shutting down")
// ApiTokenCache remembers recently validated API tokens, so that
// CheckAuthorizationHeader can accept them without asking the API server.
129 type ApiTokenCache struct {
// tokens maps an API token to the Unix time (in seconds) at which its cache
// entry expires. A value of 0 means "not cached".
130 tokens map[string]int64
135 // RefreshServicesList periodically rediscovers the Keep services used by kc.
// After a successful, non-empty discovery it waits five minutes before the next
// attempt. After an error or an empty list it retries after 3 seconds. main
// starts it as a goroutine; it presumably loops forever (the loop header is not
// shown here).
136 func RefreshServicesList(kc *keepclient.KeepClient) {
137 var sleeptime time.Duration
139 oldservices := kc.ServiceRoots()
// NOTE(review): this mutates the shared KeepClient while handlers may be
// copying it concurrently. Confirm DiscoverKeepServers is safe for that.
140 newservices, err := kc.DiscoverKeepServers()
141 if err == nil && len(newservices) > 0 {
// The old and new lists are rendered as strings, presumably so that the
// update is logged only when the list actually changed (comparison elided).
142 s1 := fmt.Sprint(oldservices)
143 s2 := fmt.Sprint(newservices)
145 log.Printf("Updated server list to %v", s2)
147 sleeptime = 300 * time.Second
149 // There was an error, or the list is empty, so wait 3 seconds and try again.
151 log.Printf("Error retrieving server list: %v", err)
153 log.Printf("Retrieved an empty server list")
155 sleeptime = 3 * time.Second
157 time.Sleep(sleeptime)
161 // RememberToken caches token with an expiry of now + expireTime (Unix seconds).
162 // If the token already has a nonzero expiry, that expiry is kept, not extended.
163 func (this *ApiTokenCache) RememberToken(token string) {
165 defer this.lock.Unlock()
167 now := time.Now().Unix()
// 0 is the map's zero value: never cached, or reset after expiry by RecallToken.
168 if this.tokens[token] == 0 {
169 this.tokens[token] = now + this.expireTime
173 // RecallToken reports whether token is cached and its entry has not yet expired.
174 func (this *ApiTokenCache) RecallToken(token string) bool {
176 defer this.lock.Unlock()
178 now := time.Now().Unix()
// Unknown token: never cached, or previously expired and reset.
179 if this.tokens[token] == 0 {
182 } else if now < this.tokens[token] {
183 // Token is known and still valid
// Expired: reset the entry so that the next RememberToken sets a fresh expiry.
// NOTE(review): the key remains in the map (value 0) rather than being removed
// with delete(), so the map only ever grows. Consider delete(this.tokens, token).
187 this.tokens[token] = 0
// GetRemoteAddress returns a client address string for log messages. When an
// X-Real-IP header is present and X-Forwarded-For differs from it, both are
// reported as "realip (X-Forwarded-For forwarded)". The branch used when the
// two headers match is not visible here. Without X-Real-IP, req.RemoteAddr is
// returned.
// NOTE(review): a request with X-Real-IP but no X-Forwarded-For yields
// "ip (X-Forwarded-For )". Both headers are client-controlled unless a trusted
// reverse proxy sets them, so treat the result as informational only.
192 func GetRemoteAddress(req *http.Request) string {
193 if realip := req.Header.Get("X-Real-IP"); realip != "" {
194 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
195 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
200 return req.RemoteAddr
// CheckAuthorizationHeader parses an "Authorization: OAuth2 <token>" request
// header and reports in pass whether the token is valid. tok holds the parsed
// token. A token that is unexpired in cache is accepted without contacting the
// API server. Otherwise the token is checked with a HEAD users/current API call
// and, on success, remembered in cache.
// NOTE(review): kc is passed by value, copying the whole KeepClient on every
// call. Confirm that a pointer was not intended.
203 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
205 if auth = req.Header.Get("Authorization"); auth == "" {
209 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
215 if cache.RecallToken(tok) {
216 // Valid in the cache; short-circuit the API server check
// arv is presumably an Arvados client configured with tok (its setup is not
// shown here). The call succeeds only if the API server accepts the token.
222 if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
223 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
227 // Success: remember the token so later requests skip the API call
228 cache.RememberToken(tok)
// GetBlockHandler serves GET and HEAD requests for a block by fetching it from
// the Keep services through the embedded KeepClient. It also carries the shared
// ApiTokenCache (used as this.ApiTokenCache).
233 type GetBlockHandler struct {
234 *keepclient.KeepClient
// PutBlockHandler serves PUT requests (and POST to "/") by storing the request
// body in the Keep services through the embedded KeepClient.
238 type PutBlockHandler struct {
239 *keepclient.KeepClient
// InvalidPathHandler answers requests that match no route with 400 Bad Request.
243 type InvalidPathHandler struct{}
// OptionsHandler answers OPTIONS requests (presumably CORS preflight).
245 type OptionsHandler struct{}
248 // MakeRESTRouter returns a mux.Router that sends block GET/HEAD requests to
249 // GetBlockHandler, PUT (and POST /) to PutBlockHandler, OPTIONS to OptionsHandler.
// Any other request goes to InvalidPathHandler. main passes !no_get and !no_put
// as the first two arguments, so the GET and PUT routes are presumably
// registered only when enabled (those conditionals are not shown here).
254 kc *keepclient.KeepClient) *mux.Router {
// One token cache is shared by all block handlers. Entries expire after 300 seconds.
256 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
258 rest := mux.NewRouter()
// A block path is a 32-hex-digit hash, optionally followed by "+hints".
261 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
262 GetBlockHandler{kc, t}).Methods("GET", "HEAD")
263 rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
267 rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
268 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
// POST / carries no hash in the URL, so PutBlockHandler sees an empty "hash" var.
269 rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
270 rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
271 rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
274 rest.NotFoundHandler = InvalidPathHandler{}
// SetCorsHeaders adds permissive CORS headers (any origin) to resp, allowing
// the methods and request headers this proxy uses.
// NOTE(review): Access-Control-Max-Age "86486400" is about 1000 days and looks
// like a typo for "86400" (one day). Browsers clamp this value to a much smaller
// maximum anyway.
279 func SetCorsHeaders(resp http.ResponseWriter) {
280 resp.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS")
281 resp.Header().Set("Access-Control-Allow-Origin", "*")
282 resp.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
283 resp.Header().Set("Access-Control-Max-Age", "86486400")
// ServeHTTP logs the unroutable request and responds with 400 Bad Request.
286 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
287 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
288 http.Error(resp, "Bad request", http.StatusBadRequest)
// ServeHTTP logs the OPTIONS request.
// NOTE(review): the rest of the body is not shown here. It presumably calls
// SetCorsHeaders(resp) to answer the CORS preflight; confirm.
291 func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
292 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
// ServeHTTP proxies a GET or HEAD for one block to the Keep services. The
// client's Authorization token is validated (through the shared token cache)
// and then used for the upstream request. A missing block yields 404. Other
// upstream errors yield 502 Bad Gateway carrying the error text.
296 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a copy of the shared KeepClient so that the per-request token set
// below does not leak into other requests.
// NOTE(review): this is a shallow copy, taken while RefreshServicesList may be
// writing the same struct.
299 kc := *this.KeepClient
301 hash := mux.Vars(req)["hash"]
302 hints := mux.Vars(req)["hints"]
304 locator := keepclient.MakeLocator2(hash, hints)
306 log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
310 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
311 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
315 // Copy the ArvadosClient struct and substitute the requesting client's API token
316 arvclient := *kc.Arvados
317 arvclient.ApiToken = tok
318 kc.Arvados = &arvclient
320 var reader io.ReadCloser
// NOTE(review): no Close call on reader is visible here. Confirm that the
// upstream body is closed on every path.
324 if req.Method == "GET" {
325 reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
329 } else if req.Method == "HEAD" {
330 blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
334 log.Printf("%s: %s %s Keep server did not return Content-Length",
335 GetRemoteAddress(req), req.Method, hash)
341 status = http.StatusOK
// NOTE(review): in the "did not return Content-Length" case above, blocklen
// may be -1. Unless a guard that is not shown here prevents it, this sends
// "Content-Length: -1".
342 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
// Stream the block to the client. The 200 status is sent with the first
// write, so a short or failed copy can only be logged, not reported.
344 n, err2 := io.Copy(resp, reader)
345 if blocklen > -1 && n != blocklen {
346 log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
347 GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
348 } else if err2 == nil {
349 log.Printf("%s: %s %s %v %v",
350 GetRemoteAddress(req), req.Method, hash, status, n)
352 log.Printf("%s: %s %s %v %v copy error: %v",
353 GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
356 log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
358 case keepclient.BlockNotFound:
359 status = http.StatusNotFound
360 http.Error(resp, "Not Found", http.StatusNotFound)
362 status = http.StatusBadGateway
363 http.Error(resp, err.Error(), http.StatusBadGateway)
367 log.Printf("%s: %s %s %v error: %v",
368 GetRemoteAddress(req), req.Method, hash, status, err.Error())
// ServeHTTP stores the request body as one block in the Keep services and
// writes the resulting locator back to the client.
//   - A Content-Length header is required (411 otherwise).
//   - Content-Length must match any size hint in the locator (400 otherwise).
//   - The client's token must be valid (403 otherwise).
//   - An X-Keep-Desired-Replicas header, if present, sets the target replica count.
// The number of replicas actually stored is reported in the
// X-Keep-Replicas-Stored response header. An oversize block yields 413.
372 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
375 kc := *this.KeepClient
377 hash := mux.Vars(req)["hash"]
378 hints := mux.Vars(req)["hints"]
380 locator := keepclient.MakeLocator2(hash, hints)
// -1 means "unknown". It is rejected below with 411 Length Required.
382 var contentLength int64 = -1
383 if req.Header.Get("Content-Length") != "" {
384 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
// NOTE(review): this sets the *response* Content-Length from the request's
// value. The success response body is the locator string written below, not
// the request body. Confirm this is intended (the guarding condition is not
// shown here).
386 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
391 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
393 if contentLength < 0 {
394 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
398 if locator.Size > 0 && int64(locator.Size) != contentLength {
399 http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
405 if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
406 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
410 // Copy ArvadosClient struct and use the client's API token
411 arvclient := *kc.Arvados
412 arvclient.ApiToken = tok
413 kc.Arvados = &arvclient
415 // Check if the client specified the number of replicas
// NOTE(review): the presence check uses the literal header name, but the read
// below uses keepclient.X_Keep_Desired_Replicas. Presumably these are equal;
// use the constant in both places.
416 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
418 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
424 // Now try to put the block through
// Presumably (the selecting condition is not shown here), a request with no
// hash in the URL (POST /) is read fully into memory and hashed by PutB.
// Otherwise the body is streamed to PutHR with the known hash and length.
428 if bytes, err := ioutil.ReadAll(req.Body); err != nil {
429 msg := fmt.Sprintf("Error reading request body: %s", err)
431 http.Error(resp, msg, http.StatusInternalServerError)
434 hash, replicas, put_err = kc.PutB(bytes)
437 hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
440 // Tell the client how many successful PUTs we accomplished
441 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
445 // Default will return http.StatusOK
446 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
447 n, err2 := io.WriteString(resp, hash)
// NOTE(review): BUG. The format string below has three verbs but only two
// arguments. As a result:
//   - n is printed by %s as "%!s(int=N)";
//   - the error text lands in the byte-count slot;
//   - the line ends with "%!v(MISSING)".
// go vet reports this. Fix: pass GetRemoteAddress(req) as the first argument.
449 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
452 case keepclient.OversizeBlockError:
454 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
456 case keepclient.InsufficientReplicasError:
458 // At least one write is considered success. The
459 // client can decide if getting less than the number of
460 // replications it asked for is a fatal error.
461 // Default will return http.StatusOK
462 n, err2 := io.WriteString(resp, hash)
// NOTE(review): BUG, the same three-verb/two-argument format mismatch as above.
// Pass GetRemoteAddress(req) as the first argument.
464 log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
467 http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
471 http.Error(resp, put_err.Error(), http.StatusBadGateway)
475 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())