1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 // streamWriterAt translates random-access writes to sequential
15 // writes. The caller is expected to use an arbitrary sequence of
16 // non-overlapping WriteAt calls covering all positions between 0 and
17 // N, for any N < len(buf), then call Close.
19 // streamWriterAt writes the data to the provided io.Writer in
22 // streamWriterAt can also be used as an asynchronous buffer: the
23 // caller can use the io.Writer interface to write into a memory
24 // buffer and return without waiting for the wrapped writer to catch
27 // Close returns when all data has been written through.
28 type streamWriterAt struct {
31 writepos int // target offset if Write is called
32 partsize int // size of each part written through to writer
33 endpos int // portion of buf actually used, judging by WriteAt calls so far
34 partfilled []int // number of bytes written to each part so far
35 partready chan []byte // parts of buf fully written / waiting for writer goroutine
36 partnext int // index of next part we will send to partready when it's ready
37 wroteAt int // bytes we copied to buf in WriteAt
38 wrote int // bytes successfully written through to writer
39 errWrite chan error // final outcome of writer goroutine
40 closed bool // streamWriterAt has been closed
41 mtx sync.Mutex // guard internal fields during concurrent calls to WriteAt and Close
44 // newStreamWriterAt creates a new streamWriterAt.
45 func newStreamWriterAt(w io.Writer, partsize int, buf []byte) *streamWriterAt {
49 nparts := (len(buf) + partsize - 1) / partsize
50 swa := &streamWriterAt{
54 partfilled: make([]int, nparts),
55 partready: make(chan []byte, nparts),
56 errWrite: make(chan error, 1),
58 go swa.writeToWriter()
62 // Wrote returns the number of bytes written through to the
65 // Wrote must not be called until after Close.
66 func (swa *streamWriterAt) Wrote() int {
70 // WroteAt returns the number of bytes passed to WriteAt, regardless of
71 // whether they were written through to the io.Writer.
72 func (swa *streamWriterAt) WroteAt() int {
74 defer swa.mtx.Unlock()
78 func (swa *streamWriterAt) writeToWriter() {
79 defer close(swa.errWrite)
80 for p := range swa.partready {
81 n, err := swa.writer.Write(p)
90 // Write implements io.Writer.
91 func (swa *streamWriterAt) Write(p []byte) (int, error) {
92 n, err := swa.WriteAt(p, int64(swa.writepos))
97 // WriteAt implements io.WriterAt.
98 func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
101 if pos <= len(swa.buf) {
102 n = copy(swa.buf[pos:], p)
105 return n, fmt.Errorf("write beyond end of buffer: offset %d len %d buf %d", offset, len(p), len(swa.buf))
110 defer swa.mtx.Unlock()
111 swa.wroteAt += len(p)
112 if swa.endpos < endpos {
116 return 0, errors.New("invalid use of closed streamWriterAt")
118 // Track the number of bytes that landed in each of our
120 for i := pos; i < endpos; {
121 j := i + swa.partsize - (i % swa.partsize)
125 pf := swa.partfilled[i/swa.partsize]
127 if pf > swa.partsize {
128 return 0, errors.New("streamWriterAt: overlapping WriteAt calls")
130 swa.partfilled[i/swa.partsize] = pf
133 // Flush filled parts to partready.
134 for swa.partnext < len(swa.partfilled) && swa.partfilled[swa.partnext] == swa.partsize {
135 offset := swa.partnext * swa.partsize
136 swa.partready <- swa.buf[offset : offset+swa.partsize]
142 // Close flushes all buffered data through to the io.Writer.
143 func (swa *streamWriterAt) Close() error {
145 defer swa.mtx.Unlock()
147 return errors.New("invalid use of closed streamWriterAt")
150 // Flush last part if needed. If the input doesn't end on a
151 // part boundary, the last part never appears "filled" when we
152 // check in WriteAt. But here, we know endpos is the end of
153 // the stream, so we can check whether the last part is ready.
154 if offset := swa.partnext * swa.partsize; offset < swa.endpos && offset+swa.partfilled[swa.partnext] == swa.endpos {
155 swa.partready <- swa.buf[offset:swa.endpos]
159 err := <-swa.errWrite
163 if swa.wrote != swa.wroteAt {
164 return fmt.Errorf("streamWriterAt: detected hole in input: wrote %d but flushed %d", swa.wroteAt, swa.wrote)