4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
17 // Default TCP address on which to listen for requests.
18 // Initialized by the -listen flag.
19 const DEFAULT_ADDR = ":25107"
21 var listener net.Listener
32 flagset := flag.NewFlagSet("default", flag.ExitOnError)
38 "Interface on which to listen for requests, in the format "+
39 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
40 "to listen on all network interfaces.")
46 "If set, disable GET operations")
52 "If set, disable PUT operations")
58 "Default number of replicas to write if not specified by the client.")
64 "Path to write pid file")
66 flagset.Parse(os.Args[1:])
68 kc, err := keepclient.MakeKeepClient()
70 log.Fatalf("Error setting up keep client %s", err.Error())
74 f, err := os.Create(pidfile)
76 fmt.Fprint(f, os.Getpid())
79 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
83 kc.Want_replicas = default_replicas
85 listener, err = net.Listen("tcp", listen)
87 log.Fatalf("Could not listen on %v", listen)
90 go RefreshServicesList(&kc)
92 log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
94 // Start listening for requests.
95 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
98 type ApiTokenCache struct {
99 tokens map[string]int64
104 // Refresh the keep service list every five minutes.
105 func RefreshServicesList(kc *keepclient.KeepClient) {
107 time.Sleep(300 * time.Second)
108 oldservices := kc.ServiceRoots()
109 kc.DiscoverKeepServers()
110 newservices := kc.ServiceRoots()
111 s1 := fmt.Sprint(oldservices)
112 s2 := fmt.Sprint(newservices)
114 log.Printf("Updated server list to %v", s2)
119 // Cache the token and set an expire time. If we already have an expire time
120 // on the token, it is not updated.
121 func (this *ApiTokenCache) RememberToken(token string) {
123 defer this.lock.Unlock()
125 now := time.Now().Unix()
126 if this.tokens[token] == 0 {
127 this.tokens[token] = now + this.expireTime
131 // Check if the cached token is known and still believed to be valid.
132 func (this *ApiTokenCache) RecallToken(token string) bool {
134 defer this.lock.Unlock()
136 now := time.Now().Unix()
137 if this.tokens[token] == 0 {
140 } else if now < this.tokens[token] {
141 // Token is known and still valid
145 this.tokens[token] = 0
150 func GetRemoteAddress(req *http.Request) string {
151 if realip := req.Header.Get("X-Real-IP"); realip != "" {
152 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
153 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
158 return req.RemoteAddr
161 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
163 if auth = req.Header.Get("Authorization"); auth == "" {
168 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
174 if cache.RecallToken(tok) {
175 // Valid in the cache, short circut
179 var usersreq *http.Request
181 if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
182 // Can't construct the request
183 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
187 // Add api token header
188 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
190 // Actually make the request
191 var resp *http.Response
192 if resp, err = kc.Client.Do(usersreq); err != nil {
193 // Something else failed
194 log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
198 if resp.StatusCode != http.StatusOK {
200 log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
204 // Success! Update cache
205 cache.RememberToken(tok)
210 type GetBlockHandler struct {
211 *keepclient.KeepClient
215 type PutBlockHandler struct {
216 *keepclient.KeepClient
220 type InvalidPathHandler struct{}
223 // Returns a mux.Router that passes GET and PUT requests to the
224 // appropriate handlers.
229 kc *keepclient.KeepClient) *mux.Router {
231 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
233 rest := mux.NewRouter()
236 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
237 ghsig := rest.Handle(
238 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
239 GetBlockHandler{kc, t})
241 gh.Methods("GET", "HEAD")
242 ghsig.Methods("GET", "HEAD")
246 rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
249 rest.NotFoundHandler = InvalidPathHandler{}
254 func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
255 log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
256 http.Error(resp, "Bad request", http.StatusBadRequest)
259 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
261 kc := *this.KeepClient
263 hash := mux.Vars(req)["hash"]
264 signature := mux.Vars(req)["signature"]
265 timestamp := mux.Vars(req)["timestamp"]
267 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
269 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
270 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
274 var reader io.ReadCloser
278 if req.Method == "GET" {
279 reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
281 } else if req.Method == "HEAD" {
282 blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
285 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
290 n, err2 := io.Copy(resp, reader)
292 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
293 } else if err2 == nil {
294 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
296 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
299 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
301 case keepclient.BlockNotFound:
302 http.Error(resp, "Not found", http.StatusNotFound)
304 http.Error(resp, err.Error(), http.StatusBadGateway)
308 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
312 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
314 kc := *this.KeepClient
316 hash := mux.Vars(req)["hash"]
318 var contentLength int64 = -1
319 if req.Header.Get("Content-Length") != "" {
320 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
322 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
327 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
329 if contentLength < 1 {
330 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
334 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
335 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
339 // Check if the client specified the number of replicas
340 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
342 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
348 // Now try to put the block through
349 replicas, err := kc.PutHR(hash, req.Body, contentLength)
351 // Tell the client how many successful PUTs we accomplished
352 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
356 // Default will return http.StatusOK
357 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
359 case keepclient.OversizeBlockError:
361 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
363 case keepclient.InsufficientReplicasError:
365 // At least one write is considered success. The
366 // client can decide if getting less than the number of
367 // replications it asked for is a fatal error.
368 // Default will return http.StatusOK
370 http.Error(resp, "", http.StatusServiceUnavailable)
374 http.Error(resp, err.Error(), http.StatusBadGateway)
378 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())