4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
19 // Default TCP address on which to listen for requests.
20 // Can be overridden with the -listen flag.
21 const DEFAULT_ADDR = ":25107"
23 var listener net.Listener
38 "Interface on which to listen for requests, in the format "+
39 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
40 "to listen on all network interfaces.")
46 "If set, disable GET operations")
52 "If set, disable PUT operations")
58 "Default number of replicas to write if not specified by the client.")
64 "Path to write pid file")
68 kc, err := keepclient.MakeKeepClient()
70 log.Fatalf("Error setting up keep client %s", err.Error())
74 f, err := os.Create(pidfile)
76 fmt.Fprint(f, os.Getpid())
79 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
83 kc.Want_replicas = default_replicas
85 listener, err = net.Listen("tcp", listen)
87 log.Fatalf("Could not listen on %v", listen)
90 go RefreshServicesList(&kc)
92 // Shut down the server gracefully (by closing the listener)
93 // if SIGTERM is received.
94 term := make(chan os.Signal, 1)
95 go func(sig <-chan os.Signal) {
97 log.Println("caught signal:", s)
100 signal.Notify(term, syscall.SIGTERM)
103 f, err := os.Create(pidfile)
105 fmt.Fprint(f, os.Getpid())
108 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
112 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
114 // Start listening for requests.
115 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
117 log.Println("shutting down")
124 type ApiTokenCache struct {
125 tokens map[string]int64
130 // Refresh the keep service list every five minutes.
131 func RefreshServicesList(kc *keepclient.KeepClient) {
133 time.Sleep(300 * time.Second)
134 oldservices := kc.ServiceRoots()
135 kc.DiscoverKeepServers()
136 newservices := kc.ServiceRoots()
137 s1 := fmt.Sprint(oldservices)
138 s2 := fmt.Sprint(newservices)
140 log.Printf("Updated server list to %v", s2)
145 // Cache the token and set an expiry time. If the token already has an
146 // expiry time, it is not updated.
147 func (this *ApiTokenCache) RememberToken(token string) {
149 defer this.lock.Unlock()
151 now := time.Now().Unix()
152 if this.tokens[token] == 0 {
153 this.tokens[token] = now + this.expireTime
157 // Check if the cached token is known and still believed to be valid.
158 func (this *ApiTokenCache) RecallToken(token string) bool {
160 defer this.lock.Unlock()
162 now := time.Now().Unix()
163 if this.tokens[token] == 0 {
166 } else if now < this.tokens[token] {
167 // Token is known and still valid
171 this.tokens[token] = 0
176 func GetRemoteAddress(req *http.Request) string {
177 if realip := req.Header.Get("X-Real-IP"); realip != "" {
178 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
179 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
184 return req.RemoteAddr
187 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
189 if auth = req.Header.Get("Authorization"); auth == "" {
194 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
200 if cache.RecallToken(tok) {
201 // Valid in the cache, short circuit
205 var usersreq *http.Request
207 if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
208 // Can't construct the request
209 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
213 // Add api token header
214 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
216 // Actually make the request
217 var resp *http.Response
218 if resp, err = kc.Client.Do(usersreq); err != nil {
219 // Something else failed
220 log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
224 if resp.StatusCode != http.StatusOK {
226 log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
230 // Success! Update cache
231 cache.RememberToken(tok)
236 type GetBlockHandler struct {
237 *keepclient.KeepClient
241 type PutBlockHandler struct {
242 *keepclient.KeepClient
246 type InvalidPathHandler struct{}
249 // Returns a mux.Router that passes GET and PUT requests to the
250 // appropriate handlers.
255 kc *keepclient.KeepClient) *mux.Router {
257 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
259 rest := mux.NewRouter()
260 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
261 ghsig := rest.Handle(
262 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
263 GetBlockHandler{kc, t})
264 ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
267 gh.Methods("GET", "HEAD")
268 ghsig.Methods("GET", "HEAD")
275 rest.NotFoundHandler = InvalidPathHandler{}
280 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
281 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
282 http.Error(resp, "Bad request", http.StatusBadRequest)
285 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
287 kc := *this.KeepClient
289 hash := mux.Vars(req)["hash"]
290 signature := mux.Vars(req)["signature"]
291 timestamp := mux.Vars(req)["timestamp"]
293 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
295 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
296 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
300 var reader io.ReadCloser
304 if req.Method == "GET" {
305 reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
307 } else if req.Method == "HEAD" {
308 blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
311 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
316 n, err2 := io.Copy(resp, reader)
318 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
319 } else if err2 == nil {
320 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
322 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
325 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
327 case keepclient.BlockNotFound:
328 http.Error(resp, "Not found", http.StatusNotFound)
330 http.Error(resp, err.Error(), http.StatusBadGateway)
334 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
338 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
340 kc := *this.KeepClient
342 hash := mux.Vars(req)["hash"]
344 var contentLength int64 = -1
345 if req.Header.Get("Content-Length") != "" {
346 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
348 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
353 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
355 if contentLength < 1 {
356 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
360 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
361 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
365 // Check if the client specified the number of replicas
366 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
368 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
374 // Now try to put the block through
375 replicas, err := kc.PutHR(hash, req.Body, contentLength)
377 // Tell the client how many successful PUTs we accomplished
378 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
382 // Default will return http.StatusOK
383 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
385 case keepclient.OversizeBlockError:
387 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
389 case keepclient.InsufficientReplicasError:
391 // At least one write is considered success. The
392 // client can decide if getting fewer than the number of
393 // replicas it asked for is a fatal error.
394 // Default will return http.StatusOK
396 http.Error(resp, "", http.StatusServiceUnavailable)
400 http.Error(resp, err.Error(), http.StatusBadGateway)
404 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())