import (
"crypto/md5"
+ "encoding/json"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/streamer"
"time"
)
-type keepServices struct {
+type keepService struct {
Uuid string `json:"uuid"`
Hostname string `json:"service_host"`
Port int `json:"service_port"`
}
}
+type svcList struct {
+ Items []keepService `json:"items"`
+}
+
// DiscoverKeepServers gets list of available keep services from api server
func (this *KeepClient) DiscoverKeepServers() error {
- type svcList struct {
- Items []keepServices `json:"items"`
- }
- var m svcList
+ var list svcList
// Get keep services from api server
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
-
- // If there is error getting keep services, get list of keep disks
+ err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
if err != nil {
- if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
- return err
- }
+ return err
}
+ return this.loadKeepServers(list)
+}
+
+// DiscoverKeepServersFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) DiscoverKeepServersFromJSON(services string) error {
+ var list svcList
+
+ // Load keep services from given json
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
+ }
+
+ return this.loadKeepServers(list)
+}
+
+// loadKeepServers
+func (this *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
- this.replicasPerService = 1 // set to 1 until writable non-disk services are found
+ // replicasPerService is 1 for disks; unknown or unlimited otherwise
+ this.replicasPerService = 1
+ this.Using_proxy = false
- for _, service := range m.Items {
+ for _, service := range list.Items {
scheme := "http"
if service.SSL {
scheme = "https"
// Used to communicate status from the upload goroutines
upload_status := make(chan uploadStatus)
- defer close(upload_status)
+ defer func() {
+ // Wait for any abandoned uploads (e.g., we started
+ // two uploads and the first replied with replicas=2)
+ // to finish before closing the status channel.
+ go func() {
+ for active > 0 {
+ <-upload_status
+ }
+ close(upload_status)
+ }()
+ }()
// Desired number of replicas
remaining_replicas := this.Want_replicas
- for remaining_replicas > 0 {
- replicasPerThread := this.replicasPerService
- if replicasPerThread < 1 {
- // unlimited or unknown
- replicasPerThread = remaining_replicas
- }
+ replicasPerThread := this.replicasPerService
+ if replicasPerThread < 1 {
+ // unlimited or unknown
+ replicasPerThread = remaining_replicas
+ }
+ for remaining_replicas > 0 {
for active*replicasPerThread < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {