4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
17 // Default TCP address on which to listen for requests.
18 // Initialized by the -listen flag.
19 const DEFAULT_ADDR = ":25107"
21 var listener net.Listener
37 "Interface on which to listen for requests, in the format "+
38 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
39 "to listen on all network interfaces.")
45 "If set, disable GET operations")
51 "If set, disable PUT operations")
57 "If set, disable HEAD operations")
63 "Default number of replicas to write if not specified by the client.")
69 "Path to write pid file")
73 /*if no_get == false || no_head == false {
74 log.Print("Must specify -no-get and -no-head")
78 kc, err := keepclient.MakeKeepClient()
85 f, err := os.Create(pidfile)
87 fmt.Fprint(f, os.Getpid())
90 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
94 kc.Want_replicas = default_replicas
96 listener, err = net.Listen("tcp", listen)
98 log.Printf("Could not listen on %v", listen)
102 // Start listening for requests.
103 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, !no_head, kc))
106 type ApiTokenCache struct {
107 tokens map[string]int64
112 // Cache the token and set an expire time. If we already have an expire time
113 // on the token, it is not updated.
114 func (this *ApiTokenCache) RememberToken(token string) {
116 defer this.lock.Unlock()
118 now := time.Now().Unix()
119 if this.tokens[token] == 0 {
120 this.tokens[token] = now + this.expireTime
124 // Check if the cached token is known and still believed to be valid.
125 func (this *ApiTokenCache) RecallToken(token string) bool {
127 defer this.lock.Unlock()
129 now := time.Now().Unix()
130 if this.tokens[token] == 0 {
133 } else if now < this.tokens[token] {
134 // Token is known and still valid
138 this.tokens[token] = 0
143 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
144 if req.Header.Get("Authorization") == "" {
149 _, err := fmt.Sscanf(req.Header.Get("Authorization"), "OAuth2 %s", &tok)
155 if cache.RecallToken(tok) {
156 // Valid in the cache, short circut
160 var usersreq *http.Request
162 if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
163 // Can't construct the request
164 log.Print("CheckAuthorizationHeader error: %v", err)
168 // Add api token header
169 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
171 // Actually make the request
172 var resp *http.Response
173 if resp, err = kc.Client.Do(usersreq); err != nil {
174 // Something else failed
175 log.Print("CheckAuthorizationHeader error: %v", err)
179 if resp.StatusCode != http.StatusOK {
184 // Success! Update cache
185 cache.RememberToken(tok)
190 type GetBlockHandler struct {
191 keepclient.KeepClient
195 type PutBlockHandler struct {
196 keepclient.KeepClient
201 // Returns a mux.Router that passes GET and PUT requests to the
202 // appropriate handlers.
208 kc keepclient.KeepClient) *mux.Router {
210 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
212 rest := mux.NewRouter()
213 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
214 ghsig := rest.Handle(
215 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
216 GetBlockHandler{kc, t})
217 ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
230 ghsig.Methods("HEAD")
236 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
237 hash := mux.Vars(req)["hash"]
238 signature := mux.Vars(req)["signature"]
239 timestamp := mux.Vars(req)["timestamp"]
241 var reader io.ReadCloser
245 if req.Method == "GET" {
246 reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
248 } else if req.Method == "HEAD" {
249 blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
252 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
257 io.Copy(resp, reader)
259 case keepclient.BlockNotFound:
260 http.Error(resp, "Not found", http.StatusNotFound)
262 http.Error(resp, err.Error(), http.StatusBadGateway)
266 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
267 if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
268 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
271 hash := mux.Vars(req)["hash"]
273 var contentLength int64 = -1
274 if req.Header.Get("Content-Length") != "" {
275 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
277 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
282 if contentLength < 1 {
283 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
287 // Check if the client specified the number of replicas
288 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
290 _, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
292 this.KeepClient.Want_replicas = r
296 // Now try to put the block through
297 replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
299 log.Printf("Replicas stored: %v err: %v", replicas, err)
301 // Tell the client how many successful PUTs we accomplished
302 resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
306 // Default will return http.StatusOK
308 case keepclient.OversizeBlockError:
310 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
312 case keepclient.InsufficientReplicasError:
314 // At least one write is considered success. The
315 // client can decide if getting less than the number of
316 // replications it asked for is a fatal error.
317 // Default will return http.StatusOK
319 http.Error(resp, "", http.StatusServiceUnavailable)
323 http.Error(resp, err.Error(), http.StatusBadGateway)