X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2a493a9215f604c63ab7bc6f0e0956d10af8ef10..c3a88cbf511aa0954dac271ce6bda9c6e4f3191c:/sdk/go/src/arvados.org/keepclient/keepclient.go?ds=inline diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index f9dce5f7e8..2738cefa7c 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -1,21 +1,33 @@ package keepclient import ( + "crypto/md5" "crypto/tls" "encoding/json" + "errors" "fmt" "io" + "io/ioutil" + "log" "net/http" "os" "sort" "strconv" ) +// A Keep "block" is 64MB. +const BLOCKSIZE = 64 * 1024 * 1024 + +var BlockNotFound = errors.New("Block not found") +var InsufficientReplicasError = errors.New("Could not write sufficient replicas") + type KeepClient struct { ApiServer string ApiToken string ApiInsecure bool Service_roots []string + Want_replicas int + Client *http.Client } type KeepDisk struct { @@ -24,56 +36,74 @@ type KeepDisk struct { SSL bool `json:"service_ssl_flag"` } -func MakeKeepClient() (kc *KeepClient, err error) { - kc = &KeepClient{ - ApiServer: os.Getenv("ARVADOS_API_HOST"), - ApiToken: os.Getenv("ARVADOS_API_TOKEN"), - ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")} - - if err := kc.DiscoverKeepDisks(); err != nil { - return nil, err +func MakeKeepClient() (kc KeepClient, err error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure}, } - return kc, nil -} + kc = KeepClient{ + ApiServer: os.Getenv("ARVADOS_API_HOST"), + ApiToken: os.Getenv("ARVADOS_API_TOKEN"), + ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""), + Want_replicas: 2, + Client: &http.Client{Transport: tr}} -func (this *KeepClient) DiscoverKeepDisks() error { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure}, - } - client := &http.Client{Transport: tr} + err = (&kc).DiscoverKeepServers() + + 
return kc, err +} +func (this *KeepClient) DiscoverKeepServers() error { + // Construct request of keep disk list var req *http.Request var err error - if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil { + if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil { return err } - var resp *http.Response + // Add api token header req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) - if resp, err = client.Do(req); err != nil { + + // Make the request + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { return err } type SvcList struct { Items []KeepDisk `json:"items"` } + + // Decode json reply dec := json.NewDecoder(resp.Body) var m SvcList if err := dec.Decode(&m); err != nil { return err } - this.Service_roots = make([]string, len(m.Items)) - for index, element := range m.Items { + listed := make(map[string]bool) + this.Service_roots = make([]string, 0, len(m.Items)) + + for _, element := range m.Items { n := "" if element.SSL { n = "s" } - this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d", - n, element.Hostname, element.Port) + + // Construct server URL + url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port) + + // Skip duplicates + if !listed[url] { + listed[url] = true + this.Service_roots = append(this.Service_roots, url) + } } + + // Must be sorted for ShuffledServiceRoots() to produce consistent + // results. sort.Strings(this.Service_roots) + return nil } @@ -130,92 +160,150 @@ type ReaderSlice struct { reader_error error } -type Source <-chan ReaderSlice -type Sink chan<- ReaderSlice -type Status chan error - // Read repeatedly from the reader into the specified buffer, and report each -// read to channel 'c'. Completes when Reader 'r' reports an error and closes -// channel 'c'. 
-func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) { - defer close(c) +// read to channel 'c'. Completes when Reader 'r' reports on the error channel +// and closes channel 'c'. +func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) { + defer close(slices) // Initially use entire buffer as scratch space ptr := buffer[:] - for len(ptr) > 0 { - // Read into the scratch space - n, err := r.Read(ptr) + for { + var n int + var err error + if len(ptr) > 0 { + // Read into the scratch space + n, err = r.Read(ptr) + } else { + // Ran out of scratch space, try reading one more byte + var b [1]byte + n, err = r.Read(b[:]) + + if n > 0 { + // Reader has more data but we have nowhere to + // put it, so we're stuffed + slices <- ReaderSlice{nil, io.ErrShortBuffer} + } else { + // Return some other error (hopefully EOF) + slices <- ReaderSlice{nil, err} + } + return + } // End on error (includes EOF) if err != nil { - c <- ReaderSlice{nil, err} + slices <- ReaderSlice{nil, err} return } - // Make a slice with the contents of the read - c <- ReaderSlice{ptr[:n], nil} + if n > 0 { + // Make a slice with the contents of the read + slices <- ReaderSlice{ptr[:n], nil} - // Adjust the scratch space slice - ptr = ptr[n:] - } - if len(ptr) == 0 { - c <- ReaderSlice{nil, io.ErrShortBuffer} + // Adjust the scratch space slice + ptr = ptr[n:] + } } } -// Take slices from 'source' channel and write them to Writer 'w'. Reports read -// or write errors on 'status'. Completes when 'source' channel is closed. 
// ReadRequest is a read request to the Transfer() function: it asks for up to
// 'maxsize' bytes of the buffered body starting at 'offset'. The reply is
// sent on 'result'.
type ReadRequest struct {
	offset  int
	maxsize int
	result  chan<- ReadResult
}

// ReadResult is a read result from the Transfer() function. Exactly one of
// the fields is set by HandleReadRequest: 'slice' for data, or 'err' (io.EOF)
// when the request starts at or past the end of a complete body.
type ReadResult struct {
	slice []byte
	err   error
}

// BufferReader reads from the buffer managed by Transfer(). It implements
// io.Reader, io.WriterTo and io.Closer.
//
// 'offset' is a pointer so that the value-receiver methods below all advance
// the same read position.
type BufferReader struct {
	offset    *int
	requests  chan<- ReadRequest
	responses chan ReadResult
}

// MakeBufferReader returns a BufferReader positioned at offset 0 that issues
// its read requests on 'requests' and receives replies on a private,
// unbuffered responses channel.
func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
	return BufferReader{new(int), requests, make(chan ReadResult)}
}

// Read requests up to len(p) bytes at the current offset, copies the reply
// into p and advances the offset by the number of bytes received. It blocks
// until the request has been accepted and answered. If the responses channel
// has been closed (see Close) it returns io.ErrUnexpectedEOF.
func (this BufferReader) Read(p []byte) (n int, err error) {
	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
	rr, valid := <-this.responses
	if !valid {
		return 0, io.ErrUnexpectedEOF
	}
	*this.offset += len(rr.slice)
	return copy(p, rr.slice), rr.err
}

// WriteTo copies the remainder of the buffer to dest in chunks of at most
// 32 KiB, until the buffer reports io.EOF (which is not treated as an error).
//
// 'written' is the number of bytes dest actually accepted during this call,
// and the read offset only advances by that amount. A failed write stops the
// copy and returns the writer's error; a write that accepts fewer bytes than
// offered without reporting an error yields io.ErrShortWrite. If the
// responses channel has been closed it returns io.ErrUnexpectedEOF.
func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
	for {
		this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
		rr, valid := <-this.responses
		if !valid {
			return written, io.ErrUnexpectedEOF
		}
		log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)

		if len(rr.slice) > 0 {
			n, werr := dest.Write(rr.slice)
			// Count only what the writer really took, so both the
			// return value and the offset stay truthful on failure.
			*this.offset += n
			written += int64(n)
			if werr != nil {
				return written, werr
			}
			if n < len(rr.slice) {
				return written, io.ErrShortWrite
			}
		}

		if rr.err == io.EOF {
			// EOF is not an error.
			return written, nil
		}
		if rr.err != nil {
			return written, rr.err
		}
	}
}

// Close closes the responses channel, after which Read and WriteTo report
// io.ErrUnexpectedEOF. It always returns nil. Calling Close a second time
// panics, because closing an already-closed channel panics.
func (this BufferReader) Close() error {
	close(this.responses)
	return nil
}

// HandleReadRequest handles a read request against 'body'. It returns true
// if a response was sent, and false if the request should be queued until
// more data is available.
//
//   - If the request starts inside body, the reply is the slice of body from
//     req.offset, truncated to req.maxsize bytes.
//   - Otherwise, if the body is complete, the reply is io.EOF.
//   - Otherwise nothing is sent.
//
// The send on req.result blocks until it can proceed.
func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
	log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))

	if req.offset < len(body) {
		end := len(body)
		if req.offset+req.maxsize < end {
			end = req.offset + req.maxsize
		}
		req.result <- ReadResult{body[req.offset:end], nil}
		return true
	}

	// The request starts at or past the end of what has been buffered.
	if complete {
		req.result <- ReadResult{nil, io.EOF}
		return true
	}
	return false
}

// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
// Accepts read requests on the buffer on the 'requests' channel.  Completes
// when 'requests' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) { // currently buffered data var body []byte // for receiving slices from ReadIntoBuffer - var slices chan []byte = nil + var slices chan ReaderSlice = nil // indicates whether the buffered data is complete var complete bool = false @@ -224,47 +312,41 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, // 'body' is the buffer slice representing the body content read so far body = source_buffer[:0] - // used to communicate slices of the buffer as read - reader_slices := make(chan []ReaderSlice) + // used to communicate slices of the buffer as they are + // ReadIntoBuffer will close 'slices' when it is done with it + slices = make(chan ReaderSlice) // Spin it off - go ReadIntoBuffer(source_buffer, source_reader, reader_slices) + go ReadIntoBuffer(source_buffer, source_reader, slices) } else { // use the whole buffer body = source_buffer[:] - // that's it + // buffer is complete complete = true } - // list of sinks to send to - sinks_slice := make([]Sink, 0) - defer closeSinks(sinks_slice) + pending_requests := make([]ReadRequest, 0) for { select { - case s, valid := <-sinks: + case req, valid := <-requests: + // Handle a buffer read request if valid { - // add to the sinks slice - sinks_slice = append(sinks_slice, s) - - // catch up the sink with the current body contents - if len(body) > 0 { - s <- ReaderSlice{body, nil} - if complete { - s <- ReaderSlice{nil, io.EOF} - } + if !HandleReadRequest(req, body, complete) { + pending_requests = append(pending_requests, req) } } else { - // closed 'sinks' channel indicates we're done + // closed 'requests' channel indicates we're done return } case bk, valid := <-slices: + // Got a new slice from the reader if valid { - if bk.err != nil { - reader_error <- bk.err - if bk.err == io.EOF { + if bk.reader_error != nil { + reader_error <- bk.reader_error + if bk.reader_error == 
io.EOF { // EOF indicates the reader is done // sending, so our buffer is complete. complete = true @@ -279,89 +361,108 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, body = source_buffer[0 : len(body)+len(bk.slice)] } - // send the new slice to the sinks - for _, s := range sinks_slice { - s <- bk - } - - if complete { - // got an EOF, so close the sinks - closeSinks(sinks_slice) + // handle pending reads + n := 0 + for n < len(pending_requests) { + if HandleReadRequest(pending_requests[n], body, complete) { + + // move the element from the + // back of the slice to + // position 'n', then shorten + // the slice by one element + pending_requests[n] = pending_requests[len(pending_requests)-1] + pending_requests = pending_requests[0 : len(pending_requests)-1] + } else { - // truncate sinks slice - sinks_slice = sinks_slice[:0] + // Request wasn't handled, so keep it in the request slice + n += 1 + } } } else { - // no more reads - slices = nil + if complete { + // no more reads + slices = nil + } else { + // reader channel closed without signaling EOF + reader_error <- io.ErrUnexpectedEOF + return + } } } } } -func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) { - pipereader, pipewriter := io.Pipe() +type UploadStatus struct { + Err error + Url string + StatusCode int +} + +func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, + upload_status chan<- UploadStatus, expectedLength int64) { + + log.Printf("Uploading to %s", host) var req *http.Request - if req, err = http.NewRequest("POST", url, nil); err != nil { - write_status <- err + var err error + var url = fmt.Sprintf("%s/%s", host, hash) + if req, err = http.NewRequest("PUT", url, nil); err != nil { + upload_status <- UploadStatus{err, url, 0} + return } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) - req.Body = pipereader - - // create channel to transfer slices 
from reader to writer - tr := make(chan ReaderSlice) - // start the writer goroutine - go SinkWriter(tr, pipewriter, write_status) + if expectedLength > 0 { + req.ContentLength = expectedLength + } - // now transfer the channel to the reader goroutine - sinks <- tr + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + req.Header.Add("Content-Type", "application/octet-stream") + req.Body = body var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + upload_status <- UploadStatus{err, url, 0} + return + } - if resp, err = client.Do(req); err != nil { - return nil, err + if resp.StatusCode == http.StatusOK { + upload_status <- UploadStatus{nil, url, resp.StatusCode} + } else { + upload_status <- UploadStatus{errors.New(resp.Status), url, resp.StatusCode} } } -var KeepWriteError = errors.new("Could not write sufficient replicas") +func (this KeepClient) putReplicas( + hash string, + requests chan ReadRequest, + reader_status chan error, + expectedLength int64) (replicas int, err error) { -func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error { - // Calculate the ordering to try writing to servers + // Calculate the ordering for uploading to servers sv := this.ShuffledServiceRoots(hash) // The next server to try contacting - n := 0 + next_server := 0 // The number of active writers active := 0 - // Used to buffer reads from 'r' - buffer := make([]byte, 64*1024*1024) - - // Used to send writers to the reader goroutine - sinks := make(chan Sink) - defer close(sinks) - - // Used to communicate status from the reader goroutine - reader_status := make(chan error) + // Used to communicate status from the upload goroutines + upload_status := make(chan UploadStatus) + defer close(upload_status) - // Start the reader goroutine - go Transfer(buffer, r, sinks, reader_status) + // Desired number of replicas + remaining_replicas := this.Want_replicas - // Used to communicate status from the writer goroutines - 
write_status := make(chan error) - - for want_replicas > 0 { - for active < want_replicas { - // Start some writers - if n < len(sv) { - go this.ConnectToKeepServer(sv[n], sinks, write_status) - n += 1 + for remaining_replicas > 0 { + for active < remaining_replicas { + // Start some upload requests + if next_server < len(sv) { + go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength) + next_server += 1 active += 1 } else { - return KeepWriteError + return (this.Want_replicas - remaining_replicas), InsufficientReplicasError } } @@ -372,16 +473,156 @@ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) erro // good news! } else { // bad news - return status + return (this.Want_replicas - remaining_replicas), status } - case status := <-write_status: - if status == io.EOF { + case status := <-upload_status: + if status.StatusCode == 200 { // good news! - want_replicas -= 1 + remaining_replicas -= 1 } else { - // writing to keep server failed for some reason. 
+ // writing to keep server failed for some reason + log.Printf("Keep server put to %v failed with '%v'", + status.Url, status.Err) } active -= 1 + log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active) } } + + return (this.Want_replicas - remaining_replicas), nil +} + +var OversizeBlockError = errors.New("Block too big") + +func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) { + + // Buffer for reads from 'r' + var buffer []byte + if expectedLength > 0 { + if expectedLength > BLOCKSIZE { + return 0, OversizeBlockError + } + buffer = make([]byte, expectedLength) + } else { + buffer = make([]byte, BLOCKSIZE) + } + + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) + + // Reporting reader error states + reader_status := make(chan error) + defer close(reader_status) + + // Start the transfer goroutine + go Transfer(buffer, r, requests, reader_status) + + return this.putReplicas(hash, requests, reader_status, expectedLength) +} + +func (this KeepClient) PutHB(hash string, buffer []byte) (replicas int, err error) { + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) + + // Start the transfer goroutine + go Transfer(buffer, nil, requests, nil) + + return this.putReplicas(hash, requests, nil, int64(len(buffer))) +} + +func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) { + hash = fmt.Sprintf("%x", md5.Sum(buffer)) + replicas, err = this.PutHB(hash, buffer) + return hash, replicas, err +} + +func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) { + if buffer, err := ioutil.ReadAll(r); err != nil { + return "", 0, err + } else { + return this.PutB(buffer) + } +} + +func (this KeepClient) Get(hash string) (reader io.ReadCloser, + contentLength int64, url string, err error) { + return this.AuthorizedGet(hash, "", "") +} + +func (this KeepClient) 
AuthorizedGet(hash string, + signature string, + timestamp string) (reader io.ReadCloser, + contentLength int64, url string, err error) { + + // Calculate the ordering for asking servers + sv := this.ShuffledServiceRoots(hash) + + for _, host := range sv { + var req *http.Request + var err error + var url string + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } + if req, err = http.NewRequest("GET", url, nil); err != nil { + continue + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + continue + } + + if resp.StatusCode == http.StatusOK { + return resp.Body, resp.ContentLength, url, nil + } + } + + return nil, 0, "", BlockNotFound +} + +func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) { + return this.AuthorizedAsk(hash, "", "") +} + +func (this KeepClient) AuthorizedAsk(hash string, signature string, + timestamp string) (contentLength int64, url string, err error) { + // Calculate the ordering for asking servers + sv := this.ShuffledServiceRoots(hash) + + for _, host := range sv { + var req *http.Request + var err error + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } + + if req, err = http.NewRequest("HEAD", url, nil); err != nil { + continue + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + continue + } + + if resp.StatusCode == http.StatusOK { + return resp.ContentLength, url, nil + } + } + + return 0, "", BlockNotFound + }