+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
- "bytes"
- "flag"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "os"
- "os/signal"
- "strings"
- "syscall"
"time"
)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
-
// A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
// in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
-
-var PROC_MOUNTS = "/proc/mounts"
-
-// enforce_permissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
+const MinFreeKilobytes = BlockSize / 1024
-// permission_ttl is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+var bufs *bufferPool
-// data_manager_token represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
-
-// never_delete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var never_delete = false
-
-// ==========
-// Error types.
+// KeepError types.
//
type KeepError struct {
HTTPCode int
DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
ExpiredError = &KeepError{401, "Expired permission signature"}
NotFoundError = &KeepError{404, "Not Found"}
+ VolumeBusyError = &KeepError{503, "Volume backend busy"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
+ SizeRequiredError = &KeepError{411, "Missing Content-Length"}
+ TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{503, "Client disconnected"}
)
func (e *KeepError) Error() string {
return e.ErrMsg
}
-// ========================
-// Internal data structures
-//
-// These global variables are used by multiple parts of the
-// program. They are good candidates for moving into their own
-// packages.
-
-// The Keep VolumeManager maintains a list of available volumes.
-// Initialized by the --volumes flag (or by FindKeepVolumes).
-var KeepVM VolumeManager
-
-// The pull list manager and trash queue are threadsafe queues which
-// support atomic update operations. The PullHandler and TrashHandler
-// store results from Data Manager /pull and /trash requests here.
-//
-// See the Keep and Data Manager design documents for more details:
-// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
-// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
-//
-var pullq *WorkQueue
-var trashq *WorkQueue
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
-func main() {
- log.Println("Keep started: pid", os.Getpid())
-
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
-
- var (
- data_manager_token_file string
- listen string
- permission_key_file string
- permission_ttl_sec int
- serialize_io bool
- volumearg string
- pidfile string
- )
- flag.StringVar(
- &data_manager_token_file,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforce_permissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
- flag.BoolVar(
- &never_delete,
- "never-delete",
- false,
- "If set, nothing will be deleted. HTTP 405 will be returned "+
- "for valid DELETE requests.")
- flag.StringVar(
- &permission_key_file,
- "permission-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "permission signatures.")
- flag.IntVar(
- &permission_ttl_sec,
- "permission-ttl",
- 1209600,
- "Expiration time (in seconds) for newly generated permission "+
- "signatures.")
- flag.BoolVar(
- &serialize_io,
- "serialize",
- false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
- "volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
-
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file")
-
- flag.Parse()
-
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
- }
-
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(mounts []*VolumeMount, interval time.Duration) {
+ for range time.NewTicker(interval).C {
+ for _, v := range mounts {
+ v.EmptyTrash()
}
}
-
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
- }
-
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if data_manager_token_file != "" {
- if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
- data_manager_token = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
- }
- }
- if permission_key_file != "" {
- if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
- }
- }
-
- // Initialize permission TTL
- permission_ttl = time.Duration(permission_ttl_sec) * time.Second
-
- // If --enforce-permissions is true, we must have a permission key
- // to continue.
- if PermissionSecret == nil {
- if enforce_permissions {
- log.Fatal("--enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, run Keep with --permission-key-file=<path> " +
- "to define the location of a file containing the permission key.")
- }
- }
-
- // Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
-
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
- })
-
- // Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
- if err != nil {
- log.Fatal(err)
- }
-
- // Initialize Pull queue and worker
- keepClient := keepclient.KeepClient{
- Arvados: nil,
- Want_replicas: 1,
- Using_proxy: true,
- Client: &http.Client{},
- }
-
- pullq = NewWorkQueue()
- go RunPullWorker(pullq, keepClient)
-
- // Shut down the server gracefully (by closing the listener)
- // if SIGTERM is received.
- term := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- s := <-sig
- log.Println("caught signal:", s)
- listener.Close()
- }(term)
- signal.Notify(term, syscall.SIGTERM)
-
- if pidfile != "" {
- f, err := os.Create(pidfile)
- if err == nil {
- fmt.Fprint(f, os.Getpid())
- f.Close()
- } else {
- log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
- }
- }
-
- // Start listening for requests.
- srv := &http.Server{Addr: listen}
- srv.Serve(listener)
-
- log.Println("shutting down")
-
- if pidfile != "" {
- os.Remove(pidfile)
- }
}