X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f68cf673f8f459ff8140c5a3428a4ef649bb4f1f..84650a094921bb149ffb31a95bee9875dfd1c1df:/services/keep-web/writebuffer.go diff --git a/services/keep-web/writebuffer.go b/services/keep-web/writebuffer.go index f309b69484..90bdcb476b 100644 --- a/services/keep-web/writebuffer.go +++ b/services/keep-web/writebuffer.go @@ -11,10 +11,19 @@ import ( "sync/atomic" ) +// writeBuffer uses a ring buffer to implement an asynchronous write +// buffer. +// +// rpos==wpos means the buffer is empty. +// +// rpos==(wpos+1)%size means the buffer is full. +// +// size<2 means the buffer is always empty and full, so in this case +// writeBuffer writes through synchronously. type writeBuffer struct { out io.Writer buf []byte - writesize int + writesize int // max bytes flush() should write in a single out.Write() wpos atomic.Int64 // index in buf where writer (Write()) will write to next wsignal chan struct{} // receives a value after wpos or closed changes rpos atomic.Int64 // index in buf where reader (flush()) will read from next @@ -63,6 +72,7 @@ func (wb *writeBuffer) Write(p []byte) (int, error) { wpos := int(wb.wpos.Load()) rpos := int(wb.rpos.Load()) for len(todo) > 0 { + // wait until the buffer is not full. for rpos == (wpos+1)%len(wb.buf) { select { case <-wb.flushed: @@ -74,6 +84,8 @@ func (wb *writeBuffer) Write(p []byte) (int, error) { rpos = int(wb.rpos.Load()) } } + // determine next contiguous portion of buffer that is + // available. var avail []byte if rpos == 0 { avail = wb.buf[wpos : len(wb.buf)-1] @@ -101,6 +113,7 @@ func (wb *writeBuffer) flush() { wpos := 0 closed := false for { + // wait until buffer is not empty. for rpos == wpos { if closed { return @@ -109,6 +122,8 @@ func (wb *writeBuffer) flush() { closed = wb.closed.Load() wpos = int(wb.wpos.Load()) } + // determine next contiguous portion of buffer that is + // ready to write through. var ready []byte if rpos < wpos { ready = wb.buf[rpos:wpos] @@ -131,6 +146,11 @@ func (wb *writeBuffer) flush() { } } +// responseWriter enables inserting an io.Writer-wrapper (like +// *writeBuffer) into an http.ResponseWriter stack. +// +// It passes Write() calls to an io.Writer, and all other calls to an +// http.ResponseWriter. type responseWriter struct { io.Writer http.ResponseWriter