4869: KeepClient now has a default timeout per block request (10 minutes).
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 29 Dec 2014 14:09:13 +0000 (09:09 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 29 Dec 2014 14:09:46 +0000 (09:09 -0500)
In keepproxy, the timeout is set to 20 seconds per block. Also rearranged some
keepclient and keepproxy logging to provide better information.

sdk/go/keepclient/keepclient.go
sdk/go/keepclient/support.go
services/keepproxy/keepproxy.go

index 326c2a06ae1a5620af8a388ffb9392357dc7d15e..4733bb76293b760641573943f1dd7355501d3e7f 100644
@@ -2,11 +2,11 @@
 package keepclient
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "crypto/md5"
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
        "log"
@@ -15,6 +15,7 @@ import (
        "strings"
        "sync"
        "sync/atomic"
+       "time"
        "unsafe"
 )
 
@@ -47,7 +48,7 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error)
                Arvados:       arv,
                Want_replicas: 2,
                Using_proxy:   false,
-               Client:        &http.Client{Transport: &http.Transport{}}}
+               Client:        &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
 
        err = (&kc).DiscoverKeepServers()
 
@@ -131,6 +132,10 @@ func (this KeepClient) AuthorizedGet(hash string,
        timestamp string) (reader io.ReadCloser,
        contentLength int64, url string, err error) {
 
+       // Take the hash of locator and timestamp in order to identify this
+       // specific transaction in log statements.
+       tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
        // Calculate the ordering for asking servers
        sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
@@ -150,12 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string,
 
                req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 
+               log.Printf("[%v] Begin download %s", tag, url)
+
                var resp *http.Response
-               if resp, err = this.Client.Do(req); err != nil {
+               if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+                       respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+                       response := strings.TrimSpace(string(respbody))
+                       log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
+                               tag, url, resp.StatusCode, err, response)
                        continue
                }
 
                if resp.StatusCode == http.StatusOK {
+                       log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
                        return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
                }
        }
index 26338ca571ecc8494287988fa8c8b397efbbca93..dc521da5e29db0bedb6ab960963ff2869112a160 100644
@@ -87,12 +87,11 @@ type uploadStatus struct {
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
        upload_status chan<- uploadStatus, expectedLength int64, tag string) {
 
-       log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
-
        var req *http.Request
        var err error
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
+               log.Printf("[%v] Error creating request PUT %v error: %v", tag, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
                body.Close()
                return
@@ -113,8 +112,8 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
        var resp *http.Response
        if resp, err = this.Client.Do(req); err != nil {
+               log.Printf("[%v] Upload failed %v error: %v", tag, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
                return
        }
 
@@ -127,17 +126,16 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        defer io.Copy(ioutil.Discard, resp.Body)
 
        respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+       response := strings.TrimSpace(string(respbody))
        if err2 != nil && err2 != io.EOF {
-               upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
-               return
-       }
-
-       locator := strings.TrimSpace(string(respbody))
-
-       if resp.StatusCode == http.StatusOK {
-               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
+               log.Printf("[%v] Upload %v error: %v response: %v", tag, url, err2.Error(), response)
+               upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+       } else if resp.StatusCode == http.StatusOK {
+               log.Printf("[%v] Upload %v success", tag, url)
+               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
        } else {
-               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
+               log.Printf("[%v] Upload %v error: %v response: %v", tag, url, resp.StatusCode, response)
+               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
        }
 }
 
@@ -170,6 +168,7 @@ func (this KeepClient) putReplicas(
                for active < remaining_replicas {
                        // Start some upload requests
                        if next_server < len(sv) {
+                               log.Printf("[%v] Begin upload %s to %s", tag, hash, sv[next_server])
                                go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
                                next_server += 1
                                active += 1
@@ -181,22 +180,17 @@ func (this KeepClient) putReplicas(
                                }
                        }
                }
+               log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+                       tag, remaining_replicas, active)
 
                // Now wait for something to happen.
                status := <-upload_status
-               log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
-                       tag, status.url, status.statusCode, remaining_replicas, active)
+               active -= 1
                if status.statusCode == 200 {
                        // good news!
                        remaining_replicas -= status.replicas_stored
                        locator = status.response
-               } else {
-                       // writing to keep server failed for some reason
-                       log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
-                               tag, status.url, status.statusCode, status.err, status.response)
                }
-               active -= 1
-
        }
 
        return locator, this.Want_replicas, nil
index 888db7357d47bc8035301501b8d201a3dc8c4c36..38376c9f7ffa69f8c42b13c29d574cfa14ddaffc 100644
@@ -1,10 +1,10 @@
 package main
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/gorilla/mux"
        "io"
        "io/ioutil"
@@ -30,6 +30,7 @@ func main() {
                no_get           bool
                no_put           bool
                default_replicas int
+               timeout          int
                pidfile          string
        )
 
@@ -61,6 +62,12 @@ func main() {
                2,
                "Default number of replicas to write if not specified by the client.")
 
+       flagset.IntVar(
+               &timeout,
+               "timeout",
+               20,
+               "Timeout on requests to internal Keep services")
+
        flagset.StringVar(
                &pidfile,
                "pid",
@@ -90,6 +97,7 @@ func main() {
        }
 
        kc.Want_replicas = default_replicas
+       kc.Client.Timeout = 20 * time.Second
 
        listener, err = net.Listen("tcp", listen)
        if err != nil {
@@ -283,7 +291,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        locator := keepclient.MakeLocator2(hash, hints)
 
-       log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+       log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
 
        var pass bool
        var tok string
@@ -308,32 +316,43 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
        }
 
-       if blocklen > 0 {
+       if blocklen > -1 {
                resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+       } else {
+               log.Printf("%s: %s %s Keep server did not return Content-Length",
+                       GetRemoteAddress(req), req.Method, hash)
        }
 
+       var status = 0
        switch err {
        case nil:
+               status = http.StatusOK
                if reader != nil {
                        n, err2 := io.Copy(resp, reader)
-                       if n != blocklen {
-                               log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
+                       if blocklen > -1 && n != blocklen {
+                               log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
+                                       GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
                        } else if err2 == nil {
-                               log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+                               log.Printf("%s: %s %s %v %v",
+                                       GetRemoteAddress(req), req.Method, hash, status, n)
                        } else {
-                               log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+                               log.Printf("%s: %s %s %v %v copy error: %v",
+                                       GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
                        }
                } else {
-                       log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
+                       log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
                }
        case keepclient.BlockNotFound:
+               status = http.StatusNotFound
                http.Error(resp, "Not found", http.StatusNotFound)
        default:
+               status = http.StatusBadGateway
                http.Error(resp, err.Error(), http.StatusBadGateway)
        }
 
        if err != nil {
-               log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+               log.Printf("%s: %s %s %v error: %v",
+                       GetRemoteAddress(req), req.Method, hash, status, err.Error())
        }
 }