1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-3538 image resize: add bufferpool

avoid double resizing
This commit is contained in:
Roman Khafizianov 2024-05-28 15:26:20 +02:00
parent df0b26ef82
commit 6b73d0e839
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
12 changed files with 302 additions and 27 deletions

View file

@ -409,8 +409,7 @@ func (s *service) addFileNode(ctx context.Context, spaceID string, mill m.Mill,
return nil, err
}
// rewind the mill result reader so its content can be read again from the start
res, err = mill.Mill(conf.Reader, conf.Name)
_, err = res.File.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
@ -465,6 +464,10 @@ func (s *service) addFileNode(ctx context.Context, spaceID string, mill m.Mill,
fileInfo.MetaHash = metaNode.Cid().String()
pairNode, err := s.addFilePairNode(ctx, spaceID, fileInfo)
err = res.File.Close()
if err != nil {
log.Warnf("failed to close file: %s", err)
}
if err != nil {
return nil, fmt.Errorf("add file pair node: %w", err)
}

View file

@ -181,6 +181,7 @@ func (mw *Middleware) ObjectSearchWithMeta(cctx context.Context, req *pb.RpcObje
rec.Details = pbtypes.StructFilterKeys(rec.Details, req.Keys)
}
resultsModels = append(resultsModels, &model.SearchResult{
ObjectId: pbtypes.GetString(rec.Details, database.RecordIDField),
Details: rec.Details,
Meta: []*model.SearchMeta{&(results[i].Meta)},

View file

@ -23,5 +23,5 @@ func (m *Blob) Options(add map[string]interface{}) (string, error) {
}
func (m *Blob) Mill(r io.ReadSeeker, name string) (*Result, error) {
return &Result{File: r}, nil
return &Result{File: noopCloser(r)}, nil
}

View file

@ -144,5 +144,5 @@ func (m *ImageExif) Mill(r io.ReadSeeker, name string) (*Result, error) {
return nil, err
}
return &Result{File: bytes.NewReader(b)}, nil
return &Result{File: noopCloser(bytes.NewReader(b))}, nil
}

View file

@ -1,7 +1,6 @@
package mill
import (
"bytes"
"errors"
"fmt"
"image"
@ -105,7 +104,6 @@ func (m *ImageResize) Mill(r io.ReadSeeker, name string) (*Result, error) {
return nil, err
}
format := Format(formatStr)
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return nil, err
@ -179,7 +177,7 @@ func (m *ImageResize) resizeJPEG(imgConfig *image.Config, r io.ReadSeeker) (*Res
}
if orientation <= 1 && width == imgConfig.Width {
var r2 io.Reader
var r2 io.ReadSeekCloser
r2, err = patchReaderRemoveExif(r)
if err != nil {
return nil, err
@ -187,7 +185,7 @@ func (m *ImageResize) resizeJPEG(imgConfig *image.Config, r io.ReadSeeker) (*Res
// here is an optimization
// lets return the original picture in case it has not been resized or normalized
return &Result{
File: r2,
File: noopCloser(r2),
Meta: map[string]interface{}{
"width": imgConfig.Width,
"height": imgConfig.Height,
@ -204,13 +202,21 @@ func (m *ImageResize) resizeJPEG(imgConfig *image.Config, r io.ReadSeeker) (*Res
resized := imaging.Resize(img, width, 0, imaging.Lanczos)
width, height = resized.Rect.Max.X, resized.Rect.Max.Y
buff := &bytes.Buffer{}
buff := pool.Get()
defer func() {
_ = buff.Close()
}()
if err = jpeg.Encode(buff, resized, &jpeg.Options{Quality: quality}); err != nil {
return nil, err
}
readCloser, err := buff.GetReadSeekCloser()
if err != nil {
return nil, err
}
return &Result{
File: buff,
File: readCloser,
Meta: map[string]interface{}{
"width": width,
"height": height,
@ -234,7 +240,7 @@ func (m *ImageResize) resizePNG(imgConfig *image.Config, r io.ReadSeeker) (*Resu
// here is an optimization
// lets return the original picture in case it has not been resized or normalized
return &Result{
File: r,
File: noopCloser(r),
Meta: map[string]interface{}{
"width": imgConfig.Width,
"height": imgConfig.Height,
@ -250,13 +256,20 @@ func (m *ImageResize) resizePNG(imgConfig *image.Config, r io.ReadSeeker) (*Resu
resized := imaging.Resize(img, width, 0, imaging.Lanczos)
width, height = resized.Rect.Max.X, resized.Rect.Max.Y
buff := &bytes.Buffer{}
if err = png.Encode(buff, resized); err != nil {
buf := pool.Get()
defer func() {
_ = buf.Close()
}()
if err = png.Encode(buf, resized); err != nil {
return nil, err
}
readSeekCloser, err := buf.GetReadSeekCloser()
if err != nil {
return nil, err
}
return &Result{
File: buff,
File: readSeekCloser,
Meta: map[string]interface{}{
"width": width,
"height": height,
@ -279,7 +292,7 @@ func (m *ImageResize) resizeGIF(imgConfig *image.Config, r io.ReadSeeker) (*Resu
// here is an optimization
// lets return the original picture in case it has not been resized or normalized
return &Result{
File: r,
File: noopCloser(r),
Meta: map[string]interface{}{
"width": imgConfig.Width,
"height": imgConfig.Height,
@ -302,13 +315,20 @@ func (m *ImageResize) resizeGIF(imgConfig *image.Config, r io.ReadSeeker) (*Resu
}
gifImg.Config.Width, gifImg.Config.Height = gifImg.Image[0].Bounds().Dx(), gifImg.Image[0].Bounds().Dy()
buff := bytes.NewBuffer(make([]byte, 0))
if err = gif.EncodeAll(buff, gifImg); err != nil {
buf := pool.Get()
defer func() {
_ = buf.Close()
}()
if err = gif.EncodeAll(buf, gifImg); err != nil {
return nil, err
}
readSeekCloser, err := buf.GetReadSeekCloser()
if err != nil {
return nil, err
}
return &Result{
File: buff,
File: readSeekCloser,
Meta: map[string]interface{}{
"width": gifImg.Config.Width,
"height": gifImg.Config.Height,
@ -379,7 +399,7 @@ func imageToPaletted(img image.Image) *image.Paletted {
return pm
}
func patchReaderRemoveExif(r io.ReadSeeker) (io.Reader, error) {
func patchReaderRemoveExif(r io.ReadSeeker) (io.ReadSeekCloser, error) {
jmp := jpegstructure.NewJpegMediaParser()
size, err := r.Seek(0, io.SeekEnd)
if err != nil {
@ -387,7 +407,10 @@ func patchReaderRemoveExif(r io.ReadSeeker) (io.Reader, error) {
}
_, _ = r.Seek(0, io.SeekStart)
buff := bytes.NewBuffer(make([]byte, 0, size))
buff := pool.Get()
defer func() {
_ = buff.Close()
}()
intfc, err := jmp.Parse(r, int(size))
if err != nil {
return nil, fmt.Errorf("failed to open file to read exif: %w", err)
@ -404,5 +427,5 @@ func patchReaderRemoveExif(r io.ReadSeeker) (io.Reader, error) {
return nil, err
}
return buff, nil
return buff.GetReadSeekCloser()
}

View file

@ -3,7 +3,6 @@
package mill
import (
"bytes"
"fmt"
"image"
"io"
@ -34,7 +33,7 @@ func (m *ImageResize) resizeWEBP(imgConfig *image.Config, r io.ReadSeeker) (*Res
// here is an optimization
// lets return the original picture in case it has not been resized or normalized
return &Result{
File: r,
File: noopCloser(r),
Meta: map[string]interface{}{
"width": imgConfig.Width,
"height": imgConfig.Height,
@ -50,13 +49,21 @@ func (m *ImageResize) resizeWEBP(imgConfig *image.Config, r io.ReadSeeker) (*Res
resized := imaging.Resize(img, width, 0, imaging.Lanczos)
width, height = resized.Rect.Max.X, resized.Rect.Max.Y
buff := &bytes.Buffer{}
if webp.Encode(buff, resized, &webp.Options{Quality: float32(quality)}) != nil {
buf := pool.Get()
defer func() {
_ = buf.Close()
}()
if webp.Encode(buf, resized, &webp.Options{Quality: float32(quality)}) != nil {
return nil, err
}
readSeekCloser, err := buf.GetReadSeekCloser()
if err != nil {
return nil, err
}
return &Result{
File: buff,
File: readSeekCloser,
Meta: map[string]interface{}{
"width": width,
"height": height,

View file

@ -9,14 +9,17 @@ import (
"github.com/mr-tron/base58/base58"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/util/bufferpool"
)
var log = logging.Logger("tex-mill")
var pool = bufferpool.NewPool()
var ErrMediaTypeNotSupported = fmt.Errorf("media type not supported")
type Result struct {
File io.Reader
File io.ReadSeekCloser
Meta map[string]interface{}
}

View file

@ -0,0 +1,15 @@
package mill
import "io"
// noopCloserWrapper adapts an io.ReadSeeker to io.ReadSeekCloser by adding a
// Close method that does nothing.
type noopCloserWrapper struct {
	io.ReadSeeker
}

// Close always reports success and leaves the wrapped reader untouched.
func (*noopCloserWrapper) Close() error { return nil }

// noopCloser wraps r so that it satisfies io.ReadSeekCloser. Closing the
// returned value has no effect on r.
func noopCloser(r io.ReadSeeker) io.ReadSeekCloser {
	wrapped := new(noopCloserWrapper)
	wrapped.ReadSeeker = r
	return wrapped
}

58
util/bufferpool/buffer.go Normal file
View file

@ -0,0 +1,58 @@
package bufferpool
import (
"bytes"
"io"
"sync"
)
// Buffer is the write side of a pooled buffer, as returned by Pool.Get.
// It accepts writes, can be closed, and can be turned into a reader over
// the bytes that were written.
type Buffer interface {
io.Writer
io.Closer
// GetReadSeekCloser returns an io.ReadSeekCloser over the written bytes.
GetReadSeekCloser() (io.ReadSeekCloser, error)
}
// buffer is the Buffer implementation backed by a byte slice taken from a
// sync.Pool.
//
// A buffer is finished exactly once: either by Close, which hands the storage
// back to the pool, or by GetReadSeekCloser, which passes the storage on to
// the returned reader. After either call the buffer is marked closed and
// rejects further writes.
type buffer struct {
	*bytes.Buffer
	buf    []byte     // slice originally obtained from the pool
	pool   *sync.Pool // pool the storage is returned to
	m      sync.Mutex // held by Write, Close and GetReadSeekCloser
	closed bool       // set by the first Close or GetReadSeekCloser
}

// GetReadSeekCloser returns a ReadSeekCloser that reads the bytes written so
// far and marks the buffer as closed.
// Calling it on an already closed buffer returns io.EOF.
// It is the caller's responsibility to Close the returned ReadSeekCloser so
// the storage is put back into the pool.
func (b *buffer) GetReadSeekCloser() (io.ReadSeekCloser, error) {
	b.m.Lock()
	defer b.m.Unlock()
	if b.closed {
		return nil, io.EOF
	}
	b.closed = true
	return newPoolReadSeekCloser(b.Buffer.Bytes(), b.pool), nil
}

// Write appends p to the buffer.
// Once the buffer is closed (by Close or GetReadSeekCloser) it writes nothing
// and returns io.EOF.
func (b *buffer) Write(p []byte) (n int, err error) {
	b.m.Lock()
	defer b.m.Unlock()
	if b.closed {
		return 0, io.EOF
	}
	return b.Buffer.Write(p)
}

// Close puts the buffer's storage back into the pool.
// Close after GetReadSeekCloser, or a repeated Close, does nothing.
// It always returns nil.
func (b *buffer) Close() error {
	b.m.Lock()
	defer b.m.Unlock()
	if b.closed {
		return nil
	}
	b.closed = true
	// Return the slice currently backing the bytes.Buffer rather than b.buf:
	// writes may have grown the buffer into a larger allocation, and that
	// capacity is what the pool should keep. The pool's Get reslices to [:0].
	b.pool.Put(b.Buffer.Bytes())
	return nil
}

34
util/bufferpool/pool.go Normal file
View file

@ -0,0 +1,34 @@
package bufferpool
import (
"bytes"
"sync"
)
// Pool hands out Buffer values.
type Pool interface {
// Get returns a Buffer from the pool.
Get() Buffer
}
// NewPool returns a Pool backed by a fresh sync.Pool. When that sync.Pool has
// nothing to reuse it produces an empty byte slice.
func NewPool() Pool {
	backing := new(sync.Pool)
	backing.New = func() interface{} {
		return []byte{}
	}
	return &bufferPoolWrapper{pool: backing}
}
// bufferPoolWrapper implements Pool on top of a sync.Pool of byte slices.
type bufferPoolWrapper struct {
	pool *sync.Pool
}

// Get takes a byte slice from the underlying sync.Pool and wraps it in a
// buffer. The buffer starts out empty but writes into the slice's existing
// capacity, and it remembers the pool so the storage can be returned later.
func (bp *bufferPoolWrapper) Get() Buffer {
	raw := bp.pool.Get().([]byte)
	return &buffer{
		Buffer: bytes.NewBuffer(raw[:0]),
		buf:    raw,
		pool:   bp.pool,
	}
}

View file

@ -0,0 +1,84 @@
package bufferpool
import (
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewPool(t *testing.T) {
pool := NewPool()
require.NotNil(t, pool, "NewPool should not return nil")
}
// TestBuffer_Write checks that a fresh buffer accepts a write in full and can
// then be closed without error.
func TestBuffer_Write(t *testing.T) {
	payload := []byte("Hello, World!")
	b := NewPool().Get()

	written, writeErr := b.Write(payload)
	require.NoError(t, writeErr, "Write should not return an error")
	assert.Equal(t, len(payload), written, "Write should return the number of bytes written")

	closeErr := b.Close()
	require.NoError(t, closeErr, "Close should not return an error")
}
// TestBuffer_Close checks that Close succeeds and that a closed buffer rejects
// further writes with io.EOF without consuming any bytes.
func TestBuffer_Close(t *testing.T) {
	pool := NewPool()
	buf := pool.Get()

	err := buf.Close()
	require.NoError(t, err, "Close should not return an error")

	n, err := buf.Write([]byte("Hello, World!"))
	// The operation under test is Write, so the failure message names Write.
	assert.ErrorIs(t, err, io.EOF, "Write after Close should return io.EOF")
	require.Zero(t, n, "Write after Close should not write any bytes")
}
// TestBuffer_GetReadSeekCloser writes data into a pooled buffer, converts it
// into a ReadSeekCloser and checks reading, seeking and closing. It then checks
// that closing the reader made the underlying storage available to the pool.
func TestBuffer_GetReadSeekCloser(t *testing.T) {
pool := NewPool()
buf := pool.Get()
data := []byte("Hello, World!")
_, err := buf.Write(data)
require.NoError(t, err, "Write should not return an error")
rsc, err := buf.GetReadSeekCloser()
require.NoError(t, err, "GetReadSeekCloser should not return an error")
assert.NotNil(t, rsc, "GetReadSeekCloser should not return nil")
readData := make([]byte, len(data))
readData2 := make([]byte, len(data))
// first pass: read everything that was written
n, err := rsc.Read(readData)
require.NoError(t, err, "Read should not return an error")
assert.Equal(t, len(data), n, "Read should return the number of bytes read")
assert.Equal(t, data, readData, "Read data should match written data")
// rewind and read the same content a second time
n2, err := rsc.Seek(0, io.SeekStart)
require.NoError(t, err, "Seek should not return an error")
assert.Equal(t, int64(0), n2, "Seek should return the new offset")
_, err = rsc.Read(readData2)
require.NoError(t, err, "Read after seek should not return an error")
assert.Equal(t, data, readData2, "Read data after seek should match written data")
// closing the reader puts its slice into the pool; reads must fail afterwards
err = rsc.Close()
require.NoError(t, err, "Close should not return an error")
_, err = rsc.Read(readData)
assert.Error(t, err, "Read after Close should return an error")
// take the existing buffer from the pool
// NOTE(review): this assumes sync.Pool hands back the slice that was just Put;
// sync.Pool does not guarantee that, so the checks below may be
// flaky in some configurations — confirm.
buf = pool.Get()
// check underlying buffer is returned to the pool
assert.GreaterOrEqual(t, cap(buf.(*buffer).buf), 13, "we should get the same buffer from the pool")
assert.GreaterOrEqual(t, buf.(*buffer).Buffer.Cap(), 13, "we should get the same buffer from the pool")
assert.Equalf(t, 0, len(buf.(*buffer).Buffer.Bytes()), "we should get the reseted buffer from the pool")
// the buffer is logically empty, but the previously written bytes are still
// present in its capacity; reslicing up to 13 exposes them
assert.Equal(t, []byte("Hello, World!"), buf.(*buffer).buf[0:13])
assert.Equal(t, []byte("Hello, World!"), buf.(*buffer).Buffer.Bytes()[0:13])
// a second Close on the reader must be a harmless no-op
err = rsc.Close()
require.NoError(t, err, "Close after Close should not return an error")
}

47
util/bufferpool/reader.go Normal file
View file

@ -0,0 +1,47 @@
package bufferpool
import (
"bytes"
"io"
"sync"
)
// poolReadSeekCloser is an io.ReadSeekCloser over a byte slice that is handed
// to a sync.Pool when the reader is closed.
//
// Read, Seek and Close are serialised by a mutex. Other methods promoted from
// the embedded *bytes.Reader (ReadAt, WriteTo, ...) go straight to the
// bytes.Reader and are not guarded.
type poolReadSeekCloser struct {
	*bytes.Reader
	buf    []byte     // slice handed to the pool on Close
	pool   *sync.Pool // destination pool for buf
	m      sync.Mutex // serialises Read, Seek and Close
	closed bool       // set by the first Close
}

// newPoolReadSeekCloser creates a poolReadSeekCloser that reads from buf and
// puts buf into pool when it is closed.
func newPoolReadSeekCloser(buf []byte, pool *sync.Pool) io.ReadSeekCloser {
	return &poolReadSeekCloser{
		Reader: bytes.NewReader(buf),
		buf:    buf,
		pool:   pool,
	}
}

// Close puts the buffer back into the pool.
// Only the first call has an effect; later calls do nothing. It always
// returns nil.
func (prsc *poolReadSeekCloser) Close() error {
	prsc.m.Lock()
	defer prsc.m.Unlock()
	if prsc.closed {
		return nil
	}
	prsc.closed = true
	prsc.pool.Put(prsc.buf)
	return nil
}

// Read reads from the underlying bytes.Reader.
// After Close it reads nothing and returns io.EOF.
func (prsc *poolReadSeekCloser) Read(p []byte) (n int, err error) {
	// bytes.Reader.Read advances the reader's offset, so an exclusive lock is
	// required; a shared (read) lock would let concurrent Reads race on it.
	prsc.m.Lock()
	defer prsc.m.Unlock()
	if prsc.closed {
		return 0, io.EOF
	}
	return prsc.Reader.Read(p)
}

// Seek sets the offset for the next Read, with the same semantics as
// bytes.Reader.Seek. It takes the mutex because it modifies the offset that
// Read uses. It does not check the closed flag: seeking only moves the offset
// and never touches the slice.
func (prsc *poolReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
	prsc.m.Lock()
	defer prsc.m.Unlock()
	return prsc.Reader.Seek(offset, whence)
}