package main
import (
+ "context"
"errors"
"flag"
"fmt"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
lru "github.com/hashicorp/golang-lru"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
// version is the version string printed by -version and in the
// startup log message.
var version = "dev"
// rfc3339NanoFixed is an RFC 3339-style timestamp layout that always
// shows nine fractional-second digits. It is used as the timestamp
// format of the initial JSON logger.
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// configure parses the command line in args (args[0] is the program
// name), loads the cluster configuration, and returns the cluster
// together with the logger the caller should use.
//
// A nil cluster with a nil error means the request was handled
// entirely here (-version or -dump-config) and the caller has nothing
// left to do. If cmd.ParseFlags reports not-ok, the process exits with
// the code it returned.
-func configure(logger log.FieldLogger, args []string) (*arvados.Cluster, error) {
- flags := flag.NewFlagSet(args[0], flag.ExitOnError)
+func configure(args []string) (*arvados.Cluster, logrus.FieldLogger, error) {
+ prog := args[0]
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
dumpConfig := flags.Bool("dump-config", false, "write current configuration to stdout and exit")
getVersion := flags.Bool("version", false, "Print version information and exit.")
// Bootstrap logger: used until the cluster's own logging settings
// have been loaded, and returned as-is on the early-exit paths.
+ initLogger := logrus.New()
+ initLogger.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ var logger logrus.FieldLogger = initLogger
+
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
-
args = loader.MungeLegacyConfigArgs(logger, args[1:], "-legacy-keepproxy-config")
- flags.Parse(args)
- // Print version information if requested
- if *getVersion {
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
fmt.Printf("keepproxy %s\n", version)
- return nil, nil
+ return nil, logger, nil
}
cfg, err := loader.Load()
if err != nil {
- return nil, err
+ return nil, logger, err
}
cluster, err := cfg.GetCluster("")
if err != nil {
- return nil, err
+ return nil, logger, err
}
// The cluster config is available now, so replace the bootstrap
// logger with one built from SystemLogs.Format and
// SystemLogs.LogLevel, tagged with the cluster ID and this PID.
+ logger = ctxlog.New(os.Stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
+ "ClusterID": cluster.ClusterID,
+ "PID": os.Getpid(),
+ })
+
if *dumpConfig {
out, err := yaml.Marshal(cfg)
if err != nil {
- return nil, err
+ return nil, logger, err
}
if _, err := os.Stdout.Write(out); err != nil {
- return nil, err
+ return nil, logger, err
}
- return nil, nil
+ return nil, logger, nil
}
- return cluster, nil
+
+ return cluster, logger, nil
}
// main obtains the cluster configuration and logger from configure,
// then calls run and logs a fatal error if either step fails.
func main() {
- logger := log.New()
- logger.Formatter = &log.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
-
- cluster, err := configure(logger, os.Args)
+ cluster, logger, err := configure(os.Args)
if err != nil {
- log.Fatal(err)
+ logger.Fatal(err)
}
// configure returns a nil cluster without an error after handling
// -version or -dump-config; there is nothing to serve in that case.
if cluster == nil {
return
}
- log.Printf("keepproxy %s started", version)
+ logger.Printf("keepproxy %s started", version)
if err := run(logger, cluster); err != nil {
- log.Fatal(err)
+ logger.Fatal(err)
}
- log.Println("shutting down")
+ logger.Println("shutting down")
}
// run sets up the Arvados and Keep clients for cluster, builds the
// REST router, and serves HTTP requests on the listener. It returns
// an error if setup fails, and otherwise returns whatever
// server.Serve returns.
-func run(logger log.FieldLogger, cluster *arvados.Cluster) error {
+func run(logger logrus.FieldLogger, cluster *arvados.Cluster) error {
client, err := arvados.NewClientFromConfig(cluster)
if err != nil {
return err
}
// At debug log level, route keepclient's debug output through logger.
if cluster.SystemLogs.LogLevel == "debug" {
- keepclient.DebugPrintf = log.Printf
+ keepclient.DebugPrintf = logger.Printf
}
kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.Printf("Error notifying init daemon: %v", err)
+ logger.Printf("Error notifying init daemon: %v", err)
}
- log.Println("listening at", listener.Addr())
+ logger.Println("listening at", listener.Addr())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM or SIGINT is received.
term := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
s := <-sig
- log.Println("caught signal:", s)
+ logger.Println("caught signal:", s)
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- router = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
- return http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
+ router, err = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
+ if err != nil {
+ return err
+ }
// BaseContext stores logger in the base context of incoming
// requests, so handlers can fetch it with
// ctxlog.FromContext(req.Context()).
+ server := http.Server{
+ Handler: httpserver.AddRequestIDs(httpserver.LogRequests(router)),
+ BaseContext: func(net.Listener) context.Context {
+ return ctxlog.Context(context.Background(), logger)
+ },
+ }
+ return server.Serve(listener)
}
type TokenCacheEntry struct {
}
}
if err != nil {
- log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
+ ctxlog.FromContext(req.Context()).Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false, "", nil
}
*APITokenCache
timeout time.Duration
transport *http.Transport
- logger log.FieldLogger
+ logger logrus.FieldLogger
cluster *arvados.Cluster
}
// MakeRESTRouter returns an http.Handler that passes GET and PUT
// requests to the appropriate handlers. It returns a nil handler and
// an error if lru.New2Q fails. Requests that match no route are
// handled by InvalidPathHandler.
-func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger log.FieldLogger) http.Handler {
+func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger logrus.FieldLogger) (http.Handler, error) {
rest := mux.NewRouter()
transport := defaultTransport
cacheQ, err := lru.New2Q(500)
if err != nil {
- panic("Could not create 2Q")
+ return nil, fmt.Errorf("Error from lru.New2Q: %v", err)
}
h := &proxyHandler{
}).Methods("GET")
rest.NotFoundHandler = InvalidPathHandler{}
- return h
+ return h, nil
}
var errLoopDetected = errors.New("loop detected")
// InvalidPathHandler is an http.Handler for requests that match no
// route; MakeRESTRouter installs it as the router's NotFoundHandler.
type InvalidPathHandler struct{}
// ServeHTTP logs the unroutable request (remote address, method and
// path) and replies with 400 "Bad request".
func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
+ ctxlog.FromContext(req.Context()).Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
http.Error(resp, "Bad request", http.StatusBadRequest)
}
// Options logs the request (remote address, method and path) and
// applies SetCorsHeaders to the response; it writes no body.
func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) {
- log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
+ ctxlog.FromContext(req.Context()).Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
SetCorsHeaders(resp)
}
var proxiedURI = "-"
defer func() {
- log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
+ h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
if status != http.StatusOK {
http.Error(resp, err.Error(), status)
}
}
if expectLength == -1 {
- log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
+ h.logger.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
switch respErr := err.(type) {
var locatorOut string = "-"
defer func() {
- log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
+ h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
if status != http.StatusOK {
http.Error(resp, err.Error(), status)
}
for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
scl = append(scl, strings.Trim(sc, " "))
}
- kc.StorageClasses = scl
+ kc.SetStorageClasses(scl)
}
_, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
kc.Arvados = &arvclient
// Check if the client specified the number of replicas
- if req.Header.Get("X-Keep-Desired-Replicas") != "" {
+ if desiredReplicas := req.Header.Get(keepclient.XKeepDesiredReplicas); desiredReplicas != "" {
var r int
- _, err := fmt.Sscanf(req.Header.Get(keepclient.XKeepDesiredReplicas), "%d", &r)
+ _, err := fmt.Sscanf(desiredReplicas, "%d", &r)
if err == nil {
kc.Want_replicas = r
}
switch err.(type) {
case nil:
status = http.StatusOK
+ if len(kc.StorageClasses) > 0 {
+ // A successful PUT request with storage classes means that all
+ // storage classes were fulfilled, so the client will get a
+ // confirmation via the X-Storage-Classes-Confirmed header.
+ hdr := ""
+ isFirst := true
+ for _, sc := range kc.StorageClasses {
+ if isFirst {
+ hdr = fmt.Sprintf("%s=%d", sc, wroteReplicas)
+ isFirst = false
+ } else {
+ hdr += fmt.Sprintf(", %s=%d", sc, wroteReplicas)
+ }
+ }
+ resp.Header().Set(keepclient.XKeepStorageClassesConfirmed, hdr)
+ }
_, err = io.WriteString(resp, locatorOut)
-
case keepclient.OversizeBlockError:
// Too much data
status = http.StatusRequestEntityTooLarge
-
case keepclient.InsufficientReplicasError:
- if wroteReplicas > 0 {
- // At least one write is considered success. The
- // client can decide if getting less than the number of
- // replications it asked for is a fatal error.
- status = http.StatusOK
- _, err = io.WriteString(resp, locatorOut)
- } else {
- status = http.StatusServiceUnavailable
- }
-
+ status = http.StatusServiceUnavailable
default:
status = http.StatusBadGateway
}