1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 // writeBuffer uses a ring buffer to implement an asynchronous write
17 // rpos==wpos means the buffer is empty.
19 // rpos==(wpos+1)%size means the buffer is full.
21 // size<2 means the buffer is always empty and full, so in this case
22 // writeBuffer writes through synchronously.
//
// In the code visible here, Write() is the only method that stores wpos
// and flush() is the only method that stores rpos; each side loads the
// other side's index atomically.
//
// NOTE(review): the methods below also use wb.buf, wb.out and wb.closed,
// whose declarations are not visible in this excerpt -- confirm their
// types against the full struct definition.
23 type writeBuffer struct {
// newWriteBuffer sets writesize to size/64, rounded up.
26 writesize int // max bytes flush() should write in a single out.Write()
27 wpos atomic.Int64 // index in buf where writer (Write()) will write to next
// wsignal and rsignal are both created with capacity 1 by newWriteBuffer.
28 wsignal chan struct{} // receives a value after wpos or closed changes
29 rpos atomic.Int64 // index in buf where reader (flush()) will read from next
30 rsignal chan struct{} // receives a value after rpos or err changes
// err is assigned by flush() from the error result of out.Write().
31 err error // error encountered by flush
33 flushed chan struct{} // closes when flush() is finished
// newWriteBuffer returns a writeBuffer whose ring buffer is size bytes
// long.
//
// writesize is set to size/64 rounded up; flush() caps each out.Write()
// call at that many bytes. Both signal channels are given capacity 1, and
// flushed is unbuffered (flush() closes it on return).
//
// NOTE(review): the lines that store w in the struct and that start
// flush() are not visible in this excerpt -- presumably w becomes wb.out
// and flush() is launched in a goroutine here; confirm.
36 func newWriteBuffer(w io.Writer, size int) *writeBuffer {
39 buf: make([]byte, size),
// (size + 63) / 64 is integer division rounding up, so writesize is at
// least 1 for any size >= 1.
40 writesize: (size + 63) / 64,
41 wsignal: make(chan struct{}, 1),
42 rsignal: make(chan struct{}, 1),
43 flushed: make(chan struct{}),
// Close returns an error if the writeBuffer has already been closed.
// Otherwise it offers a value on wsignal (so flush() notices the state
// change) and then waits for flush() to finish.
//
// NOTE(review): the condition guarding the "already closed" return, the
// other arms of the select, and the final return value are not visible in
// this excerpt -- presumably the error recorded by flush() is returned;
// confirm.
49 func (wb *writeBuffer) Close() error {
51 return errors.New("writeBuffer: already closed")
56 case wb.wsignal <- struct{}{}:
59 // wait for flush() to finish
// Write copies p into the ring buffer for flush() to write through later.
//
// When the buffer is too small to use (see the comment below), p is
// passed straight to out.Write(). Otherwise Write waits while the buffer
// is full, copies as much as fits into the next contiguous free region,
// advances wpos, and offers a value on wsignal.
//
// It returns 0 and an error if it finds the writeBuffer closed while
// waiting for space.
//
// NOTE(review): the loop over the remaining input (todo), the body of the
// wait loop, and the final return are not visible in this excerpt.
64 func (wb *writeBuffer) Write(p []byte) (int, error) {
66 // Our buffer logic doesn't work with size<2, and such
67 // a tiny buffer has no purpose anyway, so just write
68 // through unbuffered.
69 return wb.out.Write(p)
72 wpos := int(wb.wpos.Load())
73 rpos := int(wb.rpos.Load())
75 // wait until the buffer is not full.
// This is the "full" condition from the type comment:
// rpos==(wpos+1)%size.
76 for rpos == (wpos+1)%len(wb.buf) {
80 return 0, errors.New("Write called on closed writeBuffer")
// Re-read rpos after waiting so the loop condition sees flush()'s
// progress.
84 rpos = int(wb.rpos.Load())
87 // determine next contiguous portion of buffer that is
// This branch stops one byte short of the end of buf. NOTE(review): the
// guarding condition is not visible -- presumably rpos==0, where filling
// the last slot would wrap wpos onto rpos and make the buffer look empty.
91 avail = wb.buf[wpos : len(wb.buf)-1]
92 } else if wpos >= rpos {
// Remaining case: the writer is behind the reader. Stop one slot before
// rpos so that wpos never catches up to it (rpos==wpos means empty).
95 avail = wb.buf[wpos : rpos-1]
// copy transfers min(len(avail), len(todo)) bytes and reports how many.
97 n := copy(avail, todo)
// Wrap to index 0 when the write reaches the end of buf.
98 wpos = (wpos + n) % len(wb.buf)
99 wb.wpos.Store(int64(wpos))
// Tell flush() that wpos changed.
102 case wb.wsignal <- struct{}{}:
// flush writes buffered data through to out, at most writesize bytes per
// out.Write() call, advancing rpos after each call and offering a value
// on rsignal. It closes wb.flushed when it returns.
//
// NOTE(review): the outer loop, the wait-for-data loop body, and the
// handling of wb.err / closed are not visible in this excerpt.
110 func (wb *writeBuffer) flush() {
111 defer close(wb.flushed)
116 // wait until buffer is not empty.
// Refresh closed and wpos while waiting.
122 closed = wb.closed.Load()
123 wpos = int(wb.wpos.Load())
125 // determine next contiguous portion of buffer that is
126 // ready to write through.
// Data lies between rpos and wpos without wrapping.
129 ready = wb.buf[rpos:wpos]
// Otherwise take everything up to the end of buf. NOTE(review): the
// condition is not visible -- presumably the wrapped case (wpos < rpos),
// with the remainder picked up on a later pass; confirm.
131 ready = wb.buf[rpos:]
// Cap the size of a single out.Write() call.
133 if len(ready) > wb.writesize {
134 ready = ready[:wb.writesize]
// The byte count is discarded; only the error is kept. The io.Writer
// contract requires a non-nil error whenever fewer than len(ready) bytes
// were written.
136 _, wb.err = wb.out.Write(ready)
// rpos advances by len(ready), not by the count out.Write() returned,
// and wraps to index 0 at the end of buf.
140 rpos = (rpos + len(ready)) % len(wb.buf)
141 wb.rpos.Store(int64(rpos))
// Tell Write() that rpos (or err) changed.
143 case wb.rsignal <- struct{}{}:
149 // responseWriter enables inserting an io.Writer-wrapper (like
150 // *writeBuffer) into an http.ResponseWriter stack.
152 // It passes Write() calls to an io.Writer, and all other calls to an
153 // http.ResponseWriter.
//
// NOTE(review): the fields are not visible in this excerpt. The Write
// method reads rwc.Writer, so the struct presumably has a Writer field
// alongside an embedded http.ResponseWriter -- confirm.
154 type responseWriter struct {
// Write forwards p to rwc.Writer and returns that call's byte count and
// error unchanged.
159 func (rwc responseWriter) Write(p []byte) (int, error) {
160 return rwc.Writer.Write(p)