10903: support need_transaction for job and pipeline_instance cancel methods.
[arvados.git] / services / keepstore / config.go
1 package main
2
3 import (
4         "bytes"
5         "encoding/json"
6         "fmt"
7         "io/ioutil"
8         "strings"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         log "github.com/Sirupsen/logrus"
13 )
14
15 type Config struct {
16         Debug  bool
17         Listen string
18
19         LogFormat string
20
21         PIDFile string
22
23         MaxBuffers  int
24         MaxRequests int
25
26         BlobSignatureTTL    arvados.Duration
27         BlobSigningKeyFile  string
28         RequireSignatures   bool
29         SystemAuthTokenFile string
30         EnableDelete        bool
31         TrashLifetime       arvados.Duration
32         TrashCheckInterval  arvados.Duration
33
34         Volumes VolumeList
35
36         blobSigningKey  []byte
37         systemAuthToken string
38         debugLogf       func(string, ...interface{})
39 }
40
41 var theConfig = DefaultConfig()
42
43 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
44
45 // DefaultConfig returns the default configuration.
46 func DefaultConfig() *Config {
47         return &Config{
48                 Listen:             ":25107",
49                 LogFormat:          "json",
50                 MaxBuffers:         128,
51                 RequireSignatures:  true,
52                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
53                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
54                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
55                 Volumes:            []Volume{},
56         }
57 }
58
59 // Start should be called exactly once: after setting all public
60 // fields, and before using the config.
61 func (cfg *Config) Start() error {
62         if cfg.Debug {
63                 log.SetLevel(log.DebugLevel)
64                 cfg.debugLogf = log.Printf
65                 cfg.debugLogf("debugging enabled")
66         } else {
67                 cfg.debugLogf = func(string, ...interface{}) {}
68         }
69
70         switch strings.ToLower(cfg.LogFormat) {
71         case "text":
72                 log.SetFormatter(&log.TextFormatter{
73                         FullTimestamp:   true,
74                         TimestampFormat: rfc3339NanoFixed,
75                 })
76         case "json":
77                 log.SetFormatter(&log.JSONFormatter{
78                         TimestampFormat: rfc3339NanoFixed,
79                 })
80         default:
81                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
82         }
83
84         if cfg.MaxBuffers < 0 {
85                 return fmt.Errorf("MaxBuffers must be greater than zero")
86         }
87         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
88
89         if cfg.MaxRequests < 1 {
90                 cfg.MaxRequests = cfg.MaxBuffers * 2
91                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
92         }
93
94         if cfg.BlobSigningKeyFile != "" {
95                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
96                 if err != nil {
97                         return fmt.Errorf("reading blob signing key file: %s", err)
98                 }
99                 cfg.blobSigningKey = bytes.TrimSpace(buf)
100                 if len(cfg.blobSigningKey) == 0 {
101                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
102                 }
103         } else if cfg.RequireSignatures {
104                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
105         } else {
106                 log.Println("Running without a blob signing key. Block locators " +
107                         "returned by this server will not be signed, and will be rejected " +
108                         "by a server that enforces permissions.")
109                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
110         }
111
112         if fn := cfg.SystemAuthTokenFile; fn != "" {
113                 buf, err := ioutil.ReadFile(fn)
114                 if err != nil {
115                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
116                 }
117                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
118         }
119
120         if cfg.EnableDelete {
121                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
122                         "been extensively tested. You should disable this unless you can afford to lose data.")
123         }
124
125         if len(cfg.Volumes) == 0 {
126                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
127                         return fmt.Errorf("no volumes found")
128                 }
129         }
130         for _, v := range cfg.Volumes {
131                 if err := v.Start(); err != nil {
132                         return fmt.Errorf("volume %s: %s", v, err)
133                 }
134                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
135         }
136         return nil
137 }
138
139 // VolumeTypes is built up by init() funcs in the source files that
140 // define the volume types.
141 var VolumeTypes = []func() VolumeWithExamples{}
142
143 type VolumeList []Volume
144
145 // UnmarshalJSON, given an array of objects, deserializes each object
146 // as the volume type indicated by the object's Type field.
147 func (vols *VolumeList) UnmarshalJSON(data []byte) error {
148         typeMap := map[string]func() VolumeWithExamples{}
149         for _, factory := range VolumeTypes {
150                 t := factory().Type()
151                 if _, ok := typeMap[t]; ok {
152                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
153                 }
154                 typeMap[t] = factory
155         }
156
157         var mapList []map[string]interface{}
158         err := json.Unmarshal(data, &mapList)
159         if err != nil {
160                 return err
161         }
162         for _, mapIn := range mapList {
163                 typeIn, ok := mapIn["Type"].(string)
164                 if !ok {
165                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
166                 }
167                 factory, ok := typeMap[typeIn]
168                 if !ok {
169                         return fmt.Errorf("unsupported volume type %+q", typeIn)
170                 }
171                 data, err := json.Marshal(mapIn)
172                 if err != nil {
173                         return err
174                 }
175                 vol := factory()
176                 err = json.Unmarshal(data, vol)
177                 if err != nil {
178                         return err
179                 }
180                 *vols = append(*vols, vol)
181         }
182         return nil
183 }
184
185 // MarshalJSON adds a "Type" field to each volume corresponding to its
186 // Type().
187 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
188         data := []byte{'['}
189         for _, vs := range *vl {
190                 j, err := json.Marshal(vs)
191                 if err != nil {
192                         return nil, err
193                 }
194                 if len(data) > 1 {
195                         data = append(data, byte(','))
196                 }
197                 t, err := json.Marshal(vs.Type())
198                 if err != nil {
199                         panic(err)
200                 }
201                 data = append(data, j[0])
202                 data = append(data, []byte(`"Type":`)...)
203                 data = append(data, t...)
204                 data = append(data, byte(','))
205                 data = append(data, j[1:]...)
206         }
207         return append(data, byte(']')), nil
208 }