13937: Refactors approach to pass volume metrics as curried vecs (WIP)
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "strings"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 type Config struct {
21         Debug  bool
22         Listen string
23
24         LogFormat string
25
26         PIDFile string
27
28         MaxBuffers  int
29         MaxRequests int
30
31         BlobSignatureTTL    arvados.Duration
32         BlobSigningKeyFile  string
33         RequireSignatures   bool
34         SystemAuthTokenFile string
35         EnableDelete        bool
36         TrashLifetime       arvados.Duration
37         TrashCheckInterval  arvados.Duration
38         PullWorkers         int
39         TrashWorkers        int
40         EmptyTrashWorkers   int
41         TLSCertificateFile  string
42         TLSKeyFile          string
43
44         Volumes VolumeList
45
46         blobSigningKey  []byte
47         systemAuthToken string
48         debugLogf       func(string, ...interface{})
49
50         ManagementToken string
51 }
52
53 var (
54         theConfig = DefaultConfig()
55         formatter = map[string]logrus.Formatter{
56                 "text": &logrus.TextFormatter{
57                         FullTimestamp:   true,
58                         TimestampFormat: rfc3339NanoFixed,
59                 },
60                 "json": &logrus.JSONFormatter{
61                         TimestampFormat: rfc3339NanoFixed,
62                 },
63         }
64         log = logrus.StandardLogger()
65 )
66
67 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
68
69 // DefaultConfig returns the default configuration.
70 func DefaultConfig() *Config {
71         return &Config{
72                 Listen:             ":25107",
73                 LogFormat:          "json",
74                 MaxBuffers:         128,
75                 RequireSignatures:  true,
76                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
77                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
78                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
79                 Volumes:            []Volume{},
80         }
81 }
82
83 // Start should be called exactly once: after setting all public
84 // fields, and before using the config.
85 func (cfg *Config) Start(reg *prometheus.Registry) error {
86         if cfg.Debug {
87                 log.Level = logrus.DebugLevel
88                 cfg.debugLogf = log.Printf
89                 cfg.debugLogf("debugging enabled")
90         } else {
91                 log.Level = logrus.InfoLevel
92                 cfg.debugLogf = func(string, ...interface{}) {}
93         }
94
95         f := formatter[strings.ToLower(cfg.LogFormat)]
96         if f == nil {
97                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
98         }
99         log.Formatter = f
100
101         if cfg.MaxBuffers < 0 {
102                 return fmt.Errorf("MaxBuffers must be greater than zero")
103         }
104         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
105
106         if cfg.MaxRequests < 1 {
107                 cfg.MaxRequests = cfg.MaxBuffers * 2
108                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
109         }
110
111         if cfg.BlobSigningKeyFile != "" {
112                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
113                 if err != nil {
114                         return fmt.Errorf("reading blob signing key file: %s", err)
115                 }
116                 cfg.blobSigningKey = bytes.TrimSpace(buf)
117                 if len(cfg.blobSigningKey) == 0 {
118                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
119                 }
120         } else if cfg.RequireSignatures {
121                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
122         } else {
123                 log.Println("Running without a blob signing key. Block locators " +
124                         "returned by this server will not be signed, and will be rejected " +
125                         "by a server that enforces permissions.")
126                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
127         }
128
129         if fn := cfg.SystemAuthTokenFile; fn != "" {
130                 buf, err := ioutil.ReadFile(fn)
131                 if err != nil {
132                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
133                 }
134                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
135         }
136
137         if cfg.EnableDelete {
138                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
139                         "been extensively tested. You should disable this unless you can afford to lose data.")
140         }
141
142         if len(cfg.Volumes) == 0 {
143                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
144                         return fmt.Errorf("no volumes found")
145                 }
146         }
147         vm := newVolumeMetricsVecs(reg)
148         for _, v := range cfg.Volumes {
149                 metrics := vm.curryWith(
150                         v.String(),
151                         v.Status().MountPoint,
152                         fmt.Sprintf("%d", v.Status().DeviceNum))
153                 if err := v.Start(metrics); err != nil {
154                         return fmt.Errorf("volume %s: %s", v, err)
155                 }
156                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
157         }
158         return nil
159 }
160
161 // VolumeTypes is built up by init() funcs in the source files that
162 // define the volume types.
163 var VolumeTypes = []func() VolumeWithExamples{}
164
165 type VolumeList []Volume
166
167 // UnmarshalJSON -- given an array of objects -- deserializes each
168 // object as the volume type indicated by the object's Type field.
169 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
170         typeMap := map[string]func() VolumeWithExamples{}
171         for _, factory := range VolumeTypes {
172                 t := factory().Type()
173                 if _, ok := typeMap[t]; ok {
174                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
175                 }
176                 typeMap[t] = factory
177         }
178
179         var mapList []map[string]interface{}
180         err := json.Unmarshal(data, &mapList)
181         if err != nil {
182                 return err
183         }
184         for _, mapIn := range mapList {
185                 typeIn, ok := mapIn["Type"].(string)
186                 if !ok {
187                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
188                 }
189                 factory, ok := typeMap[typeIn]
190                 if !ok {
191                         return fmt.Errorf("unsupported volume type %+q", typeIn)
192                 }
193                 data, err := json.Marshal(mapIn)
194                 if err != nil {
195                         return err
196                 }
197                 vol := factory()
198                 err = json.Unmarshal(data, vol)
199                 if err != nil {
200                         return err
201                 }
202                 *vl = append(*vl, vol)
203         }
204         return nil
205 }
206
207 // MarshalJSON adds a "Type" field to each volume corresponding to its
208 // Type().
209 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
210         data := []byte{'['}
211         for _, vs := range *vl {
212                 j, err := json.Marshal(vs)
213                 if err != nil {
214                         return nil, err
215                 }
216                 if len(data) > 1 {
217                         data = append(data, byte(','))
218                 }
219                 t, err := json.Marshal(vs.Type())
220                 if err != nil {
221                         panic(err)
222                 }
223                 data = append(data, j[0])
224                 data = append(data, []byte(`"Type":`)...)
225                 data = append(data, t...)
226                 data = append(data, byte(','))
227                 data = append(data, j[1:]...)
228         }
229         return append(data, byte(']')), nil
230 }