package keepclient
import (
+ "arvados.org/buffer"
"crypto/md5"
"crypto/tls"
"encoding/json"
"strconv"
)
+// BLOCKSIZE is the maximum size of a Keep block: 64 MiB.
+const BLOCKSIZE = 64 * 1024 * 1024
+
+var BlockNotFound = errors.New("Block not found")
+var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
+
+// KeepClient carries the settings and HTTP client used to talk to Keep
+// servers. ApiToken is sent as an OAuth2 Authorization header on requests,
+// ApiInsecure records whether TLS certificate verification was disabled,
+// Want_replicas is the number of successful writes a put aims for, and
+// Client performs the HTTP requests.
+// NOTE(review): Service_roots is presumably the list of Keep server base
+// URLs filled in by server discovery — confirm against DiscoverKeepServers.
type KeepClient struct {
ApiServer string
ApiToken string
ApiInsecure bool
Service_roots []string
Want_replicas int
- client *http.Client
+ Client *http.Client
}
type KeepDisk struct {
SSL bool `json:"service_ssl_flag"`
}
-func MakeKeepClient() (kc *KeepClient, err error) {
- kc = &KeepClient{
- ApiServer: os.Getenv("ARVADOS_API_HOST"),
- ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
- ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
-
+// MakeKeepClient builds a KeepClient from the ARVADOS_API_HOST,
+// ARVADOS_API_TOKEN and ARVADOS_API_HOST_INSECURE environment variables,
+// asks for 2 replicas by default, and then calls DiscoverKeepServers.
+//
+// Any non-empty value of ARVADOS_API_HOST_INSECURE disables TLS certificate
+// verification on the client's transport.
+//
+// Returns the client together with any error from DiscoverKeepServers.
+func MakeKeepClient() (kc KeepClient, err error) {
+ // Read the flag up front: kc is still the zero value at this point, so
+ // kc.ApiInsecure would always be false and the setting would never
+ // reach the TLS configuration.
+ insecure := os.Getenv("ARVADOS_API_HOST_INSECURE") != ""
tr := &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
}
- kc.client = &http.Client{Transport: tr}
+ kc = KeepClient{
+ ApiServer: os.Getenv("ARVADOS_API_HOST"),
+ ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
+ ApiInsecure: insecure,
+ Want_replicas: 2,
+ Client: &http.Client{Transport: tr}}
- err = kc.DiscoverKeepDisks()
+ err = (&kc).DiscoverKeepServers()
return kc, err
}
-func (this *KeepClient) DiscoverKeepDisks() error {
+func (this *KeepClient) DiscoverKeepServers() error {
// Construct request of keep disk list
var req *http.Request
var err error
// Make the request
var resp *http.Response
- if resp, err = this.client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil {
return err
}
return pseq
}
-type ReaderSlice struct {
- slice []byte
- reader_error error
-}
-
-// Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'. Completes when Reader 'r' reports on the error channel
-// and closes channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
- defer close(slices)
-
- // Initially use entire buffer as scratch space
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- // Read into the scratch space
- n, err = r.Read(ptr)
- } else {
- // Ran out of scratch space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- ReaderSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- ReaderSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- ReaderSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- ReaderSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
-}
-
-// A read request to the Transfer() function
-type ReadRequest struct {
- offset int
- maxsize int
- result chan<- ReadResult
-}
-
-// A read result from the Transfer() function
-type ReadResult struct {
- slice []byte
- err error
-}
-
-// Reads from the buffer managed by the Transfer()
-type BufferReader struct {
- offset *int
- requests chan<- ReadRequest
- responses chan ReadResult
-}
-
-func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
- return BufferReader{new(int), requests, make(chan ReadResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- ReadRequest{*this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- *this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := *this.offset
- for {
- this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
- *this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(*this.offset - starting_offset), nil
- } else {
- return int64(*this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(*this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this BufferReader) Close() error {
- close(this.responses)
- return nil
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
- log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
- if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- ReadResult{body[req.offset:end], nil}
- return true
- } else if complete && req.offset >= len(body) {
- req.result <- ReadResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
- // currently buffered data
- var body []byte
-
- // for receiving slices from ReadIntoBuffer
- var slices chan ReaderSlice = nil
-
- // indicates whether the buffered data is complete
- var complete bool = false
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // ReadIntoBuffer will close 'slices' when it is done with it
- slices = make(chan ReaderSlice)
-
- // Spin it off
- go ReadIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- complete = true
- }
-
- pending_requests := make([]ReadRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !HandleReadRequest(req, body, complete) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- if bk.reader_error != nil {
- reader_error <- bk.reader_error
- if bk.reader_error == io.EOF {
- // EOF indicates the reader is done
- // sending, so our buffer is complete.
- complete = true
- } else {
- // some other reader error
- return
- }
- }
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if HandleReadRequest(pending_requests[n], body, complete) {
-
- // move the element from the
- // back of the slice to
- // position 'n', then shorten
- // the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if complete {
- // no more reads
- slices = nil
- } else {
- // reader channel closed without signaling EOF
- reader_error <- io.ErrUnexpectedEOF
- return
- }
- }
- }
- }
-}
-
type UploadStatus struct {
Err error
Url string
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ req.Header.Add("Content-Type", "application/octet-stream")
req.Body = body
var resp *http.Response
- if resp, err = this.client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil {
upload_status <- UploadStatus{err, url, 0}
return
}
}
}
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-
func (this KeepClient) putReplicas(
hash string,
- requests chan ReadRequest,
+ requests chan buffer.ReadRequest,
reader_status chan error,
- expectedLength int64) error {
+ expectedLength int64) (replicas int, err error) {
// Calculate the ordering for uploading to servers
sv := this.ShuffledServiceRoots(hash)
defer close(upload_status)
// Desired number of replicas
- want_replicas := this.Want_replicas
+ remaining_replicas := this.Want_replicas
- for want_replicas > 0 {
- for active < want_replicas {
+ for remaining_replicas > 0 {
+ for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength)
+ go this.uploadToKeepServer(sv[next_server], hash, buffer.MakeBufferReader(requests), upload_status, expectedLength)
next_server += 1
active += 1
} else {
- return InsufficientReplicasError
+ return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
}
}
// good news!
} else {
// bad news
- return status
+ return (this.Want_replicas - remaining_replicas), status
}
case status := <-upload_status:
if status.StatusCode == 200 {
// good news!
- want_replicas -= 1
+ remaining_replicas -= 1
} else {
// writing to keep server failed for some reason
log.Printf("Keep server put to %v failed with '%v'",
status.Url, status.Err)
}
active -= 1
- log.Printf("Upload status %v %v %v", status.StatusCode, want_replicas, active)
+ log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
}
}
- return nil
+ return (this.Want_replicas - remaining_replicas), nil
}
var OversizeBlockError = errors.New("Block too big")
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) error {
+func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
// Buffer for reads from 'r'
- var buffer []byte
+ var buf []byte
if expectedLength > 0 {
- if expectedLength > 64*1024*1024 {
- return OversizeBlockError
+ if expectedLength > BLOCKSIZE {
+ return 0, OversizeBlockError
}
- buffer = make([]byte, expectedLength)
+ buf = make([]byte, expectedLength)
} else {
- buffer = make([]byte, 64*1024*1024)
+ buf = make([]byte, BLOCKSIZE)
}
// Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
+ requests := make(chan buffer.ReadRequest)
defer close(requests)
// Reporting reader error states
defer close(reader_status)
// Start the transfer goroutine
- go Transfer(buffer, r, requests, reader_status)
+ go buffer.Transfer(buf, r, requests, reader_status)
return this.putReplicas(hash, requests, reader_status, expectedLength)
}
-func (this KeepClient) PutHB(hash string, buffer []byte) error {
+// PutHB uploads the block held in buf under the given hash. It starts a
+// buffer.Transfer goroutine over buf and hands the request channel to
+// putReplicas, whose replica count and error are returned unchanged.
+// The request channel is closed when PutHB returns.
+func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
// Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
+ requests := make(chan buffer.ReadRequest)
defer close(requests)
// Start the transfer goroutine
+ // No source reader and no reader-status channel are supplied here;
+ // presumably Transfer then serves buf as already complete — TODO confirm
+ // against the buffer package.
- go Transfer(buffer, nil, requests, nil)
+ go buffer.Transfer(buf, nil, requests, nil)
- return this.putReplicas(hash, requests, nil, int64(len(buffer)))
+ return this.putReplicas(hash, requests, nil, int64(len(buf)))
}
-func (this KeepClient) PutB(buffer []byte) error {
- return this.PutHB(fmt.Sprintf("%x", md5.Sum(buffer)), buffer)
+// PutB computes the lowercase hex MD5 digest of buffer, uploads the block
+// under that hash via PutHB, and returns the hash together with PutHB's
+// replica count and error.
+func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
+ hash = fmt.Sprintf("%x", md5.Sum(buffer))
+ replicas, err = this.PutHB(hash, buffer)
+ return hash, replicas, err
}
-func (this KeepClient) PutR(r io.Reader) error {
+// PutR reads r to the end into memory and uploads the result via PutB.
+// If reading fails, it returns an empty hash, zero replicas and the read
+// error; otherwise it returns whatever PutB returns.
+func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
if buffer, err := ioutil.ReadAll(r); err != nil {
- return err
+ return "", 0, err
} else {
return this.PutB(buffer)
}
}
+
+// Get fetches the block with the given hash without a permission signature.
+// It is AuthorizedGet called with an empty signature and timestamp, and
+// returns that call's reader, content length, URL and error unchanged.
+func (this KeepClient) Get(hash string) (reader io.ReadCloser,
+ contentLength int64, url string, err error) {
+ return this.AuthorizedGet(hash, "", "")
+}
+
+// AuthorizedGet fetches the block with the given hash, trying each server
+// returned by ShuffledServiceRoots(hash) in order until one answers
+// 200 OK.
+//
+// When signature is non-empty the request path is
+// "<hash>+A<signature>@<timestamp>"; otherwise it is just "<hash>". Every
+// request carries the client's ApiToken in an OAuth2 Authorization header.
+//
+// On success it returns the response body (which the caller must close),
+// the response's content length and the URL that served the block.
+// Servers that fail or answer with another status are skipped; if none
+// succeeds the result is nil, 0, "" and BlockNotFound.
+func (this KeepClient) AuthorizedGet(hash string,
+ signature string,
+ timestamp string) (reader io.ReadCloser,
+ contentLength int64, url string, err error) {
+
+ // Calculate the ordering for asking servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ for _, host := range sv {
+ // Locals are named distinctly so they do not shadow the named results.
+ var target string
+ if signature != "" {
+ target = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+ signature, timestamp)
+ } else {
+ target = fmt.Sprintf("%s/%s", host, hash)
+ }
+
+ req, reqErr := http.NewRequest("GET", target, nil)
+ if reqErr != nil {
+ continue
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+ resp, doErr := this.Client.Do(req)
+ if doErr != nil {
+ continue
+ }
+
+ if resp.StatusCode == http.StatusOK {
+ return resp.Body, resp.ContentLength, target, nil
+ }
+
+ // This server did not have the block: release its response before
+ // moving on, otherwise the underlying connection is leaked.
+ resp.Body.Close()
+ }
+
+ return nil, 0, "", BlockNotFound
+}
+
+// Ask checks for the block with the given hash without a permission
+// signature. It is AuthorizedAsk called with an empty signature and
+// timestamp, and returns that call's content length, URL and error
+// unchanged.
+func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
+ return this.AuthorizedAsk(hash, "", "")
+}
+
+// AuthorizedAsk issues a HEAD request for the block with the given hash,
+// trying each server returned by ShuffledServiceRoots(hash) in order until
+// one answers 200 OK.
+//
+// When signature is non-empty the request path is
+// "<hash>+A<signature>@<timestamp>"; otherwise it is just "<hash>". Every
+// request carries the client's ApiToken in an OAuth2 Authorization header.
+//
+// On success it returns the response's content length and the URL that
+// answered. Servers that fail or answer with another status are skipped;
+// if none succeeds the result is 0, "" and BlockNotFound.
+func (this KeepClient) AuthorizedAsk(hash string, signature string,
+ timestamp string) (contentLength int64, url string, err error) {
+ // Calculate the ordering for asking servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ for _, host := range sv {
+ // Locals are named distinctly so they do not shadow the named results.
+ var target string
+ if signature != "" {
+ target = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+ signature, timestamp)
+ } else {
+ target = fmt.Sprintf("%s/%s", host, hash)
+ }
+
+ req, reqErr := http.NewRequest("HEAD", target, nil)
+ if reqErr != nil {
+ continue
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+ resp, doErr := this.Client.Do(req)
+ if doErr != nil {
+ continue
+ }
+
+ // Only the status and headers are needed, so the response is
+ // released right away; leaving the body open leaks the connection.
+ resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
+ return resp.ContentLength, target, nil
+ }
+ }
+
+ return 0, "", BlockNotFound
+}