diff --git a/datatype/imageblk/imageblk.go b/datatype/imageblk/imageblk.go
index fe6e01f7..791a230b 100644
--- a/datatype/imageblk/imageblk.go
+++ b/datatype/imageblk/imageblk.go
@@ -1902,6 +1902,88 @@ func (d *Data) SendSerializedBlock(w http.ResponseWriter, x, y, z int32, v []byt
 	return nil
 }
 
+// startBlockWriter launches a single goroutine that writes blocks received on the
+// returned channel to w, since only one writer to an http.ResponseWriter is allowed
+// at a time. After a write error it keeps draining the channel so senders never block.
+// Callers must close the channel, wait on the WaitGroup, then read the returned *error.
+func (d *Data) startBlockWriter(w http.ResponseWriter, compression string) (chan<- *storage.Block, *sync.WaitGroup, *error) {
+	writeCh := make(chan *storage.Block, 1000)
+	var writeErr error
+	wg := new(sync.WaitGroup)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for b := range writeCh {
+			if writeErr != nil {
+				continue // drain remaining blocks so senders don't block after a failure
+			}
+			if err := d.SendUnserializedBlock(w, b.Coord[0], b.Coord[1], b.Coord[2], b.Value, compression); err != nil {
+				dvid.Errorf("Error writing block %s to http.ResponseWriter: %v\n", b.Coord, err)
+				writeErr = err
+			}
+		}
+	}()
+	return writeCh, wg, &writeErr
+}
+
+// SendGridBlocks writes the blocks with the given coordinates to w using the given compression.
+func (d *Data) SendGridBlocks(w http.ResponseWriter, blockCoords []dvid.ChunkPoint3d, gridStore storage.GridStoreGetter, compression string) error {
+	writeCh, writeWg, writeErr := d.startBlockWriter(w, compression)
+
+	// Retrieve blocks sequentially and hand them to the writer goroutine.
+	var getErr error
+	for _, blockCoord := range blockCoords {
+		value, err := gridStore.GridGet(d.ScaleLevel, blockCoord)
+		if err != nil {
+			dvid.Infof("gridStore GET on scale %d, chunk %s had err: %v", d.ScaleLevel, blockCoord, err)
+			getErr = err
+			break
+		}
+		if value == nil {
+			dvid.Infof("gridStore GET on scale %d, chunk %s had nil value\n", d.ScaleLevel, blockCoord)
+			break
+		}
+		writeCh <- &storage.Block{Coord: blockCoord, Value: value}
+	}
+
+	// Close the channel so the writer goroutine terminates; otherwise Wait would deadlock.
+	close(writeCh)
+	writeWg.Wait()
+	if getErr != nil {
+		return getErr
+	}
+	return *writeErr
+}
+
+// SendGridVolume writes blocks of the subvolume to the writer with given compression.
+func (d *Data) SendGridVolume(w http.ResponseWriter, subvol *dvid.Subvolume, gridStore storage.GridStoreGetter, compression string) error {
+	writeCh, writeWg, writeErr := d.startBlockWriter(w, compression)
+
+	minBlock, maxBlock, err := subvol.BoundingChunks(d.BlockSize())
+	if err != nil {
+		close(writeCh)
+		writeWg.Wait()
+		return err
+	}
+
+	// Retrieve blocks concurrently (unordered) and hand them to the writer goroutine.
+	ordered := false
+	err = gridStore.GridGetVolume(d.ScaleLevel, minBlock, maxBlock, ordered, func(b *storage.Block) error {
+		if b.Value != nil {
+			writeCh <- b
+		}
+		return nil
+	})
+
+	// Close the channel so the writer goroutine terminates; otherwise Wait would deadlock.
+	close(writeCh)
+	writeWg.Wait()
+	if err != nil {
+		return err
+	}
+	return *writeErr
+}
+
 // SendBlocksSpecific writes data to the blocks specified -- best for non-ordered backend
 func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWriter, compression string, blockstring string, isprefetch bool) (numBlocks int, err error) {
 	w.Header().Set("Content-type", "application/octet-stream")
@@ -1913,7 +1995,7 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
 	timedLog := dvid.NewTimeLog()
 	defer timedLog.Infof("SendBlocks Specific ")
 
-	// extract query string
+	// extract query string into block coordinates
 	if blockstring == "" {
 		return
 	}
@@ -1923,10 +2005,26 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
 		return
 	}
 	numBlocks = len(coordarray) / 3
+	blockCoords := make([]dvid.ChunkPoint3d, numBlocks)
+	for i := 0; i < len(coordarray); i += 3 {
+		var xloc, yloc, zloc int
+		xloc, err = strconv.Atoi(coordarray[i])
+		if err != nil {
+			return
+		}
+		yloc, err = strconv.Atoi(coordarray[i+1])
+		if err != nil {
+			return
+		}
+		zloc, err = strconv.Atoi(coordarray[i+2])
+		if err != nil {
+			return
+		}
+		blockCoords[i/3] = dvid.ChunkPoint3d{int32(xloc), int32(yloc), int32(zloc)}
+	}
 
 	// make a finished queue
 	finishedRequests := make(chan error, len(coordarray)/3)
-	var mutex sync.Mutex
 
 	// get store for data
 	var gridStore storage.GridStoreGetter
@@ -1935,8 +2033,13 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
 	if err != nil {
 		return
 	}
+	if gridStore != nil {
+		err = d.SendGridBlocks(w, blockCoords, gridStore, compression)
+		return
+	}
 
 	// iterate through each block and query
+	mutex := &sync.Mutex{}
 	for i := 0; i < len(coordarray); i += 3 {
 		var xloc, yloc, zloc int
 		xloc, err = strconv.Atoi(coordarray[i])
@@ -1962,22 +2065,6 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
 		chunkPt := dvid.ChunkPoint3d{xloc, yloc, zloc}
 
 		var value []byte
-		if gridStore != nil {
-			value, err = gridStore.GridGet(d.ScaleLevel, chunkPt)
-			if err != nil {
-				dvid.Infof("gridStore GET on scale %d, chunk %s had err: %v", d.ScaleLevel, chunkPt, err)
-				err = nil
-				return
-			}
-			if value == nil {
-				dvid.Infof("gridStore GET on scale %d, chunk %s had nil value\n", d.ScaleLevel, chunkPt)
-				return
-			}
-			mutex.Lock()
-			defer mutex.Unlock()
-			d.SendUnserializedBlock(w, xloc, yloc, zloc, value, compression)
-			return
-		}
 		idx := dvid.IndexZYX(chunkPt)
 		key := NewTKey(&idx)
 		value, err = kvDB.Get(ctx, key)
@@ -2065,19 +2152,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
 	}
 
 	if gridStore != nil {
-		ordered := false
-		minBlock, maxBlock, err := subvol.BoundingChunks(d.BlockSize())
-		if err != nil {
-			return err
-		}
-		return gridStore.GridGetVolume(d.ScaleLevel, minBlock, maxBlock, ordered, &storage.BlockOp{}, func(b *storage.Block) error {
-			if b.Value != nil {
-				if err := d.SendUnserializedBlock(w, b.Coord[0], b.Coord[1], b.Coord[2], b.Value, compression); err != nil {
-					return err
-				}
-			}
-			return nil
-		})
+		return d.SendGridVolume(w, subvol, gridStore, compression)
 	}
 
 	okv := okvDB.(storage.BufferableOps)
@@ -2124,7 +2199,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
 		})
 
 		if err != nil {
-			return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
+			return fmt.Errorf("unable to GET data %s: %v", ctx, err)
 		}
 	} else {
 		tkeys := make([]storage.TKey, 0)
@@ -2158,7 +2233,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
 		})
 
 		if err != nil {
-			return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
+			return fmt.Errorf("unable to GET data %s: %v", ctx, err)
 		}
 	}
 	}
@@ -2169,7 +2244,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
 		err = okv.(storage.RequestBuffer).Flush()
 
 		if err != nil {
-			return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
+			return fmt.Errorf("unable to GET data %s: %v", ctx, err)
 		}
 	}
 
diff --git a/storage/ngprecomputed/ngprecomputed.go b/storage/ngprecomputed/ngprecomputed.go
index c5b7b309..971c7372 100644
--- a/storage/ngprecomputed/ngprecomputed.go
+++ b/storage/ngprecomputed/ngprecomputed.go
@@ -797,19 +797,19 @@ func (ng *ngStore) GridGet(scaleLevel int, blockCoord dvid.ChunkPoint3d) (val []
 
 // GridGetVolume calls the given function with the results of retrieved block data in an ordered or
 // unordered fashion. Missing blocks in the subvolume are not processed.
-func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, op *storage.BlockOp, f storage.BlockFunc) error {
+func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, f storage.BlockFunc) error {
 	if ordered {
 		return fmt.Errorf("ordered retrieval not implemented at this time")
 	}
 
-	ch := make(chan dvid.ChunkPoint3d)
+	blockCoordCh := make(chan dvid.ChunkPoint3d)
 
-	// Start concurrent processing routines to read each block and then pass it to given function.
+	// Start concurrent processing routines to read each block and then pass it to a single writer function.
 	concurrency := 10
 	wg := new(sync.WaitGroup)
 	wg.Add(concurrency)
 	for i := 0; i < concurrency; i++ {
 		go func() {
-			for blockCoord := range ch {
+			for blockCoord := range blockCoordCh {
 				val, err := ng.GridGet(scaleLevel, blockCoord)
 				if err != nil {
 					dvid.Errorf("unable to get block %s in GridGetVolume: %v\n", blockCoord, err)
@@ -818,11 +818,8 @@ func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPo
 				if val == nil {
 					continue
 				}
-				if op != nil && op.Wg != nil {
-					op.Wg.Add(1)
-				}
 				block := &storage.Block{
-					BlockOp: op,
+					BlockOp: nil,
 					Coord:   blockCoord,
 					Value:   val,
 				}
@@ -838,12 +835,11 @@ func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPo
 	for z := minBlock.Value(2); z <= maxBlock.Value(2); z++ {
 		for y := minBlock.Value(1); y <= maxBlock.Value(1); y++ {
 			for x := minBlock.Value(0); x <= maxBlock.Value(0); x++ {
-				ch <- dvid.ChunkPoint3d{x, y, z}
+				blockCoordCh <- dvid.ChunkPoint3d{x, y, z}
 			}
 		}
 	}
-
-	close(ch)
+	close(blockCoordCh)
 	wg.Wait()
 	return nil
 }
diff --git a/storage/storage.go b/storage/storage.go
index 5cf99b7b..dc768845 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -324,5 +324,5 @@ type GridProps struct {
 type GridStoreGetter interface {
 	GridProperties(scaleLevel int) (GridProps, error)
 	GridGet(scaleLevel int, blockCoord dvid.ChunkPoint3d) ([]byte, error)
-	GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, op *BlockOp, f BlockFunc) error
+	GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, f BlockFunc) error
 }