4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
17 // Default TCP address on which to listen for requests.
18 // Initialized by the -listen flag.
19 const DEFAULT_ADDR = ":25107"
21 var listener net.Listener
36 "Interface on which to listen for requests, in the format "+
37 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
38 "to listen on all network interfaces.")
44 "If set, disable GET operations")
50 "If set, disable PUT operations")
56 "Default number of replicas to write if not specified by the client.")
62 "Path to write pid file")
66 kc, err := keepclient.MakeKeepClient()
68 log.Fatalf("Error setting up keep client %s", err.Error())
72 f, err := os.Create(pidfile)
74 fmt.Fprint(f, os.Getpid())
77 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
81 kc.Want_replicas = default_replicas
83 listener, err = net.Listen("tcp", listen)
85 log.Fatalf("Could not listen on %v", listen)
88 go RefreshServicesList(&kc)
90 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
92 // Start listening for requests.
93 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
96 type ApiTokenCache struct {
97 tokens map[string]int64
102 // Refresh the keep service list every five minutes.
103 func RefreshServicesList(kc *keepclient.KeepClient) {
105 time.Sleep(300 * time.Second)
106 oldservices := kc.ServiceRoots()
107 kc.DiscoverKeepServers()
108 newservices := kc.ServiceRoots()
109 s1 := fmt.Sprint(oldservices)
110 s2 := fmt.Sprint(newservices)
112 log.Printf("Updated server list to %v", s2)
117 // Cache the token and set an expire time. If we already have an expire time
118 // on the token, it is not updated.
119 func (this *ApiTokenCache) RememberToken(token string) {
121 defer this.lock.Unlock()
123 now := time.Now().Unix()
124 if this.tokens[token] == 0 {
125 this.tokens[token] = now + this.expireTime
129 // Check if the cached token is known and still believed to be valid.
130 func (this *ApiTokenCache) RecallToken(token string) bool {
132 defer this.lock.Unlock()
134 now := time.Now().Unix()
135 if this.tokens[token] == 0 {
138 } else if now < this.tokens[token] {
139 // Token is known and still valid
143 this.tokens[token] = 0
148 func GetRemoteAddress(req *http.Request) string {
149 if realip := req.Header.Get("X-Real-IP"); realip != "" {
150 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
151 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
156 return req.RemoteAddr
159 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
161 if auth = req.Header.Get("Authorization"); auth == "" {
166 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
172 if cache.RecallToken(tok) {
173 // Valid in the cache, short circut
177 var usersreq *http.Request
179 if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
180 // Can't construct the request
181 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
185 // Add api token header
186 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
188 // Actually make the request
189 var resp *http.Response
190 if resp, err = kc.Client.Do(usersreq); err != nil {
191 // Something else failed
192 log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
196 if resp.StatusCode != http.StatusOK {
198 log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
202 // Success! Update cache
203 cache.RememberToken(tok)
208 type GetBlockHandler struct {
209 *keepclient.KeepClient
213 type PutBlockHandler struct {
214 *keepclient.KeepClient
218 type InvalidPathHandler struct{}
221 // Returns a mux.Router that passes GET and PUT requests to the
222 // appropriate handlers.
227 kc *keepclient.KeepClient) *mux.Router {
229 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
231 rest := mux.NewRouter()
232 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
233 ghsig := rest.Handle(
234 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
235 GetBlockHandler{kc, t})
236 ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
239 gh.Methods("GET", "HEAD")
240 ghsig.Methods("GET", "HEAD")
247 rest.NotFoundHandler = InvalidPathHandler{}
252 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
253 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
254 http.Error(resp, "Bad request", http.StatusBadRequest)
257 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
259 kc := *this.KeepClient
261 hash := mux.Vars(req)["hash"]
262 signature := mux.Vars(req)["signature"]
263 timestamp := mux.Vars(req)["timestamp"]
265 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
267 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
268 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
272 var reader io.ReadCloser
276 if req.Method == "GET" {
277 reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
279 } else if req.Method == "HEAD" {
280 blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
283 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
288 n, err2 := io.Copy(resp, reader)
290 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
291 } else if err2 == nil {
292 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
294 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
297 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
299 case keepclient.BlockNotFound:
300 http.Error(resp, "Not found", http.StatusNotFound)
302 http.Error(resp, err.Error(), http.StatusBadGateway)
306 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
310 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
312 kc := *this.KeepClient
314 hash := mux.Vars(req)["hash"]
316 var contentLength int64 = -1
317 if req.Header.Get("Content-Length") != "" {
318 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
320 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
325 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
327 if contentLength < 1 {
328 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
332 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
333 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
337 // Check if the client specified the number of replicas
338 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
340 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
346 // Now try to put the block through
347 replicas, err := kc.PutHR(hash, req.Body, contentLength)
349 // Tell the client how many successful PUTs we accomplished
350 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
354 // Default will return http.StatusOK
355 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
357 case keepclient.OversizeBlockError:
359 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
361 case keepclient.InsufficientReplicasError:
363 // At least one write is considered success. The
364 // client can decide if getting less than the number of
365 // replications it asked for is a fatal error.
366 // Default will return http.StatusOK
368 http.Error(resp, "", http.StatusServiceUnavailable)
372 http.Error(resp, err.Error(), http.StatusBadGateway)
376 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())