"io"
"io/ioutil"
"log"
+ "net"
"net/http"
- "os"
"strings"
"time"
)
return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}
-func (this *KeepClient) DiscoverKeepServers() error {
- if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- sr := map[string]string{"proxy": prx}
- this.SetServiceRoots(sr)
- this.Using_proxy = true
- if this.Client.Timeout == 0 {
- this.Client.Timeout = 10 * time.Minute
+// setClientSettingsProxy applies the client settings used when connecting to keepproxy services (assumed to be over
+// the Internet). It does nothing if this.Client.Timeout is already non-zero; otherwise it sets Timeout and Transport.
+func (this *KeepClient) setClientSettingsProxy() {
+ if this.Client.Timeout == 0 {
+ // Overall limit on the time to wait for a complete response.
+ this.Client.Timeout = 300 * time.Second
+
+ // TCP and TLS connection settings (dial timeout, keep-alive interval, TLS handshake timeout). This installs a fresh Transport; nothing from a previously assigned Transport is carried over.
+ this.Client.Transport = &http.Transport{
+ Dial: (&net.Dialer{
+ // Maximum time to wait for the initial
+ // TCP connection to be established.
+ Timeout: 30 * time.Second,
+
+ // Interval between TCP keep-alive
+ // probes.
+ KeepAlive: 120 * time.Second,
+ }).Dial,
+
+ TLSHandshakeTimeout: 10 * time.Second,
}
- return nil
}
+}
+
+// setClientSettingsStore applies the client settings used when connecting to keepstore services directly (assumed
+// to be on the local network). It does nothing if this.Client.Timeout is already non-zero; otherwise it sets Timeout and Transport.
+func (this *KeepClient) setClientSettingsStore() {
if this.Client.Timeout == 0 {
- this.Client.Timeout = 15 * time.Second
+ // Overall limit on the time to wait for a complete response.
+ this.Client.Timeout = 20 * time.Second
+
+ // TCP and TLS connection timeouts (dial timeout, keep-alive interval, TLS handshake timeout). This installs a fresh Transport; nothing from a previously assigned Transport is carried over.
+ this.Client.Transport = &http.Transport{
+ Dial: (&net.Dialer{
+ // Maximum time to wait for the initial
+ // TCP connection to be established.
+ Timeout: 2 * time.Second,
+
+ // Interval between TCP keep-alive
+ // probes.
+ KeepAlive: 180 * time.Second,
+ }).Dial,
+
+ TLSHandshakeTimeout: 4 * time.Second,
+ }
}
+}
+func (this *KeepClient) DiscoverKeepServers() error {
type svcList struct {
Items []keepDisk `json:"items"`
}
}
listed := make(map[string]bool)
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
+ gatewayRoots := make(map[string]string)
- for _, element := range m.Items {
- n := ""
-
- if element.SSL {
- n = "s"
+ for _, service := range m.Items {
+ scheme := "http"
+ if service.SSL {
+ scheme = "https"
}
-
- // Construct server URL
- url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+ url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
// Skip duplicates
- if !listed[url] {
- listed[url] = true
- service_roots[element.Uuid] = url
+ if listed[url] {
+ continue
}
- if element.SvcType == "proxy" {
+ listed[url] = true
+
+ switch service.SvcType {
+ case "disk":
+ localRoots[service.Uuid] = url
+ case "proxy":
+ localRoots[service.Uuid] = url
this.Using_proxy = true
}
+ // Gateway services are only used when specified by
+ // UUID, so there's nothing to gain by filtering them
+ // by service type. Including all accessible services
+ // (gateway and otherwise) merely accommodates more
+ // service configurations.
+ gatewayRoots[service.Uuid] = url
}
- this.SetServiceRoots(service_roots)
+ if this.Using_proxy {
+ this.setClientSettingsProxy()
+ } else {
+ this.setClientSettingsStore()
+ }
+ this.SetServiceRoots(localRoots, gatewayRoots)
return nil
}
return
}
- if expectedLength > -1 {
- req.ContentLength = expectedLength
- }
- if expectedLength == 0 {
- defer body.Close()
+ req.ContentLength = expectedLength
+ if expectedLength > 0 {
+ // http.Client.Do will close the body ReadCloser when it is
+ // done with it.
+ req.Body = body
+ } else {
+ // "For client requests, a value of 0 means unknown if Body is
+ // not nil." In this case we do want the body to be empty, so
+ // don't set req.Body. However, we still need to close the
+ // body ReadCloser.
+ body.Close()
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
}
- req.Body = body
-
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
log.Printf("[%v] Upload failed %v error: %v", requestId, url, err.Error())
requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(this.LocalRoots(), hash).GetSortedRoots()
// The next server to try contacting
next_server := 0