package main
import (
- "bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io/ioutil"
- "log"
"net"
"net/http"
"os"
"os/signal"
- "strings"
"syscall"
"time"
-)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
+ "github.com/coreos/go-systemd/daemon"
+)
// A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
// in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
-
-var PROC_MOUNTS = "/proc/mounts"
-
-// enforce_permissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
-
-// permission_ttl is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+const MinFreeKilobytes = BlockSize / 1024
-// data_manager_token represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
+// ProcMounts /proc/mounts
+var ProcMounts = "/proc/mounts"
-// never_delete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var never_delete = false
+var bufs *bufferPool
-// ==========
-// Error types.
+// KeepError types.
//
type KeepError struct {
HTTPCode int
NotFoundError = &KeepError{404, "Not Found"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
+ SizeRequiredError = &KeepError{411, "Missing Content-Length"}
+ TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{503, "Client disconnected"}
)
func (e *KeepError) Error() string {
var pullq *WorkQueue
var trashq *WorkQueue
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
func main() {
- log.Println("Keep started: pid", os.Getpid())
-
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
-
- var (
- data_manager_token_file string
- listen string
- permission_key_file string
- permission_ttl_sec int
- serialize_io bool
- volumearg string
- pidfile string
- )
- flag.StringVar(
- &data_manager_token_file,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforce_permissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
- flag.BoolVar(
- &never_delete,
- "never-delete",
- false,
- "If set, nothing will be deleted. HTTP 405 will be returned "+
- "for valid DELETE requests.")
- flag.StringVar(
- &permission_key_file,
- "permission-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "permission signatures.")
- flag.IntVar(
- &permission_ttl_sec,
- "permission-ttl",
- 1209600,
- "Expiration time (in seconds) for newly generated permission "+
- "signatures.")
- flag.BoolVar(
- &serialize_io,
- "serialize",
- false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
- "volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
+ deprecated.beforeFlagParse(theConfig)
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file")
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+ var configPath string
+ flag.StringVar(
+ &configPath,
+ "config",
+ defaultConfigPath,
+ "YAML or JSON configuration file `path`")
+ flag.Usage = usage
flag.Parse()
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
+ deprecated.afterFlagParse(theConfig)
+
+ err := config.LoadFile(theConfig, configPath)
+ if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+ log.Fatal(err)
}
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
- }
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(theConfig))
}
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
+ err = theConfig.Start()
+ if err != nil {
+ log.Fatal(err)
}
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if data_manager_token_file != "" {
- if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
- data_manager_token = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
+ if pidfile := theConfig.PIDFile; pidfile != "" {
+ f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+ if err != nil {
+ log.Fatalf("open pidfile (%s): %s", pidfile, err)
}
- }
- if permission_key_file != "" {
- if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
+ defer f.Close()
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+ if err != nil {
+ log.Fatalf("flock pidfile (%s): %s", pidfile, err)
}
- }
-
- // Initialize permission TTL
- permission_ttl = time.Duration(permission_ttl_sec) * time.Second
-
- // If --enforce-permissions is true, we must have a permission key
- // to continue.
- if PermissionSecret == nil {
- if enforce_permissions {
- log.Fatal("--enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, run Keep with --permission-key-file=<path> " +
- "to define the location of a file containing the permission key.")
+ defer os.Remove(pidfile)
+ err = f.Truncate(0)
+ if err != nil {
+ log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+ }
+ _, err = fmt.Fprint(f, os.Getpid())
+ if err != nil {
+ log.Fatalf("write pidfile (%s): %s", pidfile, err)
+ }
+ err = f.Sync()
+ if err != nil {
+ log.Fatalf("sync pidfile (%s): %s", pidfile, err)
}
}
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
+
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
+ KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
- })
+ // Middleware stack: logger, MaxRequests limiter, method handlers
+ router := MakeRESTRouter()
+ limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+ router.limiter = limiter
+ http.Handle("/", &LoggingRESTRouter{router: limiter})
// Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
+ listener, err := net.Listen("tcp", theConfig.Listen)
if err != nil {
log.Fatal(err)
}
// Initialize Pull queue and worker
- keepClient := keepclient.KeepClient{
- Arvados: nil,
+ keepClient := &keepclient.KeepClient{
+ Arvados: &arvadosclient.ArvadosClient{},
Want_replicas: 1,
- Using_proxy: true,
Client: &http.Client{},
}
trashq = NewWorkQueue()
go RunTrashWorker(trashq)
+ // Start emptyTrash goroutine
+ doneEmptyingTrash := make(chan bool)
+ go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
+
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
s := <-sig
log.Println("caught signal:", s)
+ doneEmptyingTrash <- true
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
- if pidfile != "" {
- f, err := os.Create(pidfile)
- if err == nil {
- fmt.Fprint(f, os.Getpid())
- f.Close()
- } else {
- log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
- }
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.Printf("Error notifying init daemon: %v", err)
}
-
- // Start listening for requests.
- srv := &http.Server{Addr: listen}
+ log.Println("listening at", listener.Addr())
+ srv := &http.Server{}
srv.Serve(listener)
+}
- log.Println("shutting down")
-
- if pidfile != "" {
- os.Remove(pidfile)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+ ticker := time.NewTicker(interval)
+
+ for {
+ select {
+ case <-ticker.C:
+ for _, v := range theConfig.Volumes {
+ if v.Writable() {
+ v.EmptyTrash()
+ }
+ }
+ case <-done:
+ ticker.Stop()
+ return
+ }
}
}