4 "arvados.org/keepclient"
7 "github.com/gorilla/mux"
17 // Default TCP address on which to listen for requests.
18 // Initialized by the -listen flag.
19 const DEFAULT_ADDR = ":25107"
21 var listener net.Listener
36 "Interface on which to listen for requests, in the format "+
37 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
38 "to listen on all network interfaces.")
44 "If set, disable GET operations")
50 "If set, disable PUT operations")
56 "Default number of replicas to write if not specified by the client.")
62 "Path to write pid file")
66 /*if no_get == false {
67 log.Print("Must specify -no-get")
71 kc, err := keepclient.MakeKeepClient()
78 f, err := os.Create(pidfile)
80 fmt.Fprint(f, os.Getpid())
83 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
87 kc.Want_replicas = default_replicas
89 listener, err = net.Listen("tcp", listen)
91 log.Printf("Could not listen on %v", listen)
95 go RefreshServicesList(&kc)
97 // Start listening for requests.
98 http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
101 type ApiTokenCache struct {
102 tokens map[string]int64
107 // Refresh the keep service list every five minutes.
108 func RefreshServicesList(kc *keepclient.KeepClient) {
110 time.Sleep(300 * time.Second)
111 kc.DiscoverKeepServers()
115 // Cache the token and set an expire time. If we already have an expire time
116 // on the token, it is not updated.
117 func (this *ApiTokenCache) RememberToken(token string) {
119 defer this.lock.Unlock()
121 now := time.Now().Unix()
122 if this.tokens[token] == 0 {
123 this.tokens[token] = now + this.expireTime
127 // Check if the cached token is known and still believed to be valid.
128 func (this *ApiTokenCache) RecallToken(token string) bool {
130 defer this.lock.Unlock()
132 now := time.Now().Unix()
133 if this.tokens[token] == 0 {
136 } else if now < this.tokens[token] {
137 // Token is known and still valid
141 this.tokens[token] = 0
146 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
147 if req.Header.Get("Authorization") == "" {
152 _, err := fmt.Sscanf(req.Header.Get("Authorization"), "OAuth2 %s", &tok)
158 if cache.RecallToken(tok) {
159 // Valid in the cache, short circut
163 var usersreq *http.Request
165 if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
166 // Can't construct the request
167 log.Print("CheckAuthorizationHeader error: %v", err)
171 // Add api token header
172 usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
174 // Actually make the request
175 var resp *http.Response
176 if resp, err = kc.Client.Do(usersreq); err != nil {
177 // Something else failed
178 log.Print("CheckAuthorizationHeader error: %v", err)
182 if resp.StatusCode != http.StatusOK {
187 // Success! Update cache
188 cache.RememberToken(tok)
193 type GetBlockHandler struct {
194 *keepclient.KeepClient
198 type PutBlockHandler struct {
199 *keepclient.KeepClient
204 // Returns a mux.Router that passes GET and PUT requests to the
205 // appropriate handlers.
210 kc *keepclient.KeepClient) *mux.Router {
212 t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
214 rest := mux.NewRouter()
215 gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
216 ghsig := rest.Handle(
217 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
218 GetBlockHandler{kc, t})
219 ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
222 gh.Methods("GET", "HEAD")
223 ghsig.Methods("GET", "HEAD")
233 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
235 kc := *this.KeepClient
237 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
238 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
241 hash := mux.Vars(req)["hash"]
242 signature := mux.Vars(req)["signature"]
243 timestamp := mux.Vars(req)["timestamp"]
245 var reader io.ReadCloser
249 if req.Method == "GET" {
250 reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
252 } else if req.Method == "HEAD" {
253 blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
256 resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
261 io.Copy(resp, reader)
263 case keepclient.BlockNotFound:
264 http.Error(resp, "Not found", http.StatusNotFound)
266 http.Error(resp, err.Error(), http.StatusBadGateway)
270 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
272 log.Print("PutBlockHandler start")
274 kc := *this.KeepClient
276 if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
277 http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
280 hash := mux.Vars(req)["hash"]
282 var contentLength int64 = -1
283 if req.Header.Get("Content-Length") != "" {
284 _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
286 resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
291 if contentLength < 1 {
292 http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
296 // Check if the client specified the number of replicas
297 if req.Header.Get("X-Keep-Desired-Replicas") != "" {
299 _, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
305 // Now try to put the block through
306 replicas, err := kc.PutHR(hash, req.Body, contentLength)
308 log.Printf("Replicas stored: %v err: %v", replicas, err)
310 // Tell the client how many successful PUTs we accomplished
311 resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
315 // Default will return http.StatusOK
317 case keepclient.OversizeBlockError:
319 http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
321 case keepclient.InsufficientReplicasError:
323 // At least one write is considered success. The
324 // client can decide if getting less than the number of
325 // replications it asked for is a fatal error.
326 // Default will return http.StatusOK
328 http.Error(resp, "", http.StatusServiceUnavailable)
332 http.Error(resp, err.Error(), http.StatusBadGateway)