b88613488b4fe8e6a1515b48171f85997567306d
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer.go
1 /* Implements a buffer that supports concurrent incremental read and append.
2 New readers start reading from the beginning of the buffer, block when reaching
3 the end of the buffer, and are unblocked as new data is added.
4
5 Usage:
6
7 Begin reading into a buffer with maximum size 'buffersize' from 'source':
8   tr := StartTransferFromReader(buffersize, source)
9
10 To create a new reader (this can be called multiple times):
11   r := tr.MakeStreamReader()
12
13 When you're done with the buffer:
14   tr.Close()
15
16
17 Alternately, if you already have a filled buffer and just want to read out from it:
18   tr := StartTransferFromSlice(buf)
19   r := tr.MakeStreamReader()
20   tr.Close()
21
22 */
23
24 package streamer
25
26 import (
27         "io"
28 )
29
30 type AsyncStream struct {
31         requests      chan readRequest
32         Reader_status chan error
33 }
34
35 // Reads from the buffer managed by the Transfer()
36 type StreamReader struct {
37         offset    int
38         requests  chan<- readRequest
39         responses chan readResult
40 }
41
42 func AsyncReaderStream(buffersize int, source io.Reader) *AsyncStream {
43         buf := make([]byte, buffersize)
44
45         t := &AsyncStream{make(chan readRequest), make(chan error)}
46
47         go transfer(buf, source, t.requests, t.Reader_status)
48
49         return t
50 }
51
52 func AsyncSliceStream(buf []byte) *AsyncStream {
53         t := &AsyncStream{make(chan readRequest), nil}
54
55         go transfer(buf, nil, t.requests, nil)
56
57         return t
58 }
59
60 func (this *AsyncStream) MakeStreamReader() *StreamReader {
61         return &StreamReader{0, this.requests, make(chan readResult)}
62 }
63
64 // Reads from the buffer managed by the Transfer()
65 func (this *StreamReader) Read(p []byte) (n int, err error) {
66         this.requests <- readRequest{this.offset, len(p), this.responses}
67         rr, valid := <-this.responses
68         if valid {
69                 this.offset += len(rr.slice)
70                 return copy(p, rr.slice), rr.err
71         } else {
72                 return 0, io.ErrUnexpectedEOF
73         }
74 }
75
76 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
77         // Record starting offset in order to correctly report the number of bytes sent
78         starting_offset := this.offset
79         for {
80                 this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
81                 rr, valid := <-this.responses
82                 if valid {
83                         this.offset += len(rr.slice)
84                         if rr.err != nil {
85                                 if rr.err == io.EOF {
86                                         // EOF is not an error.
87                                         return int64(this.offset - starting_offset), nil
88                                 } else {
89                                         return int64(this.offset - starting_offset), rr.err
90                                 }
91                         } else {
92                                 dest.Write(rr.slice)
93                         }
94                 } else {
95                         return int64(this.offset), io.ErrUnexpectedEOF
96                 }
97         }
98 }
99
100 // Close the responses channel
101 func (this *StreamReader) Close() error {
102         close(this.responses)
103         return nil
104 }
105
106 func (this *AsyncStream) Close() {
107         close(this.requests)
108         if this.Reader_status != nil {
109                 close(this.Reader_status)
110         }
111 }