2e3fe0a5b130fe5259550b450dea2bf0237cd295
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "strings"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/Sirupsen/logrus"
17 )
18
19 type Config struct {
20         Debug  bool
21         Listen string
22
23         LogFormat string
24
25         PIDFile string
26
27         MaxBuffers  int
28         MaxRequests int
29
30         BlobSignatureTTL    arvados.Duration
31         BlobSigningKeyFile  string
32         RequireSignatures   bool
33         SystemAuthTokenFile string
34         EnableDelete        bool
35         TrashLifetime       arvados.Duration
36         TrashCheckInterval  arvados.Duration
37         PullWorkers         int
38         TrashWorkers        int
39         EmptyTrashWorkers   int
40         TLSCertificateFile  string
41         TLSKeyFile          string
42
43         Volumes VolumeList
44
45         blobSigningKey  []byte
46         systemAuthToken string
47         debugLogf       func(string, ...interface{})
48
49         ManagementToken string
50 }
51
52 var (
53         theConfig = DefaultConfig()
54         formatter = map[string]logrus.Formatter{
55                 "text": &logrus.TextFormatter{
56                         FullTimestamp:   true,
57                         TimestampFormat: rfc3339NanoFixed,
58                 },
59                 "json": &logrus.JSONFormatter{
60                         TimestampFormat: rfc3339NanoFixed,
61                 },
62         }
63         log = logrus.StandardLogger()
64 )
65
66 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
67
68 // DefaultConfig returns the default configuration.
69 func DefaultConfig() *Config {
70         return &Config{
71                 Listen:             ":25107",
72                 LogFormat:          "json",
73                 MaxBuffers:         128,
74                 RequireSignatures:  true,
75                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
76                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
77                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
78                 Volumes:            []Volume{},
79         }
80 }
81
82 // Start should be called exactly once: after setting all public
83 // fields, and before using the config.
84 func (cfg *Config) Start() error {
85         if cfg.Debug {
86                 log.Level = logrus.DebugLevel
87                 cfg.debugLogf = log.Printf
88                 cfg.debugLogf("debugging enabled")
89         } else {
90                 log.Level = logrus.InfoLevel
91                 cfg.debugLogf = func(string, ...interface{}) {}
92         }
93
94         f := formatter[strings.ToLower(cfg.LogFormat)]
95         if f == nil {
96                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
97         }
98         log.Formatter = f
99
100         if cfg.MaxBuffers < 0 {
101                 return fmt.Errorf("MaxBuffers must be greater than zero")
102         }
103         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
104
105         if cfg.MaxRequests < 1 {
106                 cfg.MaxRequests = cfg.MaxBuffers * 2
107                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
108         }
109
110         if cfg.BlobSigningKeyFile != "" {
111                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
112                 if err != nil {
113                         return fmt.Errorf("reading blob signing key file: %s", err)
114                 }
115                 cfg.blobSigningKey = bytes.TrimSpace(buf)
116                 if len(cfg.blobSigningKey) == 0 {
117                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
118                 }
119         } else if cfg.RequireSignatures {
120                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
121         } else {
122                 log.Println("Running without a blob signing key. Block locators " +
123                         "returned by this server will not be signed, and will be rejected " +
124                         "by a server that enforces permissions.")
125                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
126         }
127
128         if fn := cfg.SystemAuthTokenFile; fn != "" {
129                 buf, err := ioutil.ReadFile(fn)
130                 if err != nil {
131                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
132                 }
133                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
134         }
135
136         if cfg.EnableDelete {
137                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
138                         "been extensively tested. You should disable this unless you can afford to lose data.")
139         }
140
141         if len(cfg.Volumes) == 0 {
142                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
143                         return fmt.Errorf("no volumes found")
144                 }
145         }
146         for _, v := range cfg.Volumes {
147                 if err := v.Start(); err != nil {
148                         return fmt.Errorf("volume %s: %s", v, err)
149                 }
150                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
151         }
152         return nil
153 }
154
155 // VolumeTypes is built up by init() funcs in the source files that
156 // define the volume types.
157 var VolumeTypes = []func() VolumeWithExamples{}
158
159 type VolumeList []Volume
160
161 // UnmarshalJSON -- given an array of objects -- deserializes each
162 // object as the volume type indicated by the object's Type field.
163 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
164         typeMap := map[string]func() VolumeWithExamples{}
165         for _, factory := range VolumeTypes {
166                 t := factory().Type()
167                 if _, ok := typeMap[t]; ok {
168                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
169                 }
170                 typeMap[t] = factory
171         }
172
173         var mapList []map[string]interface{}
174         err := json.Unmarshal(data, &mapList)
175         if err != nil {
176                 return err
177         }
178         for _, mapIn := range mapList {
179                 typeIn, ok := mapIn["Type"].(string)
180                 if !ok {
181                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
182                 }
183                 factory, ok := typeMap[typeIn]
184                 if !ok {
185                         return fmt.Errorf("unsupported volume type %+q", typeIn)
186                 }
187                 data, err := json.Marshal(mapIn)
188                 if err != nil {
189                         return err
190                 }
191                 vol := factory()
192                 err = json.Unmarshal(data, vol)
193                 if err != nil {
194                         return err
195                 }
196                 *vl = append(*vl, vol)
197         }
198         return nil
199 }
200
201 // MarshalJSON adds a "Type" field to each volume corresponding to its
202 // Type().
203 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
204         data := []byte{'['}
205         for _, vs := range *vl {
206                 j, err := json.Marshal(vs)
207                 if err != nil {
208                         return nil, err
209                 }
210                 if len(data) > 1 {
211                         data = append(data, byte(','))
212                 }
213                 t, err := json.Marshal(vs.Type())
214                 if err != nil {
215                         panic(err)
216                 }
217                 data = append(data, j[0])
218                 data = append(data, []byte(`"Type":`)...)
219                 data = append(data, t...)
220                 data = append(data, byte(','))
221                 data = append(data, j[1:]...)
222         }
223         return append(data, byte(']')), nil
224 }