X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2b19cf9f9522dd0e8774031a54ce695e73fb72fe..cd85fc2c07a0e9d8c34b81e4523ee3d1ebd696e9:/sdk/go/src/arvados.org/keepclient/keepclient.go diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index 4222124196..2738cefa7c 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -15,13 +15,19 @@ import ( "strconv" ) +// A Keep "block" is 64MB. +const BLOCKSIZE = 64 * 1024 * 1024 + +var BlockNotFound = errors.New("Block not found") +var InsufficientReplicasError = errors.New("Could not write sufficient replicas") + type KeepClient struct { ApiServer string ApiToken string ApiInsecure bool Service_roots []string Want_replicas int - client *http.Client + Client *http.Client } type KeepDisk struct { @@ -30,24 +36,24 @@ type KeepDisk struct { SSL bool `json:"service_ssl_flag"` } -func MakeKeepClient() (kc *KeepClient, err error) { - kc = &KeepClient{ - ApiServer: os.Getenv("ARVADOS_API_HOST"), - ApiToken: os.Getenv("ARVADOS_API_TOKEN"), - ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")} - +func MakeKeepClient() (kc KeepClient, err error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure}, } - kc.client = &http.Client{Transport: tr} + kc = KeepClient{ + ApiServer: os.Getenv("ARVADOS_API_HOST"), + ApiToken: os.Getenv("ARVADOS_API_TOKEN"), + ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""), + Want_replicas: 2, + Client: &http.Client{Transport: tr}} - err = kc.DiscoverKeepDisks() + err = (&kc).DiscoverKeepServers() return kc, err } -func (this *KeepClient) DiscoverKeepDisks() error { +func (this *KeepClient) DiscoverKeepServers() error { // Construct request of keep disk list var req *http.Request var err error @@ -60,7 +66,7 @@ func (this *KeepClient) DiscoverKeepDisks() error { // Make the request var resp *http.Response - if resp, err = this.client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil { return err } @@ -410,10 +416,11 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read } req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + req.Header.Add("Content-Type", "application/octet-stream") req.Body = body var resp *http.Response - if resp, err = this.client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil { upload_status <- UploadStatus{err, url, 0} return } @@ -425,13 +432,11 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read } } -var InsufficientReplicasError = errors.New("Could not write sufficient replicas") - func (this KeepClient) putReplicas( hash string, requests chan ReadRequest, reader_status chan error, - expectedLength int64) error { + expectedLength int64) (replicas int, err error) { // Calculate the ordering for uploading to servers sv := this.ShuffledServiceRoots(hash) @@ -447,17 +452,17 @@ func (this KeepClient) putReplicas( defer close(upload_status) // Desired number of replicas - want_replicas := this.Want_replicas + remaining_replicas := this.Want_replicas - for want_replicas > 0 { - for active < want_replicas { + for remaining_replicas > 0 { + for active < remaining_replicas { // Start some upload requests if next_server < len(sv) { go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength) next_server += 1 active += 1 } else { - return InsufficientReplicasError + return (this.Want_replicas - remaining_replicas), InsufficientReplicasError } } @@ -468,38 +473,38 @@ func (this KeepClient) putReplicas( // good news! } else { // bad news - return status + return (this.Want_replicas - remaining_replicas), status } case status := <-upload_status: if status.StatusCode == 200 { // good news! - want_replicas -= 1 + remaining_replicas -= 1 } else { // writing to keep server failed for some reason log.Printf("Keep server put to %v failed with '%v'", status.Url, status.Err) } active -= 1 - log.Printf("Upload status %v %v %v", status.StatusCode, want_replicas, active) + log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active) } } - return nil + return (this.Want_replicas - remaining_replicas), nil } var OversizeBlockError = errors.New("Block too big") -func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) error { +func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) { // Buffer for reads from 'r' var buffer []byte if expectedLength > 0 { - if expectedLength > 64*1024*1024 { - return OversizeBlockError + if expectedLength > BLOCKSIZE { + return 0, OversizeBlockError } buffer = make([]byte, expectedLength) } else { - buffer = make([]byte, 64*1024*1024) + buffer = make([]byte, BLOCKSIZE) } // Read requests on Transfer() buffer @@ -516,7 +521,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) err return this.putReplicas(hash, requests, reader_status, expectedLength) } -func (this KeepClient) PutHB(hash string, buffer []byte) error { +func (this KeepClient) PutHB(hash string, buffer []byte) (replicas int, err error) { // Read requests on Transfer() buffer requests := make(chan ReadRequest) defer close(requests) @@ -527,14 +532,97 @@ func (this KeepClient) PutHB(hash string, buffer []byte) error { return this.putReplicas(hash, requests, nil, int64(len(buffer))) } -func (this KeepClient) PutB(buffer []byte) error { - return this.PutHB(fmt.Sprintf("%x", md5.Sum(buffer)), buffer) +func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) { + hash = fmt.Sprintf("%x", md5.Sum(buffer)) + replicas, err = this.PutHB(hash, buffer) + return hash, replicas, err } -func (this KeepClient) PutR(r io.Reader) error { +func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) { if buffer, err := ioutil.ReadAll(r); err != nil { - return err + return "", 0, err } else { return this.PutB(buffer) } } + +func (this KeepClient) Get(hash string) (reader io.ReadCloser, + contentLength int64, url string, err error) { + return this.AuthorizedGet(hash, "", "") +} + +func (this KeepClient) AuthorizedGet(hash string, + signature string, + timestamp string) (reader io.ReadCloser, + contentLength int64, url string, err error) { + + // Calculate the ordering for asking servers + sv := this.ShuffledServiceRoots(hash) + + for _, host := range sv { + var req *http.Request + var err error + var url string + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } + if req, err = http.NewRequest("GET", url, nil); err != nil { + continue + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + continue + } + + if resp.StatusCode == http.StatusOK { + return resp.Body, resp.ContentLength, url, nil + } + } + + return nil, 0, "", BlockNotFound +} + +func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) { + return this.AuthorizedAsk(hash, "", "") +} + +func (this KeepClient) AuthorizedAsk(hash string, signature string, + timestamp string) (contentLength int64, url string, err error) { + // Calculate the ordering for asking servers + sv := this.ShuffledServiceRoots(hash) + + for _, host := range sv { + var req *http.Request + var err error + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } + + if req, err = http.NewRequest("HEAD", url, nil); err != nil { + continue + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + continue + } + + if resp.StatusCode == http.StatusOK { + return resp.ContentLength, url, nil + } + } + + return 0, "", BlockNotFound + +}