+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"flag"
"fmt"
- "log"
"net"
- "net/http"
"os"
"os/signal"
"syscall"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
- "github.com/ghodss/yaml"
)
+var version = "dev"
+
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
var pullq *WorkQueue
var trashq *WorkQueue
-var (
- flagSerializeIO bool
- flagReadonly bool
-)
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
func main() {
- neverDelete := !theConfig.EnableDelete
- signatureTTLSeconds := int(theConfig.BlobSignatureTTL.Duration() / time.Second)
- flag.StringVar(&theConfig.Listen, "listen", theConfig.Listen, "see Listen configuration")
- flag.IntVar(&theConfig.MaxBuffers, "max-buffers", theConfig.MaxBuffers, "see MaxBuffers configuration")
- flag.IntVar(&theConfig.MaxRequests, "max-requests", theConfig.MaxRequests, "see MaxRequests configuration")
- flag.BoolVar(&neverDelete, "never-delete", neverDelete, "see EnableDelete configuration")
- flag.BoolVar(&theConfig.RequireSignatures, "enforce-permissions", theConfig.RequireSignatures, "see RequireSignatures configuration")
- flag.StringVar(&theConfig.BlobSigningKeyFile, "permission-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
- flag.StringVar(&theConfig.BlobSigningKeyFile, "blob-signing-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
- flag.StringVar(&theConfig.SystemAuthTokenFile, "data-manager-token-file", theConfig.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
- flag.IntVar(&signatureTTLSeconds, "permission-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
- flag.IntVar(&signatureTTLSeconds, "blob-signature-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
- flag.Var(&theConfig.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
- flag.BoolVar(&flagSerializeIO, "serialize", false, "serialize read and write operations on the following volumes.")
- flag.BoolVar(&flagReadonly, "readonly", false, "do not write, delete, or touch anything on the following volumes.")
- flag.StringVar(&theConfig.PIDFile, "pid", theConfig.PIDFile, "see `PIDFile` configuration")
- flag.Var(&theConfig.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+ deprecated.beforeFlagParse(theConfig)
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
var configPath string
flag.Usage = usage
flag.Parse()
- theConfig.BlobSignatureTTL = arvados.Duration(signatureTTLSeconds) * arvados.Duration(time.Second)
- theConfig.EnableDelete = !neverDelete
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keepstore %s\n", version)
+ return
+ }
+
+ deprecated.afterFlagParse(theConfig)
- // TODO: Load config
err := config.LoadFile(theConfig, configPath)
if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
log.Fatal(err)
}
if *dumpConfig {
- y, err := yaml.Marshal(theConfig)
- if err != nil {
- log.Fatal(err)
- }
- os.Stdout.Write(y)
- os.Exit(0)
+ log.Fatal(config.DumpAndExit(theConfig))
}
+ log.Printf("keepstore %s started", version)
+
err = theConfig.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
if pidfile := theConfig.PIDFile; pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
}
}
+ var cluster *arvados.Cluster
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if err != nil && os.IsNotExist(err) {
+ log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+ cluster = &arvados.Cluster{
+ ClusterID: "xxxxx",
+ }
+ } else if err != nil {
+ log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+ } else {
+ cluster, err = cfg.GetCluster("")
+ if err != nil {
+ log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+ }
+ }
+
log.Println("keepstore starting, pid", os.Getpid())
defer log.Println("keepstore exiting, pid", os.Getpid())
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, MaxRequests limiter, method handlers
- http.Handle("/", &LoggingRESTRouter{
- httpserver.NewRequestLimiter(theConfig.MaxRequests,
- MakeRESTRouter()),
- })
+ // Middleware/handler stack
+ router := MakeRESTRouter(cluster)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
log.Fatal(err)
}
- // Initialize Pull queue and worker
+ // Initialize keepclient for pull workers
keepClient := &keepclient.KeepClient{
Arvados: &arvadosclient.ArvadosClient{},
Want_replicas: 1,
- Client: &http.Client{},
}
- // Initialize the pullq and worker
+ // Initialize the pullq and workers
pullq = NewWorkQueue()
- go RunPullWorker(pullq, keepClient)
+ for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+ go RunPullWorker(pullq, keepClient)
+ }
- // Initialize the trashq and worker
+ // Initialize the trashq and workers
trashq = NewWorkQueue()
- go RunTrashWorker(trashq)
+ for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+ go RunTrashWorker(trashq)
+ }
// Start emptyTrash goroutine
doneEmptyingTrash := make(chan bool)
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
- log.Println("listening at", listener.Addr)
- srv := &http.Server{}
+ log.Println("listening at", listener.Addr())
+ srv := &server{}
+ srv.Handler = router
srv.Serve(listener)
}