16809: Test keep-web with s3cmd if available.
[arvados.git] / services / keepproxy / keepproxy.go
index e4e54040f2282cac9a3434273b546f5d67bb1bbc..0191e5ba45391e4058b24e014ae4d2feab16d0e2 100644 (file)
@@ -20,16 +20,16 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/config"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/health"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
        log "github.com/sirupsen/logrus"
-       "gopkg.in/yaml.v2"
 )
 
 var version = "dev"
@@ -41,9 +41,8 @@ var (
 
 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-func configure(logger log.FieldLogger, args []string) *arvados.Cluster {
+func configure(logger log.FieldLogger, args []string) (*arvados.Cluster, error) {
        flags := flag.NewFlagSet(args[0], flag.ExitOnError)
-       flags.Usage = usage
 
        dumpConfig := flags.Bool("dump-config", false, "write current configuration to stdout and exit")
        getVersion := flags.Bool("version", false, "Print version information and exit.")
@@ -57,31 +56,29 @@ func configure(logger log.FieldLogger, args []string) *arvados.Cluster {
        // Print version information if requested
        if *getVersion {
                fmt.Printf("keepproxy %s\n", version)
-               return nil
+               return nil, nil
        }
 
        cfg, err := loader.Load()
        if err != nil {
-               log.Fatal(err)
+               return nil, err
        }
-
        cluster, err := cfg.GetCluster("")
        if err != nil {
-               log.Fatal(err)
+               return nil, err
        }
 
        if *dumpConfig {
                out, err := yaml.Marshal(cfg)
                if err != nil {
-                       log.Fatal(err)
+                       return nil, err
                }
-               _, err = os.Stdout.Write(out)
-               if err != nil {
-                       log.Fatal(err)
+               if _, err := os.Stdout.Write(out); err != nil {
+                       return nil, err
                }
-               return nil
+               return nil, nil
        }
-       return cluster
+       return cluster, nil
 }
 
 func main() {
@@ -90,22 +87,39 @@ func main() {
                TimestampFormat: rfc3339NanoFixed,
        }
 
-       cluster := configure(logger, os.Args)
+       cluster, err := configure(logger, os.Args)
+       if err != nil {
+               log.Fatal(err)
+       }
        if cluster == nil {
                return
        }
 
        log.Printf("keepproxy %s started", version)
 
+       if err := run(logger, cluster); err != nil {
+               log.Fatal(err)
+       }
+
+       log.Println("shutting down")
+}
+
+func run(logger log.FieldLogger, cluster *arvados.Cluster) error {
        client, err := arvados.NewClientFromConfig(cluster)
        if err != nil {
-               log.Fatal(err)
+               return err
        }
        client.AuthToken = cluster.SystemRootToken
 
        arv, err := arvadosclient.New(client)
        if err != nil {
-               log.Fatalf("Error setting up arvados client %s", err.Error())
+               return fmt.Errorf("Error setting up arvados client %v", err)
+       }
+
+       // If a config file is available, use the keepstores defined there
+       // instead of the legacy autodiscover mechanism via the API server
+       for k := range cluster.Services.Keepstore.InternalURLs {
+               arv.KeepServiceURIs = append(arv.KeepServiceURIs, strings.TrimRight(k.String(), "/"))
        }
 
        if cluster.SystemLogs.LogLevel == "debug" {
@@ -113,34 +127,10 @@ func main() {
        }
        kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
-               log.Fatalf("Error setting up keep client %s", err.Error())
+               return fmt.Errorf("Error setting up keep client %v", err)
        }
        keepclient.RefreshServiceDiscoveryOnSIGHUP()
 
-       pidFile := "keepproxy"
-       f, err := os.Create(pidFile)
-       if err != nil {
-               log.Fatal(err)
-       }
-       defer f.Close()
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
-       if err != nil {
-               log.Fatalf("flock(%s): %s", pidFile, err)
-       }
-       defer os.Remove(pidFile)
-       err = f.Truncate(0)
-       if err != nil {
-               log.Fatalf("truncate(%s): %s", pidFile, err)
-       }
-       _, err = fmt.Fprint(f, os.Getpid())
-       if err != nil {
-               log.Fatalf("write(%s): %s", pidFile, err)
-       }
-       err = f.Sync()
-       if err != nil {
-               log.Fatalf("sync(%s): %s", pidFile, err)
-       }
-
        if cluster.Collections.DefaultReplication > 0 {
                kc.Want_replicas = cluster.Collections.DefaultReplication
        }
@@ -149,15 +139,17 @@ func main() {
        for listen = range cluster.Services.Keepproxy.InternalURLs {
                break
        }
-       listener, err := net.Listen("tcp", listen.Host)
-       if err != nil {
-               log.Fatalf("listen(%s): %s", listen, err)
+
+       var lErr error
+       listener, lErr = net.Listen("tcp", listen.Host)
+       if lErr != nil {
+               return fmt.Errorf("listen(%s): %v", listen.Host, lErr)
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
-       log.Println("Listening at", listener.Addr())
+       log.Println("listening at", listener.Addr())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
@@ -171,10 +163,8 @@ func main() {
        signal.Notify(term, syscall.SIGINT)
 
        // Start serving requests.
-       router = MakeRESTRouter(kc, time.Duration(cluster.API.KeepServiceRequestTimeout), cluster.SystemRootToken)
-       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
-
-       log.Println("shutting down")
+       router = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster.ManagementToken)
+       return http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
 }
 
 type ApiTokenCache struct {