Merge branch '18941-arv-prefetch' refs #18941
[arvados.git] / services / keepproxy / keepproxy.go
index 04c95cfee6aa729ce090e19518867e160f47192d..c2760a2a4fb462e193f833ac97f59167871c7065 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "errors"
        "flag"
        "fmt"
@@ -19,9 +20,11 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
@@ -29,7 +32,7 @@ import (
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
        lru "github.com/hashicorp/golang-lru"
-       log "github.com/sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 )
 
 var version = "dev"
@@ -41,70 +44,77 @@ var (
 
 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-func configure(logger log.FieldLogger, args []string) (*arvados.Cluster, error) {
-       flags := flag.NewFlagSet(args[0], flag.ExitOnError)
+func configure(args []string) (*arvados.Cluster, logrus.FieldLogger, error) {
        // configure parses the command line and loads the cluster
        // configuration. A nil cluster with a nil error means that
        // -version or -dump-config was handled here and the caller
        // has nothing left to run. A logger is returned on every
        // return path, including the error paths, so the caller can
        // always report through it.
+       prog := args[0]
+       flags := flag.NewFlagSet(prog, flag.ContinueOnError)
 
        dumpConfig := flags.Bool("dump-config", false, "write current configuration to stdout and exit")
        getVersion := flags.Bool("version", false, "Print version information and exit.")
 
        // Bootstrap logger (JSON output, timestamps formatted with
        // rfc3339NanoFixed). It is handed to the config loader and
        // stays in use until the cluster configuration is available.
+       initLogger := logrus.New()
+       initLogger.Formatter = &logrus.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       }
+       var logger logrus.FieldLogger = initLogger
+
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
-
        args = loader.MungeLegacyConfigArgs(logger, args[1:], "-legacy-keepproxy-config")
-       flags.Parse(args)
 
-       // Print version information if requested
-       if *getVersion {
        // When ParseFlags reports !ok the process exits with the
        // status code it supplied.
        // NOTE(review): presumably this covers usage errors and
        // -help; confirm against lib/cmd.
+       if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
+               os.Exit(code)
+       } else if *getVersion {
                fmt.Printf("keepproxy %s\n", version)
-               return nil, nil
+               return nil, logger, nil
        }
 
        cfg, err := loader.Load()
        if err != nil {
-               return nil, err
+               return nil, logger, err
        }
        cluster, err := cfg.GetCluster("")
        if err != nil {
-               return nil, err
+               return nil, logger, err
        }
 
        // Replace the bootstrap logger with one that writes to stderr
        // using the cluster's SystemLogs format and level, and that
        // tags every entry with the cluster ID and this process's PID.
+       logger = ctxlog.New(os.Stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
+               "ClusterID": cluster.ClusterID,
+               "PID":       os.Getpid(),
+       })
+
        // -dump-config: write the loaded configuration to stdout as
        // YAML and report "nothing to run" via a nil cluster.
        if *dumpConfig {
                out, err := yaml.Marshal(cfg)
                if err != nil {
-                       return nil, err
+                       return nil, logger, err
                }
                if _, err := os.Stdout.Write(out); err != nil {
-                       return nil, err
+                       return nil, logger, err
                }
-               return nil, nil
+               return nil, logger, nil
        }
-       return cluster, nil
+
+       return cluster, logger, nil
 }
 
 // main loads the configuration, logs a startup message, and runs
 // the proxy until run returns. An error from configure or from run
 // is logged with Fatal.
 func main() {
-       logger := log.New()
-       logger.Formatter = &log.JSONFormatter{
-               TimestampFormat: rfc3339NanoFixed,
-       }
-
-       cluster, err := configure(logger, os.Args)
+       cluster, logger, err := configure(os.Args)
        if err != nil {
-               log.Fatal(err)
+               logger.Fatal(err)
        }
        // configure returns a nil cluster without an error once it has
        // handled -version or -dump-config itself, so there is nothing
        // to serve.
        if cluster == nil {
                return
        }
 
-       log.Printf("keepproxy %s started", version)
+       logger.Printf("keepproxy %s started", version)
 
        if err := run(logger, cluster); err != nil {
-               log.Fatal(err)
+               logger.Fatal(err)
        }
 
-       log.Println("shutting down")
+       logger.Println("shutting down")
 }
 
-func run(logger log.FieldLogger, cluster *arvados.Cluster) error {
+func run(logger logrus.FieldLogger, cluster *arvados.Cluster) error {
        client, err := arvados.NewClientFromConfig(cluster)
        if err != nil {
                return err
@@ -123,7 +133,7 @@ func run(logger log.FieldLogger, cluster *arvados.Cluster) error {
        }
 
        if cluster.SystemLogs.LogLevel == "debug" {
-               keepclient.DebugPrintf = log.Printf
+               keepclient.DebugPrintf = logger.Printf
        }
        kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
@@ -147,24 +157,33 @@ func run(logger log.FieldLogger, cluster *arvados.Cluster) error {
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
-               log.Printf("Error notifying init daemon: %v", err)
+               logger.Printf("Error notifying init daemon: %v", err)
        }
-       log.Println("listening at", listener.Addr())
+       logger.Println("listening at", listener.Addr())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
-               log.Println("caught signal:", s)
+               logger.Println("caught signal:", s)
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
        // Start serving requests.
-       router = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
-       return http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
+       router, err = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
+       if err != nil {
+               return err
+       }
+       server := http.Server{
+               Handler: httpserver.AddRequestIDs(httpserver.LogRequests(router)),
+               BaseContext: func(net.Listener) context.Context {
+                       return ctxlog.Context(context.Background(), logger)
+               },
+       }
+       return server.Serve(listener)
 }
 
 type TokenCacheEntry struct {
@@ -263,7 +282,7 @@ func (h *proxyHandler) CheckAuthorizationHeader(req *http.Request) (pass bool, t
                }
        }
        if err != nil {
-               log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
+               ctxlog.FromContext(req.Context()).Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
                return false, "", nil
        }
 
@@ -305,13 +324,13 @@ type proxyHandler struct {
        *APITokenCache
        timeout   time.Duration
        transport *http.Transport
-       logger    log.FieldLogger
+       logger    logrus.FieldLogger
        cluster   *arvados.Cluster
 }
 
 // MakeRESTRouter returns an http.Handler that passes GET and PUT
 // requests to the appropriate handlers.
-func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger log.FieldLogger) http.Handler {
+func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger logrus.FieldLogger) (http.Handler, error) {
        rest := mux.NewRouter()
 
        transport := defaultTransport
@@ -325,7 +344,7 @@ func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *a
 
        cacheQ, err := lru.New2Q(500)
        if err != nil {
-               panic("Could not create 2Q")
+               return nil, fmt.Errorf("Error from lru.New2Q: %v", err)
        }
 
        h := &proxyHandler{
@@ -362,7 +381,7 @@ func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *a
        }).Methods("GET")
 
        rest.NotFoundHandler = InvalidPathHandler{}
-       return h
+       return h, nil
 }
 
 var errLoopDetected = errors.New("loop detected")
@@ -386,12 +405,12 @@ func SetCorsHeaders(resp http.ResponseWriter) {
 type InvalidPathHandler struct{}
 
 // ServeHTTP logs the request's remote address, method and path as
 // unroutable and replies with 400 "Bad request".
 func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
        // The added line logs through the logger taken from the
        // request's context rather than a package-level logger.
+       ctxlog.FromContext(req.Context()).Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
        http.Error(resp, "Bad request", http.StatusBadRequest)
 }
 
 // Options logs the request's remote address, method and path, then
 // calls SetCorsHeaders on the response. It writes no body and sets
 // no explicit status code.
 func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) {
-       log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
        // The added line logs through the logger taken from the
        // request's context rather than a package-level logger.
+       ctxlog.FromContext(req.Context()).Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
        SetCorsHeaders(resp)
 }
 
@@ -415,7 +434,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
        var proxiedURI = "-"
 
        defer func() {
-               log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
+               h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
                if status != http.StatusOK {
                        http.Error(resp, err.Error(), status)
                }
@@ -466,7 +485,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
        }
 
        if expectLength == -1 {
-               log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
+               h.logger.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
        }
 
        switch respErr := err.(type) {
@@ -514,7 +533,7 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
        var locatorOut string = "-"
 
        defer func() {
-               log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
+               h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
                if status != http.StatusOK {
                        http.Error(resp, err.Error(), status)
                }
@@ -528,7 +547,7 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
                for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
                        scl = append(scl, strings.Trim(sc, " "))
                }
-               kc.StorageClasses = scl
+               kc.SetStorageClasses(scl)
        }
 
        _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
@@ -565,9 +584,9 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
        kc.Arvados = &arvclient
 
        // Check if the client specified the number of replicas
-       if req.Header.Get("X-Keep-Desired-Replicas") != "" {
+       if desiredReplicas := req.Header.Get(keepclient.XKeepDesiredReplicas); desiredReplicas != "" {
                var r int
-               _, err := fmt.Sscanf(req.Header.Get(keepclient.XKeepDesiredReplicas), "%d", &r)
+               _, err := fmt.Sscanf(desiredReplicas, "%d", &r)
                if err == nil {
                        kc.Want_replicas = r
                }
@@ -604,23 +623,28 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
        switch err.(type) {
        case nil:
                status = http.StatusOK
+               if len(kc.StorageClasses) > 0 {
+                       // A successful PUT request with storage classes means that all
+                       // storage classes were fulfilled, so the client will get a
+                       // confirmation via the X-Storage-Classes-Confirmed header.
+                       hdr := ""
+                       isFirst := true
+                       for _, sc := range kc.StorageClasses {
+                               if isFirst {
+                                       hdr = fmt.Sprintf("%s=%d", sc, wroteReplicas)
+                                       isFirst = false
+                               } else {
+                                       hdr += fmt.Sprintf(", %s=%d", sc, wroteReplicas)
+                               }
+                       }
+                       resp.Header().Set(keepclient.XKeepStorageClassesConfirmed, hdr)
+               }
                _, err = io.WriteString(resp, locatorOut)
-
        case keepclient.OversizeBlockError:
                // Too much data
                status = http.StatusRequestEntityTooLarge
-
        case keepclient.InsufficientReplicasError:
-               if wroteReplicas > 0 {
-                       // At least one write is considered success.  The
-                       // client can decide if getting less than the number of
-                       // replications it asked for is a fatal error.
-                       status = http.StatusOK
-                       _, err = io.WriteString(resp, locatorOut)
-               } else {
-                       status = http.StatusServiceUnavailable
-               }
-
+               status = http.StatusServiceUnavailable
        default:
                status = http.StatusBadGateway
        }