import (
"bytes"
+ "crypto/sha256"
+ _ "embed"
"encoding/json"
"errors"
"flag"
"io"
"io/ioutil"
"os"
+ "regexp"
+ "runtime"
+ "strconv"
"strings"
+ "time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/ghodss/yaml"
"github.com/imdario/mergo"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
+ "golang.org/x/sys/unix"
)
// DefaultYAML holds the bytes of config.default.yml, embedded into the
// binary at build time by the go:embed directive below.
+//go:embed config.default.yml
+var DefaultYAML []byte
+
// ErrNoClustersDefined is the error value for a config that does not
// define any clusters.
var ErrNoClustersDefined = errors.New("config does not define any clusters")
// Loader holds the options and cached state used by Load to read a
// site configuration.
type Loader struct {
// Logger receives warnings about the config. Some checks (e.g.
// checkToken) skip their warnings when it is nil.
Logger logrus.FieldLogger
SkipDeprecated bool // Don't load deprecated config keys
SkipLegacy bool // Don't load legacy config files
+ SkipAPICalls bool // Don't do checks that call RailsAPI/controller
// Path is the site config file, bound to the -config flag by
// SetupFlags. loadBytes treats "-" as "read from ldr.Stdin".
Path string
// Legacy per-component config file paths, bound to the
// corresponding -legacy-*-config flags by SetupFlags.
KeepstorePath string
CrunchDispatchSlurmPath string
WebsocketPath string
KeepproxyPath string
+ GitHttpdPath string
+ KeepBalancePath string
// configdata caches the raw config content. Load calls loadBytes
// only while configdata is nil.
configdata []byte
+ // UTC time for configdata: either the modtime of the file we
+ // read configdata from, or the time when we read configdata
+ // from a pipe.
+ sourceTimestamp time.Time
+ // UTC time when configdata was read.
+ loadTimestamp time.Time
}
// NewLoader returns a new Loader with Stdin and Logger set to the
// // ldr.Path == "/tmp/c.yaml"
func (ldr *Loader) SetupFlags(flagset *flag.FlagSet) {
// The -config flag is always registered; its value is stored in
// ldr.Path and defaults to arvados.DefaultConfigFile.
flagset.StringVar(&ldr.Path, "config", arvados.DefaultConfigFile, "Site configuration `file` (default may be overridden by setting an ARVADOS_CONFIG environment variable)")
- flagset.StringVar(&ldr.KeepstorePath, "legacy-keepstore-config", defaultKeepstoreConfigPath, "Legacy keepstore configuration `file`")
- flagset.StringVar(&ldr.KeepWebPath, "legacy-keepweb-config", defaultKeepWebConfigPath, "Legacy keep-web configuration `file`")
- flagset.StringVar(&ldr.CrunchDispatchSlurmPath, "legacy-crunch-dispatch-slurm-config", defaultCrunchDispatchSlurmConfigPath, "Legacy crunch-dispatch-slurm configuration `file`")
- flagset.StringVar(&ldr.WebsocketPath, "legacy-ws-config", defaultWebsocketConfigPath, "Legacy arvados-ws configuration `file`")
- flagset.StringVar(&ldr.KeepproxyPath, "legacy-keepproxy-config", defaultKeepproxyConfigPath, "Legacy keepproxy configuration `file`")
- flagset.BoolVar(&ldr.SkipLegacy, "skip-legacy", false, "Don't load legacy config files")
// The -legacy-*-config flags and -skip-legacy itself are registered
// only when the caller has not already set ldr.SkipLegacy, so a
// loader created with SkipLegacy=true exposes none of them.
+ if !ldr.SkipLegacy {
+ flagset.StringVar(&ldr.KeepstorePath, "legacy-keepstore-config", defaultKeepstoreConfigPath, "Legacy keepstore configuration `file`")
+ flagset.StringVar(&ldr.KeepWebPath, "legacy-keepweb-config", defaultKeepWebConfigPath, "Legacy keep-web configuration `file`")
+ flagset.StringVar(&ldr.CrunchDispatchSlurmPath, "legacy-crunch-dispatch-slurm-config", defaultCrunchDispatchSlurmConfigPath, "Legacy crunch-dispatch-slurm configuration `file`")
+ flagset.StringVar(&ldr.WebsocketPath, "legacy-ws-config", defaultWebsocketConfigPath, "Legacy arvados-ws configuration `file`")
+ flagset.StringVar(&ldr.KeepproxyPath, "legacy-keepproxy-config", defaultKeepproxyConfigPath, "Legacy keepproxy configuration `file`")
+ flagset.StringVar(&ldr.GitHttpdPath, "legacy-git-httpd-config", defaultGitHttpdConfigPath, "Legacy arvados-git-httpd configuration `file`")
+ flagset.StringVar(&ldr.KeepBalancePath, "legacy-keepbalance-config", defaultKeepBalanceConfigPath, "Legacy keep-balance configuration `file`")
+ flagset.BoolVar(&ldr.SkipLegacy, "skip-legacy", false, "Don't load legacy config files")
+ }
}
// MungeLegacyConfigArgs checks args for a -config flag whose argument
ldr.KeepWebPath = ""
}
if legacyConfigArg != "-legacy-keepproxy-config" {
- ldr.WebsocketPath = ""
+ ldr.KeepproxyPath = ""
+ }
+ if legacyConfigArg != "-legacy-git-httpd-config" {
+ ldr.GitHttpdPath = ""
+ }
+ if legacyConfigArg != "-legacy-keepbalance-config" {
+ ldr.KeepBalancePath = ""
}
return munged
}
// loadBytes reads the whole config source named by path, or ldr.Stdin
// when path is "-".
//
// loadTime is the UTC time taken at the start of the call. sourceTime
// is the file's modification time in UTC, or equal to loadTime when
// reading from ldr.Stdin. All results are named, and every return
// below is a bare return that reports their current values.
-func (ldr *Loader) loadBytes(path string) ([]byte, error) {
+func (ldr *Loader) loadBytes(path string) (buf []byte, sourceTime, loadTime time.Time, err error) {
+ loadTime = time.Now().UTC()
if path == "-" {
- return ioutil.ReadAll(ldr.Stdin)
+ buf, err = ioutil.ReadAll(ldr.Stdin)
+ sourceTime = loadTime
+ return
}
// ":=" here declares only f; err is the named result declared in the
// same scope, so the bare returns below still report this error.
f, err := os.Open(path)
if err != nil {
- return nil, err
+ return
}
defer f.Close()
- return ioutil.ReadAll(f)
+ fi, err := f.Stat()
+ if err != nil {
+ return
+ }
+ sourceTime = fi.ModTime().UTC()
+ buf, err = ioutil.ReadAll(f)
+ return
}
func (ldr *Loader) Load() (*arvados.Config, error) {
if ldr.configdata == nil {
- buf, err := ldr.loadBytes(ldr.Path)
+ buf, sourceTime, loadTime, err := ldr.loadBytes(ldr.Path)
if err != nil {
return nil, err
}
ldr.configdata = buf
+ ldr.sourceTimestamp = sourceTime
+ ldr.loadTimestamp = loadTime
}
+ // FIXME: We should reject YAML if the same key is used twice
+ // in a map/object, like {foo: bar, foo: baz}. Maybe we'll get
+ // this fixed free when we upgrade ghodss/yaml to a version
+ // that uses go-yaml v3.
+
// Load the config into a dummy map to get the cluster ID
// keys, discarding the values; then set up defaults for each
// cluster ID; then load the real config on top of the
}
ldr.logExtraKeys(merged, src, "")
removeSampleKeys(merged)
+ // We merge the loaded config into the default, overriding any existing keys.
+ // Make sure we do not override a default with a key that has a 'null' value.
+ removeNullKeys(src)
err = mergo.Merge(&merged, src, mergo.WithOverride)
if err != nil {
return nil, fmt.Errorf("merging config data: %s", err)
return nil, fmt.Errorf("transcoding config data: %s", err)
}
+ var loadFuncs []func(*arvados.Config) error
if !ldr.SkipDeprecated {
- err = ldr.applyDeprecatedConfig(&cfg)
- if err != nil {
- return nil, err
- }
+ loadFuncs = append(loadFuncs,
+ ldr.applyDeprecatedConfig,
+ ldr.applyDeprecatedVolumeDriverParameters,
+ )
}
if !ldr.SkipLegacy {
// legacy file is required when either:
// * a non-default location was specified
// * no primary config was loaded, and this is the
// legacy config file for the current component
- for _, err := range []error{
- ldr.loadOldKeepstoreConfig(&cfg),
- ldr.loadOldKeepWebConfig(&cfg),
- ldr.loadOldCrunchDispatchSlurmConfig(&cfg),
- ldr.loadOldWebsocketConfig(&cfg),
- ldr.loadOldKeepproxyConfig(&cfg),
+ loadFuncs = append(loadFuncs,
+ ldr.loadOldEnvironmentVariables,
+ ldr.loadOldKeepstoreConfig,
+ ldr.loadOldKeepWebConfig,
+ ldr.loadOldCrunchDispatchSlurmConfig,
+ ldr.loadOldWebsocketConfig,
+ ldr.loadOldKeepproxyConfig,
+ ldr.loadOldGitHttpdConfig,
+ ldr.loadOldKeepBalanceConfig,
+ )
+ }
+ loadFuncs = append(loadFuncs,
+ ldr.setImplicitStorageClasses,
+ ldr.setLoopbackInstanceType,
+ )
+ for _, f := range loadFuncs {
+ err = f(&cfg)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // Preprocess/automate some configs
+ for id, cc := range cfg.Clusters {
+ ldr.autofillPreemptible("Clusters."+id, &cc)
+
+ if strings.Count(cc.Users.AnonymousUserToken, "/") == 3 {
+ // V2 token, strip it to just a secret
+ tmp := strings.Split(cc.Users.AnonymousUserToken, "/")
+ cc.Users.AnonymousUserToken = tmp[2]
+ }
+
+ cfg.Clusters[id] = cc
+ }
+
+ // Check for known mistakes
+ for id, cc := range cfg.Clusters {
+ for remote := range cc.RemoteClusters {
+ if remote == "*" || remote == "SAMPLE" {
+ continue
+ }
+ err = ldr.checkClusterID(fmt.Sprintf("Clusters.%s.RemoteClusters.%s", id, remote), remote, true)
+ if err != nil {
+ return nil, err
+ }
+ }
+ for _, err = range []error{
+ ldr.checkClusterID(fmt.Sprintf("Clusters.%s", id), id, false),
+ ldr.checkClusterID(fmt.Sprintf("Clusters.%s.Login.LoginCluster", id), cc.Login.LoginCluster, true),
+ ldr.checkToken(fmt.Sprintf("Clusters.%s.ManagementToken", id), cc.ManagementToken, true, false),
+ ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken, true, false),
+ ldr.checkToken(fmt.Sprintf("Clusters.%s.Users.AnonymousUserToken", id), cc.Users.AnonymousUserToken, false, true),
+ ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey, true, false),
+ checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+ ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
+ ldr.checkEmptyKeepstores(cc),
+ ldr.checkUnlistedKeepstores(cc),
+ ldr.checkLocalKeepBlobBuffers(cc),
+ ldr.checkStorageClasses(cc),
+ ldr.checkCUDAVersions(cc),
+ // TODO: check non-empty Rendezvous on
+ // services other than Keepstore
} {
if err != nil {
return nil, err
}
}
}
+ cfg.SourceTimestamp = ldr.sourceTimestamp
+ cfg.SourceSHA256 = fmt.Sprintf("%x", sha256.Sum256(ldr.configdata))
+ return &cfg, nil
+}
- // Check for known mistakes
+// acceptableClusterIDRe matches a string made of exactly five
+// characters from a-z and 0-9.
+var acceptableClusterIDRe = regexp.MustCompile(`^[a-z0-9]{5}$`)
+
+// checkClusterID returns an error mentioning label when clusterID
+// does not match acceptableClusterIDRe. An empty clusterID is
+// accepted only when emptyStringOk is true.
+func (ldr *Loader) checkClusterID(label, clusterID string, emptyStringOk bool) error {
+	switch {
+	case emptyStringOk && clusterID == "":
+		return nil
+	case acceptableClusterIDRe.MatchString(clusterID):
+		return nil
+	default:
+		return fmt.Errorf("%s: cluster ID should be 5 lowercase alphanumeric characters", label)
+	}
+}
+
+// acceptableTokenRe matches a non-empty string of a-z, A-Z and 0-9.
+var acceptableTokenRe = regexp.MustCompile(`^[a-zA-Z0-9]+$`)
+
+// acceptableTokenLength is the length below which a token or V2
+// secret triggers a "too short" warning.
+var acceptableTokenLength = 32
+
+// checkToken validates the secret stored under the config key label.
+//
+//   - An empty token is never an error; if mandatory is true a
+//     warning is logged.
+//   - A token matching acceptableTokenRe is accepted, with a warning
+//     if it is shorter than acceptableTokenLength.
+//   - Any other token is an error unless acceptV2 is true and the
+//     token has the form "v2/<middle>/<secret>", where <secret>
+//     matches acceptableTokenRe. The <middle> part is not inspected.
+//     A short <secret> only produces a warning.
+//
+// Warnings are dropped when ldr.Logger is nil; they never turn into
+// errors or panics.
+func (ldr *Loader) checkToken(label, token string, mandatory bool, acceptV2 bool) error {
+	// Route every warning through one nil-safe helper. Previously the
+	// "secret is too short" branch called ldr.Logger.Warnf directly
+	// and panicked when no Logger was configured.
+	warnf := func(format string, args ...interface{}) {
+		if ldr.Logger != nil {
+			ldr.Logger.Warnf(format, args...)
+		}
+	}
+
+	switch {
+	case len(token) == 0:
+		// when a token is not mandatory, the acceptable length and
+		// content is only checked if its length is non-zero
+		if mandatory {
+			warnf("%s: secret token is not set (use %d+ random characters from a-z, A-Z, 0-9)", label, acceptableTokenLength)
+		}
+		return nil
+	case acceptableTokenRe.MatchString(token):
+		if len(token) < acceptableTokenLength {
+			warnf("%s: token is too short (should be at least %d characters)", label, acceptableTokenLength)
+		}
+		return nil
+	case !acceptV2:
+		return fmt.Errorf("%s: unacceptable characters in token (only a-z, A-Z, 0-9 are acceptable)", label)
+	}
+
+	// Test for a proper V2 token. SplitN with n=3 keeps everything
+	// after the second "/" in tmp[2], so a secret containing "/" is
+	// rejected by the character check below.
+	tmp := strings.SplitN(token, "/", 3)
+	if len(tmp) != 3 || !strings.HasPrefix(token, "v2/") {
+		return fmt.Errorf("%s: unacceptable characters in token (only a-z, A-Z, 0-9 are acceptable)", label)
+	}
+	if !acceptableTokenRe.MatchString(tmp[2]) {
+		return fmt.Errorf("%s: unacceptable characters in V2 token secret (only a-z, A-Z, 0-9 are acceptable)", label)
+	}
+	if len(tmp[2]) < acceptableTokenLength {
+		warnf("%s: secret is too short (should be at least %d characters)", label, acceptableTokenLength)
+	}
+	return nil
+}
+
+// checkEnum returns nil when value equals one of the accepted
+// strings. Otherwise it returns an error that names label, the
+// offending value, and the accepted list.
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+	found := false
+	for _, candidate := range accepted {
+		if candidate == value {
+			found = true
+			break
+		}
+	}
+	if !found {
+		return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+	}
+	return nil
+}
+
// setLoopbackInstanceType fills in an implicit "localhost" instance
// type for every cluster that enables CloudVMs with the "loopback"
// driver and has no InstanceTypes configured. The implicit type uses
// runtime.NumCPU() for VCPUs, the value returned by getHostRAM() for
// RAM, and the size getFilesystemSize reports for os.TempDir() for
// both Scratch and IncludedScratch.
//
// A cluster with exactly one InstanceType is left alone; more than one
// InstanceType with the loopback driver is reported as an error.
+func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error {
for id, cc := range cfg.Clusters {
- err = checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection)
+ if !cc.Containers.CloudVMs.Enable || cc.Containers.CloudVMs.Driver != "loopback" {
+ continue
+ }
+ if len(cc.InstanceTypes) == 1 {
+ continue
+ }
+ if len(cc.InstanceTypes) > 1 {
+ return fmt.Errorf("Clusters.%s.InstanceTypes: cannot use multiple InstanceTypes with loopback driver", id)
+ }
+ // No InstanceTypes configured. Fill in implicit
+ // default.
+ hostram, err := getHostRAM()
if err != nil {
- return nil, err
+ return err
+ }
+ scratch, err := getFilesystemSize(os.TempDir())
+ if err != nil {
+ return err
}
+ cc.InstanceTypes = arvados.InstanceTypeMap{"localhost": {
+ Name: "localhost",
+ ProviderType: "localhost",
+ VCPUs: runtime.NumCPU(),
+ RAM: hostram,
+ Scratch: scratch,
+ IncludedScratch: scratch,
+ Price: 1.0,
+ }}
// cc is the loop's copy of the map value, so the updated cluster
// has to be stored back into cfg.Clusters.
+ cfg.Clusters[id] = cc
}
- return &cfg, nil
+ return nil
+}
+
+// getFilesystemSize returns the size of the filesystem containing
+// path, computed as the block count times the block size reported by
+// unix.Statfs.
+func getFilesystemSize(path string) (arvados.ByteSize, error) {
+	var fs unix.Statfs_t
+	if err := unix.Statfs(path, &fs); err != nil {
+		return 0, err
+	}
+	total := fs.Blocks * uint64(fs.Bsize)
+	return arvados.ByteSize(total), nil
+}
+
+// reMemTotal captures the numeric kB figure of the MemTotal line in
+// /proc/meminfo-style text (submatch 2).
+var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`)
+
+// getHostRAM returns the MemTotal figure from /proc/meminfo,
+// converted from kB to bytes by multiplying by 1024. It returns an
+// error if the file cannot be read, has no MemTotal line, or the
+// figure does not fit in an int64.
+func getHostRAM() (arvados.ByteSize, error) {
+	meminfo, err := os.ReadFile("/proc/meminfo")
+	if err != nil {
+		return 0, err
+	}
+	match := reMemTotal.FindSubmatch(meminfo)
+	if match == nil {
+		return 0, errors.New("error parsing /proc/meminfo: no MemTotal")
+	}
+	digits := match[2]
+	kb, err := strconv.ParseInt(string(digits), 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("error parsing /proc/meminfo: %q: %w", digits, err)
+	}
+	return arvados.ByteSize(kb) * 1024, nil
+}
+
+// setImplicitStorageClasses supplies a "default" storage class for
+// every cluster that configures no storage classes at all, neither
+// in the cluster's StorageClasses nor on any of its volumes. For such
+// a cluster each volume is assigned {"default": true} and the cluster
+// gets a single "default" class marked Default. Clusters with any
+// explicit storage class configuration are left untouched.
+func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
+	for clusterID, cc := range cfg.Clusters {
+		explicit := len(cc.StorageClasses) > 0
+		if !explicit {
+			for _, vol := range cc.Volumes {
+				if len(vol.StorageClasses) > 0 {
+					explicit = true
+					break
+				}
+			}
+		}
+		if explicit {
+			continue
+		}
+		// No explicit StorageClasses config info at all; fill
+		// in implicit defaults.
+		for volID, vol := range cc.Volumes {
+			vol.StorageClasses = map[string]bool{"default": true}
+			cc.Volumes[volID] = vol
+		}
+		cc.StorageClasses = map[string]arvados.StorageClassConfig{"default": {Default: true}}
+		cfg.Clusters[clusterID] = cc
+	}
+	return nil
+}
+
+// checkLocalKeepBlobBuffers warns when
+// Containers.LocalKeepBlobBuffersPerVCPU is non-zero but the volume
+// configuration means it will not be used: a volume uses
+// AccessViaHosts, or a writable volume has Replication lower than
+// Collections.DefaultReplication. Only the first offending volume
+// encountered is reported.
+//
+// This check only emits warnings and always returns nil. It does
+// nothing when ldr.Logger is nil; the unguarded Warnf calls used to
+// panic in that case.
+func (ldr *Loader) checkLocalKeepBlobBuffers(cc arvados.Cluster) error {
+	kbb := cc.Containers.LocalKeepBlobBuffersPerVCPU
+	if kbb == 0 || ldr.Logger == nil {
+		return nil
+	}
+	for uuid, vol := range cc.Volumes {
+		if len(vol.AccessViaHosts) > 0 {
+			ldr.Logger.Warnf("LocalKeepBlobBuffersPerVCPU is %d but will not be used because at least one volume (%s) uses AccessViaHosts -- suggest changing to 0", kbb, uuid)
+			return nil
+		}
+		if !vol.ReadOnly && vol.Replication < cc.Collections.DefaultReplication {
+			ldr.Logger.Warnf("LocalKeepBlobBuffersPerVCPU is %d but will not be used because at least one volume (%s) has lower replication than DefaultReplication (%d < %d) -- suggest changing to 0", kbb, uuid, vol.Replication, cc.Collections.DefaultReplication)
+			return nil
+		}
+	}
+	return nil
+}
+
+// checkStorageClasses verifies the storage class configuration of cc.
+//
+// It returns an error when a volume lists no StorageClasses, when a
+// volume refers to a class missing from cc.StorageClasses, or when no
+// entry in cc.StorageClasses has Default set. When at least one
+// volume is configured, a class that no volume provides only produces
+// a warning. That warning is skipped when ldr.Logger is nil; the
+// unguarded call used to panic.
+func (ldr *Loader) checkStorageClasses(cc arvados.Cluster) error {
+	classOnVolume := map[string]bool{}
+	for volid, vol := range cc.Volumes {
+		if len(vol.StorageClasses) == 0 {
+			return fmt.Errorf("%s: volume has no StorageClasses listed", volid)
+		}
+		for classid := range vol.StorageClasses {
+			if _, ok := cc.StorageClasses[classid]; !ok {
+				return fmt.Errorf("%s: volume refers to storage class %q that is not defined in StorageClasses", volid, classid)
+			}
+			classOnVolume[classid] = true
+		}
+	}
+	haveDefault := false
+	for classid, sc := range cc.StorageClasses {
+		if !classOnVolume[classid] && len(cc.Volumes) > 0 && ldr.Logger != nil {
+			ldr.Logger.Warnf("there are no volumes providing storage class %q", classid)
+		}
+		if sc.Default {
+			haveDefault = true
+		}
+	}
+	if !haveDefault {
+		// Constant message: errors.New avoids treating it as a format string.
+		return errors.New("there is no default storage class (at least one entry in StorageClasses must have Default: true)")
+	}
+	return nil
+}
+
+// checkCUDAVersions validates the CUDA settings of every instance
+// type with a non-zero CUDA.DeviceCount: both CUDA.DriverVersion and
+// CUDA.HardwareCapability must be parseable by strconv.ParseFloat.
+// The first value that fails to parse is reported as an error.
+func (ldr *Loader) checkCUDAVersions(cc arvados.Cluster) error {
+	for _, it := range cc.InstanceTypes {
+		if it.CUDA.DeviceCount == 0 {
+			continue
+		}
+		if _, err := strconv.ParseFloat(it.CUDA.DriverVersion, 64); err != nil {
+			return fmt.Errorf("InstanceType %q has invalid CUDA.DriverVersion %q, expected format X.Y (%v)", it.Name, it.CUDA.DriverVersion, err)
+		}
+		if _, err := strconv.ParseFloat(it.CUDA.HardwareCapability, 64); err != nil {
+			return fmt.Errorf("InstanceType %q has invalid CUDA.HardwareCapability %q, expected format X.Y (%v)", it.Name, it.CUDA.HardwareCapability, err)
+		}
+	}
+	return nil
}
func checkKeyConflict(label string, m map[string]string) error {
return nil
}
+func removeNullKeys(m map[string]interface{}) {
+ for k, v := range m {
+ if v == nil {
+ delete(m, k)
+ }
+ if v, _ := v.(map[string]interface{}); v != nil {
+ removeNullKeys(v)
+ }
+ }
+}
+
func removeSampleKeys(m map[string]interface{}) {
delete(m, "SAMPLE")
for _, v := range m {
if ldr.Logger == nil {
return
}
- allowed := map[string]interface{}{}
- for k, v := range expected {
- allowed[strings.ToLower(k)] = v
- }
for k, vsupp := range supplied {
if k == "SAMPLE" {
// entry will be dropped in removeSampleKeys anyway
continue
}
- vexp, ok := allowed[strings.ToLower(k)]
+ vexp, ok := expected[k]
if expected["SAMPLE"] != nil {
+ // use the SAMPLE entry's keys as the
+ // "expected" map when checking vsupp
+ // recursively.
vexp = expected["SAMPLE"]
} else if !ok {
- ldr.Logger.Warnf("deprecated or unknown config entry: %s%s", prefix, k)
+ // check for a case-insensitive match
+ hint := ""
+ for ek := range expected {
+ if strings.EqualFold(k, ek) {
+ hint = " (perhaps you meant " + ek + "?)"
+ // If we don't delete this, it
+ // will end up getting merged,
+ // unpredictably
+ // merging/overriding the
+ // default.
+ delete(supplied, k)
+ break
+ }
+ }
+ ldr.Logger.Warnf("deprecated or unknown config entry: %s%s%s", prefix, k, hint)
continue
}
if vsupp, ok := vsupp.(map[string]interface{}); !ok {
}
}
}
+
+// autofillPreemptible adds a preemptible variant for each
+// non-preemptible instance type in cc when
+// cc.Containers.PreemptiblePriceFactor is greater than zero. The
+// variant is a copy named "<name>.preemptible" with Preemptible set
+// and Price multiplied by the factor.
+//
+// If a different instance type already exists under the variant's
+// name it is kept, and a warning prefixed with label is logged. The
+// warning is skipped when ldr.Logger is nil; the unguarded call used
+// to panic.
+func (ldr *Loader) autofillPreemptible(label string, cc *arvados.Cluster) {
+	if factor := cc.Containers.PreemptiblePriceFactor; factor > 0 {
+		// Entries inserted while ranging over the map may or may not
+		// be visited. That is harmless here: every inserted entry is
+		// already Preemptible and is skipped by the check below.
+		for name, it := range cc.InstanceTypes {
+			if it.Preemptible {
+				continue
+			}
+			it.Preemptible = true
+			it.Price = it.Price * factor
+			it.Name = name + ".preemptible"
+			if it2, exists := cc.InstanceTypes[it.Name]; exists && it2 != it {
+				if ldr.Logger != nil {
+					ldr.Logger.Warnf("%s.InstanceTypes[%s]: already exists, so not automatically adding a preemptible variant of %s", label, it.Name, name)
+				}
+				continue
+			}
+			cc.InstanceTypes[it.Name] = it
+		}
+	}
+}
+
+// RegisterMetrics registers two gauges in reg describing the
+// currently loaded config: arvados_config_source_timestamp_seconds
+// and arvados_config_load_timestamp_seconds. Each carries a "sha256"
+// label holding the hex SHA-256 of the loaded config data, and its
+// value is the corresponding timestamp in seconds.
+//
+// Must not be called more than once for a given registry. Must not be
+// called before Load(). Metrics are not updated by subsequent calls
+// to Load().
+func (ldr *Loader) RegisterMetrics(reg *prometheus.Registry) {
+	sum := sha256.Sum256(ldr.configdata)
+	hash := fmt.Sprintf("%x", sum)
+
+	// publish creates one gauge, sets its value for the current
+	// hash, and registers it.
+	publish := func(name, help string, ts time.Time) {
+		gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Subsystem: "config",
+			Name:      name,
+			Help:      help,
+		}, []string{"sha256"})
+		gauge.WithLabelValues(hash).Set(float64(ts.UnixNano()) / 1e9)
+		reg.MustRegister(gauge)
+	}
+
+	publish("source_timestamp_seconds", "Timestamp of config file when it was loaded.", ldr.sourceTimestamp)
+	publish("load_timestamp_seconds", "Time when config file was loaded.", ldr.loadTimestamp)
+}
+
+// LoadSSHKey parses an SSH private key from confvalue. When confvalue
+// is "file://" followed by an absolute path (one starting with "/"),
+// the key is read from that file. Any other confvalue is parsed as
+// the literal key.
+func LoadSSHKey(confvalue string) (ssh.Signer, error) {
+	fnm := strings.TrimPrefix(confvalue, "file://")
+	isFileRef := fnm != confvalue && strings.HasPrefix(fnm, "/")
+	if !isFileRef {
+		return ssh.ParsePrivateKey([]byte(confvalue))
+	}
+	keydata, err := os.ReadFile(fnm)
+	if err != nil {
+		return nil, err
+	}
+	return ssh.ParsePrivateKey(keydata)
+}