14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 // ClearCache clears the Keep service discovery cache.
19 svcListCacheMtx.Lock()
20 defer svcListCacheMtx.Unlock()
21 for _, ent := range svcListCache {
22 ent.clear <- struct{}{}
26 // ClearCacheOnSIGHUP installs a signal handler that calls
27 // ClearCache when SIGHUP is received.
28 func ClearCacheOnSIGHUP() {
29 svcListCacheMtx.Lock()
30 defer svcListCacheMtx.Unlock()
31 if svcListCacheSignal != nil {
34 svcListCacheSignal = make(chan os.Signal, 1)
35 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
37 for range svcListCacheSignal {
44 svcListCache = map[string]cachedSvcList{}
45 svcListCacheSignal chan os.Signal
46 svcListCacheMtx sync.Mutex
49 type cachedSvcList struct {
50 arv *arvadosclient.ArvadosClient
55 // Check for new services list every few minutes. Send the latest list
56 // to the "latest" channel as needed.
57 func (ent *cachedSvcList) poll() {
58 wakeup := make(chan struct{})
60 replace := make(chan svcList)
68 // Wait here for the next success, in
69 // order to avoid returning stale
70 // results on the "latest" channel.
72 case current = <-replace:
73 case ent.latest <- current:
78 okDelay := 5 * time.Minute
79 errDelay := 3 * time.Second
80 timer := time.NewTimer(okDelay)
86 // Lost race stopping timer; skip extra firing
91 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
93 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
102 // discoverServices gets the list of available keep services from
105 // If a list of services is provided in the arvadosclient (e.g., from
106 // an environment variable or local config), that list is used
109 // If an API call is made, the result is cached for 5 minutes or until
110 // ClearCache() is called, and during this interval it is reused by
111 // other KeepClients that use the same API server host.
112 func (kc *KeepClient) discoverServices() error {
113 if kc.disableDiscovery {
117 if kc.Arvados.KeepServiceURIs != nil {
118 kc.disableDiscovery = true
119 kc.foundNonDiskSvc = true
120 kc.replicasPerService = 0
121 roots := make(map[string]string)
122 for i, uri := range kc.Arvados.KeepServiceURIs {
123 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
125 kc.setServiceRoots(roots, roots, roots)
129 svcListCacheMtx.Lock()
130 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
133 cacheEnt = cachedSvcList{
134 latest: make(chan svcList),
135 clear: make(chan struct{}),
139 svcListCache[kc.Arvados.ApiServer] = cacheEnt
141 svcListCacheMtx.Unlock()
143 return kc.loadKeepServers(<-cacheEnt.latest)
146 // LoadKeepServicesFromJSON gets list of available keep services from
147 // given JSON and disables automatic service discovery.
148 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
149 kc.disableDiscovery = true
152 dec := json.NewDecoder(strings.NewReader(services))
153 if err := dec.Decode(&list); err != nil {
157 return kc.loadKeepServers(list)
160 func (kc *KeepClient) loadKeepServers(list svcList) error {
161 listed := make(map[string]bool)
162 localRoots := make(map[string]string)
163 gatewayRoots := make(map[string]string)
164 writableLocalRoots := make(map[string]string)
166 // replicasPerService is 1 for disks; unknown or unlimited otherwise
167 kc.replicasPerService = 1
169 for _, service := range list.Items {
174 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
182 localRoots[service.Uuid] = url
183 if service.ReadOnly == false {
184 writableLocalRoots[service.Uuid] = url
185 if service.SvcType != "disk" {
186 kc.replicasPerService = 0
190 if service.SvcType != "disk" {
191 kc.foundNonDiskSvc = true
194 // Gateway services are only used when specified by
195 // UUID, so there's nothing to gain by filtering them
196 // by service type. Including all accessible services
197 // (gateway and otherwise) merely accommodates more
198 // service configurations.
199 gatewayRoots[service.Uuid] = url
202 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)