4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
17 // DEFAULT_ADDR is the default TCP address on which to listen for requests:
18 // port 25107 with no host part. Presumably it is the default value of the
// -listen flag (the flag registration itself is not visible here) — confirm.
19 const DEFAULT_ADDR = ":25107"
// listener holds the TCP listener for the proxy. It is assigned from
// net.Listen during startup and then handed to http.Serve.
21 var listener net.Listener
36 "Interface on which to listen for requests, in the format "+
37 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
38 "to listen on all network interfaces.")
44 "If set, disable GET operations")
50 "If set, disable PUT operations")
56 "Default number of replicas to write if not specified by the client.")
62 "Path to write pid file")
// Build the Keep client that is later shared with the refresh goroutine and
// the HTTP handlers.
66 kc, err := keepclient.MakeKeepClient()
// Create the pid file and write this process's pid into it.
72 f, err := os.Create(pidfile)
74 fmt.Fprint(f, os.Getpid())
77 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
// Replica count used when the client does not ask for a specific number.
81 kc.Want_replicas = default_replicas
83 listener, err = net.Listen("tcp", listen)
// NOTE(review): the message does not include err, so the reason the listen
// failed is not logged — consider adding it.
85 log.Fatalf("Could not listen on %v", listen)
// Refresh the Keep service list in a separate goroutine.
88 go RefreshServicesList(&kc)
90 // Start listening for requests.
// NOTE(review): the error returned by http.Serve is discarded here.
91 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
// ApiTokenCache remembers API tokens that have been accepted, together with
// the time at which each one stops being trusted. Its methods also use lock
// and expireTime fields, which are not visible in this view.
94 type ApiTokenCache struct {
// tokens maps a token to its expiry time in Unix seconds. A value of 0 is
// treated as "not cached" (never stored, or reset after expiring).
95 tokens map[string]int64
100 // RefreshServicesList sleeps for 300 seconds (five minutes) and then
// re-runs Keep server discovery on kc. Presumably this repeats in a loop for
// the life of the process — the loop itself is not visible here.
101 func RefreshServicesList(kc *keepclient.KeepClient) {
103 time.Sleep(300 * time.Second)
// NOTE(review): kc is the same client that was handed to the HTTP handlers,
// so this update runs concurrently with request handling — confirm that
// DiscoverKeepServers is safe to call in that situation.
104 kc.DiscoverKeepServers()
108 // RememberToken caches token with an expiry of now + expireTime, both in
109 // Unix seconds. If the token already has a non-zero expiry recorded, that
// expiry is left unchanged.
110 func (this *ApiTokenCache) RememberToken(token string) {
// The matching Lock call is presumably on the preceding line (not visible).
112 defer this.lock.Unlock()
114 now := time.Now().Unix()
// A missing map key reads as 0, so this covers both "never seen" and
// "previously reset to 0".
115 if this.tokens[token] == 0 {
116 this.tokens[token] = now + this.expireTime
120 // RecallToken checks whether token is cached and still believed to be
// valid, i.e. the current time is before the expiry stored for it.
121 func (this *ApiTokenCache) RecallToken(token string) bool {
// The matching Lock call is presumably on the preceding line (not visible).
123 defer this.lock.Unlock()
125 now := time.Now().Unix()
// A zero entry (including a missing key) means the token is not cached.
126 if this.tokens[token] == 0 {
129 } else if now < this.tokens[token] {
130 // Token is known and still valid
// Presumably the remaining case is an expired entry. It is reset to 0 rather
// than deleted, so the key stays in the map.
// NOTE(review): no deletion is visible here, so the map may grow without
// bound — confirm.
134 this.tokens[token] = 0
// GetRemoteAddress returns a printable client address for req; callers in
// this file use it as the prefix of their log messages. When an X-Real-IP
// header is present and X-Forwarded-For differs from it, both values are
// reported. The last visible return falls back to req.RemoteAddr.
139 func GetRemoteAddress(req *http.Request) string {
140 if realip := req.Header.Get("X-Real-IP"); realip != "" {
// Header.Get yields "" for an absent header, which also differs from a
// non-empty realip, so this branch is taken with an empty forwarded value
// when X-Forwarded-For is missing.
141 if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
142 return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
147 return req.RemoteAddr
// CheckAuthorizationHeader validates the "Authorization: OAuth2 <token>"
// header of req; the handlers in this file answer 403 when it returns false.
// A token that cache still recalls is accepted without contacting the API
// server. Otherwise the token is sent in a GET to /arvados/v1/users/current
// on kc.ApiServer, and on an HTTP 200 response it is remembered in cache.
150 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
// An empty value means no Authorization header was supplied.
152 if auth = req.Header.Get("Authorization"); auth == "" {
// Extract the token that follows the "OAuth2 " prefix.
157 _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
163 if cache.RecallToken(tok) {
164 // Valid in the cache, short circuit
168 var usersreq *http.Request
170 if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
171 // Can't construct the request
172 log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
176 // Add api token header
177 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
179 // Actually make the request
180 var resp *http.Response
181 if resp, err = kc.Client.Do(usersreq); err != nil {
182 // Something else failed
183 log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
// NOTE(review): no resp.Body.Close() is visible in this function — confirm
// the body is closed so the underlying connection can be reused.
187 if resp.StatusCode != http.StatusOK {
189 log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
193 // Success! Update cache
194 cache.RememberToken(tok)
// GetBlockHandler is the HTTP handler registered for GET and HEAD block
// requests. It embeds the Keep client; presumably it also embeds an
// *ApiTokenCache (it is constructed with one and its ServeHTTP reads
// this.ApiTokenCache), but that field is not visible here.
199 type GetBlockHandler struct {
200 *keepclient.KeepClient
// PutBlockHandler is the HTTP handler that stores a block through the
// embedded Keep client. Presumably it also embeds an *ApiTokenCache (it is
// constructed with one and its ServeHTTP reads this.ApiTokenCache), but
// that field is not visible here.
204 type PutBlockHandler struct {
205 *keepclient.KeepClient
210 // Returns a mux.Router that passes GET and PUT requests to the
211 // appropriate handlers. All handlers share kc and a single token cache.
216 kc *keepclient.KeepClient) *mux.Router {
// One token cache for every handler; expireTime is added to a Unix-seconds
// timestamp, so entries are trusted for 300 seconds.
218 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
220 rest := mux.NewRouter()
// Unsigned locator: a path consisting of a 32-hex-digit hash.
221 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
// Signed locator: the hash followed by "+A<hex signature>@<hex timestamp>".
222 ghsig := rest.Handle(
223 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
224 GetBlockHandler{kc, t})
// Same path pattern as gh, routed to the PUT handler.
// NOTE(review): the method restriction for ph is not visible in this view.
225 ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
// Both GET routes also answer HEAD.
228 gh.Methods("GET", "HEAD")
229 ghsig.Methods("GET", "HEAD")
// ServeHTTP handles GET and HEAD requests for a block. It checks the
// Authorization header (403 on failure), fetches the block (GET) or only
// its length (HEAD) through the Keep client, sets Content-Length, and
// copies the block data to resp when there is a reader. A BlockNotFound
// error is answered with 404; another visible error path answers 502.
239 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a per-request copy of the shared client.
241 kc := *this.KeepClient
// signature and timestamp are "" when the unsigned route matched, because
// that route defines no such variables.
243 hash := mux.Vars(req)["hash"]
244 signature := mux.Vars(req)["signature"]
245 timestamp := mux.Vars(req)["timestamp"]
247 log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
249 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
250 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
254 var reader io.ReadCloser
// GET obtains a reader plus the block length; HEAD obtains only the length.
258 if req.Method == "GET" {
259 reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
261 } else if req.Method == "HEAD" {
262 blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
265 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
// Stream the block to the client; n is the number of bytes actually copied.
270 n, err2 := io.Copy(resp, reader)
// NOTE(review): this format string has five verbs but six arguments, so the
// last argument is printed as an %!(EXTRA ...) suffix. It also calls
// err.Error() although the copy error is err2; if this branch only runs
// when err is nil (the enclosing switch is not visible here) that call
// would panic — confirm and use err2.
272 log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
273 } else if err2 == nil {
274 log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
// NOTE(review): this is reached when err2 is non-nil, yet it logs
// err.Error() — same concern as above; err2 looks like the intended value.
276 log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
279 log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
281 case keepclient.BlockNotFound:
282 http.Error(resp, "Not found", http.StatusNotFound)
// Presumably the default case: report the upstream failure as 502.
284 http.Error(resp, err.Error(), http.StatusBadGateway)
288 log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
// ServeHTTP handles a block upload. It requires a Content-Length of at
// least 1 (411 otherwise) and a valid Authorization header (403 otherwise),
// then writes req.Body through kc.PutHR and reports the number of replicas
// stored in the keepclient.X_Keep_Replicas_Stored response header.
292 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Work on a per-request copy of the shared client.
294 kc := *this.KeepClient
296 hash := mux.Vars(req)["hash"]
// -1 stands for "no Content-Length supplied".
298 var contentLength int64 = -1
299 if req.Header.Get("Content-Length") != "" {
300 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
// NOTE(review): this sets the *response* Content-Length from the value
// parsed out of the request; the guarding condition is not visible here —
// confirm this is intended.
302 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
307 log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
// A missing header leaves contentLength at -1, so it is rejected here
// together with an explicit length of zero.
309 if contentLength < 1 {
310 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
314 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
315 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
319 // Check if the client specified the number of replicas
// NOTE(review): the header name is a string literal on this line but the
// constant keepclient.X_Keep_Desired_Replicas on the next; presumably they
// are the same value — prefer the constant in both places.
320 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
322 _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
328 // Now try to put the block through
329 replicas, err := kc.PutHR(hash, req.Body, contentLength)
331 // Tell the client how many successful PUTs we accomplished
332 resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
336 // Default will return http.StatusOK
337 log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
339 case keepclient.OversizeBlockError:
341 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
343 case keepclient.InsufficientReplicasError:
345 // At least one write is considered success. The
346 // client can decide if getting less than the number of
347 // replications it asked for is a fatal error.
348 // Default will return http.StatusOK
// Presumably reached when no replica at all was written (the condition is
// not visible here): answer 503 with an empty body.
350 http.Error(resp, "", http.StatusServiceUnavailable)
// Presumably the default case: report any other failure as 502.
354 http.Error(resp, err.Error(), http.StatusBadGateway)
358 log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())