+ return resp, err
+ }
+ if limitedReader.N == 0 {
+ // Data size is either exactly BlockSize, or too big.
+ n, err := opts.Reader.Read(make([]byte, 1))
+ if n > 0 {
+ return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
+ }
+ if err != io.EOF {
+ return resp, err
+ }
+ }
+ opts.Data = buf[:n]
+ if opts.DataSize != 0 && int(n) != opts.DataSize {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
+ }
+ hash = fmt.Sprintf("%x", h.Sum(nil))
+ } else {
+ hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
+ }
+ if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
+ }
+ rvzmounts := ks.rendezvous(hash, ks.mountsW)
+ result := newPutProgress(opts.StorageClasses)
+ for _, mnt := range rvzmounts {
+ if !result.Want(mnt) {
+ continue
+ }
+ cmp := &checkEqual{Expect: opts.Data}
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if !cmp.Equal() {
+ return resp, errCollision
+ }
+ err := mnt.BlockTouch(hash)
+ if err == nil {
+ result.Add(mnt)
+ }
+ }
+ }
+ var allFull atomic.Bool
+ allFull.Store(true)
+ // pending tracks what result will be if all outstanding
+ // writes succeed.
+ pending := result.Copy()
+ cond := sync.NewCond(new(sync.Mutex))
+ cond.L.Lock()
+ var wg sync.WaitGroup
+nextmnt:
+ for _, mnt := range rvzmounts {
+ for {
+ if result.Done() || ctx.Err() != nil {
+ break nextmnt
+ }
+ if !result.Want(mnt) {
+ continue nextmnt
+ }
+ if pending.Want(mnt) {
+ break
+ }
+ // This mount might not be needed, depending
+ // on the outcome of pending writes. Wait for
+ // a pending write to finish, then check
+ // again.
+ cond.Wait()
+ }
+ mnt := mnt
+ logger := ks.logger.WithField("mount", mnt.UUID)
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ logger.Debug("start write")
+ err := mnt.BlockWrite(ctx, hash, opts.Data)
+ cond.L.Lock()
+ defer cond.L.Unlock()
+ defer cond.Broadcast()
+ if err != nil {
+ logger.Debug("write failed")
+ pending.Sub(mnt)
+ if err != errFull {
+ allFull.Store(false)
+ }
+ } else {
+ result.Add(mnt)
+ pending.Sub(mnt)
+ }
+ }()
+ }
+ cond.L.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return resp, ctx.Err()
+ }
+ if result.Done() || result.totalReplication > 0 {
+ resp = arvados.BlockWriteResponse{
+ Locator: ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
+ Replicas: result.totalReplication,
+ StorageClasses: result.classDone,