package keepclient
import (
- "git.curoverse.com/arvados.git/sdk/go/streamer"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"crypto/md5"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"log"
"strings"
"sync"
"sync/atomic"
+ "time"
"unsafe"
)
Arvados: arv,
Want_replicas: 2,
Using_proxy: false,
- Client: &http.Client{Transport: &http.Transport{}}}
+ Client: &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
err = (&kc).DiscoverKeepServers()
timestamp string) (reader io.ReadCloser,
contentLength int64, url string, err error) {
+ // Take the hash of locator and timestamp in order to identify this
+ // specific transaction in log statements.
+ tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
// Calculate the ordering for asking servers
sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
+ log.Printf("[%v] Begin download %s", tag, url)
+
var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+ respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ response := strings.TrimSpace(string(respbody))
+ log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
+ tag, url, resp.StatusCode, err, response)
continue
}
if resp.StatusCode == http.StatusOK {
+ log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
}
}
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
upload_status chan<- uploadStatus, expectedLength int64, tag string) {
- log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
-
var req *http.Request
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
+ log.Printf("[%v] Error creating request PUT %v error: %v", tag, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
+ log.Printf("[%v] Upload failed %v error: %v", tag, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
defer io.Copy(ioutil.Discard, resp.Body)
respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ response := strings.TrimSpace(string(respbody))
if err2 != nil && err2 != io.EOF {
- upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
- return
- }
-
- locator := strings.TrimSpace(string(respbody))
-
- if resp.StatusCode == http.StatusOK {
- upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
+ log.Printf("[%v] Upload %v error: %v response: %v", tag, url, err2.Error(), response)
+ upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+ } else if resp.StatusCode == http.StatusOK {
+ log.Printf("[%v] Upload %v success", tag, url)
+ upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
} else {
- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
+ log.Printf("[%v] Upload %v error: %v response: %v", tag, url, resp.StatusCode, response)
+ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
}
}
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
+ log.Printf("[%v] Begin upload %s to %s", tag, hash, sv[next_server])
go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
next_server += 1
active += 1
}
}
}
+ log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+ tag, remaining_replicas, active)
// Now wait for something to happen.
status := <-upload_status
- log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
- tag, status.url, status.statusCode, remaining_replicas, active)
+ active -= 1
if status.statusCode == 200 {
// good news!
remaining_replicas -= status.replicas_stored
locator = status.response
- } else {
- // writing to keep server failed for some reason
- log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
- tag, status.url, status.statusCode, status.err, status.response)
}
- active -= 1
-
}
return locator, this.Want_replicas, nil
package main
import (
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/gorilla/mux"
"io"
"io/ioutil"
no_get bool
no_put bool
default_replicas int
+ timeout int
pidfile string
)
2,
"Default number of replicas to write if not specified by the client.")
+ flagset.IntVar(
+ &timeout,
+ "timeout",
+ 20,
+ "Timeout on requests to internal Keep services")
+
flagset.StringVar(
&pidfile,
"pid",
}
kc.Want_replicas = default_replicas
+ // Honor the -timeout flag (interpreted as seconds) instead of a fixed value.
+ kc.Client.Timeout = time.Duration(timeout) * time.Second
listener, err = net.Listen("tcp", listen)
if err != nil {
locator := keepclient.MakeLocator2(hash, hints)
- log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+ log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
var pass bool
var tok string
blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
}
- if blocklen > 0 {
+ if blocklen > -1 {
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+ } else {
+ log.Printf("%s: %s %s Keep server did not return Content-Length",
+ GetRemoteAddress(req), req.Method, hash)
}
+ var status = 0
switch err {
case nil:
+ status = http.StatusOK
if reader != nil {
n, err2 := io.Copy(resp, reader)
- if n != blocklen {
- log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
+ if blocklen > -1 && n != blocklen {
+ log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
+ GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
} else if err2 == nil {
- log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+ log.Printf("%s: %s %s %v %v",
+ GetRemoteAddress(req), req.Method, hash, status, n)
} else {
- log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+ log.Printf("%s: %s %s %v %v copy error: %v",
+ GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
}
} else {
- log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
+ log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
}
case keepclient.BlockNotFound:
+ status = http.StatusNotFound
http.Error(resp, "Not found", http.StatusNotFound)
default:
+ status = http.StatusBadGateway
http.Error(resp, err.Error(), http.StatusBadGateway)
}
if err != nil {
- log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+ log.Printf("%s: %s %s %v error: %v",
+ GetRemoteAddress(req), req.Method, hash, status, err.Error())
}
}