import (
"arvados.org/keepclient"
+ "arvados.org/sdk"
"flag"
"fmt"
"github.com/gorilla/mux"
"net"
"net/http"
"os"
+ "os/signal"
"sync"
+ "syscall"
"time"
)
flagset.Parse(os.Args[1:])
- kc, err := keepclient.MakeKeepClient()
+ arv, err := sdk.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("Error setting up arvados client %s", err.Error())
+ }
+
+ kc, err := keepclient.MakeKeepClient(&arv)
if err != nil {
log.Fatalf("Error setting up keep client %s", err.Error())
}
go RefreshServicesList(&kc)
+ // Shut down the server gracefully (by closing the listener)
+ // if SIGTERM or SIGINT is received.
+ term := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ s := <-sig
+ log.Println("caught signal:", s)
+ listener.Close()
+ }(term)
+ signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
+
+ if pidfile != "" {
+ f, err := os.Create(pidfile)
+ if err == nil {
+ fmt.Fprint(f, os.Getpid())
+ f.Close()
+ } else {
+ log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+ }
+ }
+
log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
// Start listening for requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+
+ log.Println("shutting down")
+
+ if pidfile != "" {
+ os.Remove(pidfile)
+ }
}
type ApiTokenCache struct {
return true
}
- var usersreq *http.Request
-
- if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
- // Can't construct the request
+ arv := *kc.Arvados
+ arv.ApiToken = tok
+ if err := arv.Call("HEAD", "users", "", "current", nil, nil); err != nil {
log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false
}
- // Add api token header
- usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
-
- // Actually make the request
- var resp *http.Response
- if resp, err = kc.Client.Do(usersreq); err != nil {
- // Something else failed
- log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
- return false
- }
-
- if resp.StatusCode != http.StatusOK {
- // Bad status
- log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
- return false
- }
-
// Success! Update cache
cache.RememberToken(tok)
rest := mux.NewRouter()
if enable_get {
- gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
- ghsig := rest.Handle(
- `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
- GetBlockHandler{kc, t})
-
- gh.Methods("GET", "HEAD")
- ghsig.Methods("GET", "HEAD")
+ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+ GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+ rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
}
if enable_put {
+ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
}
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
- signature := mux.Vars(req)["signature"]
- timestamp := mux.Vars(req)["timestamp"]
+ hints := mux.Vars(req)["hints"]
+
+ locator := keepclient.MakeLocator2(hash, hints)
log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
- defer reader.Close()
+ // Only schedule Close when a reader was actually returned: deferring
+ // a method call on a nil interface value panics.
+ // NOTE(review): assumes AuthorizedGet yields a nil reader on error -- confirm.
+ if reader != nil {
+ defer reader.Close()
+ }
} else if req.Method == "HEAD" {
- blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
}
- resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+ if blocklen > 0 {
+ resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+ }
switch err {
case nil:
if reader != nil {
n, err2 := io.Copy(resp, reader)
if n != blocklen {
- log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
+ log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
} else if err2 == nil {
log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
} else {
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
+ hints := mux.Vars(req)["hints"]
+
+ locator := keepclient.MakeLocator2(hash, hints)
var contentLength int64 = -1
if req.Header.Get("Content-Length") != "" {
return
}
+ if locator.Size > 0 && int64(locator.Size) != contentLength {
+ http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
+ return
+ }
+
if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
return
}
// Now try to put the block through
- replicas, err := kc.PutHR(hash, req.Body, contentLength)
+ hash, replicas, err := kc.PutHR(hash, req.Body, contentLength)
// Tell the client how many successful PUTs we accomplished
resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
case nil:
// Default will return http.StatusOK
log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
+ // Write the hash to the response body; a failed write is only logged.
+ n, err2 := io.WriteString(resp, hash)
+ if err2 != nil {
+ log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
+ }
case keepclient.OversizeBlockError:
// Too much data
// client can decide if getting less than the number of
// replications it asked for is a fatal error.
// Default will return http.StatusOK
+ // Write the hash to the response body; a failed write is only logged.
+ n, err2 := io.WriteString(resp, hash)
+ if err2 != nil {
+ log.Printf("%s: wrote %v bytes to response body and got error %v", GetRemoteAddress(req), n, err2.Error())
+ }
} else {
http.Error(resp, "", http.StatusServiceUnavailable)
}