"sync/atomic"
)
+// writeBuffer uses a ring buffer to implement an asynchronous write
+// buffer.
+//
+// rpos==wpos means the buffer is empty.
+//
+// rpos==(wpos+1)%size means the buffer is full.
+//
+// size<2 leaves no usable capacity (the buffer is always both empty
+// and full), so in this case writeBuffer writes through
+// synchronously.
type writeBuffer struct {
out io.Writer
buf []byte
- writesize int
+ writesize int // max bytes flush() should write in a single out.Write()
wpos atomic.Int64 // index in buf where writer (Write()) will write to next
wsignal chan struct{} // receives a value after wpos or closed changes
rpos atomic.Int64 // index in buf where reader (flush()) will read from next
wpos := int(wb.wpos.Load())
rpos := int(wb.rpos.Load())
for len(todo) > 0 {
+ // wait until the buffer is not full.
for rpos == (wpos+1)%len(wb.buf) {
select {
case <-wb.flushed:
rpos = int(wb.rpos.Load())
}
}
+ // determine next contiguous portion of buffer that is
+ // available.
var avail []byte
if rpos == 0 {
avail = wb.buf[wpos : len(wb.buf)-1]
wpos := 0
closed := false
for {
+ // wait until buffer is not empty.
for rpos == wpos {
if closed {
return
closed = wb.closed.Load()
wpos = int(wb.wpos.Load())
}
+ // determine next contiguous portion of buffer that is
+ // ready to write through.
var ready []byte
if rpos < wpos {
ready = wb.buf[rpos:wpos]
}
}
+// responseWriter enables inserting an io.Writer-wrapper (like
+// *writeBuffer) into an http.ResponseWriter stack.
+//
+// It passes Write() calls to an io.Writer, and all other calls to an
+// http.ResponseWriter.
type responseWriter struct {
io.Writer
http.ResponseWriter