mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Fix streampool tests and marshal append
This commit is contained in:
parent
2a3886a508
commit
3393ccf3f4
7 changed files with 60 additions and 12 deletions
|
@ -72,6 +72,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
|
|||
if err == nil || !errors.Is(err, treestorage.ErrUnknownTreeId) {
|
||||
return
|
||||
}
|
||||
storageErr := err
|
||||
|
||||
status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId)
|
||||
if err != nil {
|
||||
|
@ -84,6 +85,9 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
|
|||
|
||||
collector, peerId, err := t.treeRequestLoop(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, peer.ErrPeerIdNotFoundInContext) {
|
||||
err = storageErr
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -7,7 +7,7 @@ require (
|
|||
github.com/anyproto/go-chash v0.1.0
|
||||
github.com/anyproto/go-slip10 v1.0.0
|
||||
github.com/anyproto/go-slip21 v1.0.0
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5
|
||||
github.com/btcsuite/btcd v0.22.1
|
||||
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -14,6 +14,8 @@ github.com/anyproto/go-slip21 v1.0.0 h1:CI7lUqTIwmPOEGVAj4jyNLoICvueh++0U2HoAi3m
|
|||
github.com/anyproto/go-slip21 v1.0.0/go.mod h1:gbIJt7HAdr5DuT4f2pFTKCBSUWYsm/fysHBNqgsuxT0=
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e h1:BitG6mK/JctFt57+AJO9A6tF06g8Pun/OUjODB/v8z4=
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e/go.mod h1:5+PHE01DgsDPkralb8MYmGg2sPQahsqEJ9ue7ciDHKg=
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5 h1:aY7tBzQ+z8Hr/v8vOL4/EaKwPZx+J/ClZ1WuD6sqPvE=
|
||||
github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5/go.mod h1:5+PHE01DgsDPkralb8MYmGg2sPQahsqEJ9ue7ciDHKg=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
|
||||
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package streampool
|
||||
|
||||
import (
|
||||
"github.com/anyproto/any-sync/net/streampool/testservice"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/anyproto/any-sync/net/streampool/testservice"
|
||||
)
|
||||
|
||||
func TestProtoEncoding(t *testing.T) {
|
||||
|
@ -14,11 +17,29 @@ func TestProtoEncoding(t *testing.T) {
|
|||
err = EncodingProto.Unmarshal(nil, "sss")
|
||||
assert.Error(t, err)
|
||||
})
|
||||
t.Run("encode", func(t *testing.T) {
|
||||
t.Run("encode marshal", func(t *testing.T) {
|
||||
data, err := EncodingProto.Marshal(&testservice.StreamMessage{ReqData: "1"})
|
||||
require.NoError(t, err)
|
||||
msg := &testservice.StreamMessage{}
|
||||
require.NoError(t, EncodingProto.Unmarshal(data, msg))
|
||||
assert.Equal(t, "1", msg.ReqData)
|
||||
})
|
||||
t.Run("encode marshal append empty buf", func(t *testing.T) {
|
||||
data, err := EncodingProto.(protoEncoding).MarshalAppend(nil, &testservice.StreamMessage{ReqData: "1"})
|
||||
require.NoError(t, err)
|
||||
msg := &testservice.StreamMessage{}
|
||||
require.NoError(t, EncodingProto.Unmarshal(data, msg))
|
||||
assert.Equal(t, "1", msg.ReqData)
|
||||
})
|
||||
t.Run("encode marshal append non-empty buf", func(t *testing.T) {
|
||||
buf := make([]byte, 150)
|
||||
_, err := rand.Read(buf)
|
||||
require.NoError(t, err)
|
||||
data, err := EncodingProto.(protoEncoding).MarshalAppend(buf, &testservice.StreamMessage{ReqData: "1"})
|
||||
require.NoError(t, err)
|
||||
msg := &testservice.StreamMessage{}
|
||||
require.NoError(t, EncodingProto.Unmarshal(data[150:], msg))
|
||||
require.Equal(t, buf, data[:150])
|
||||
assert.Equal(t, "1", msg.ReqData)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -13,9 +13,11 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/debugstat"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest"
|
||||
"github.com/anyproto/any-sync/net/streampool/streamhandler"
|
||||
"github.com/anyproto/any-sync/net/streampool/testservice"
|
||||
)
|
||||
|
||||
|
@ -210,13 +212,16 @@ func newFixture(t *testing.T) *fixture {
|
|||
fx.tsh = &testServerHandler{receiveCh: make(chan *testservice.StreamMessage, 100)}
|
||||
require.NoError(t, testservice.DRPCRegisterTest(fx.ts, fx.tsh))
|
||||
fx.th = &testHandler{}
|
||||
s := New()
|
||||
s.(*service).debugStat = debugstat.NewNoOp()
|
||||
fx.StreamPool = s.NewStreamPool(fx.th, StreamConfig{
|
||||
s := New().(*streamPool)
|
||||
s.handler = fx.th
|
||||
s.statService = debugstat.NewNoOp()
|
||||
s.streamConfig = StreamConfig{
|
||||
SendQueueSize: 10,
|
||||
DialQueueWorkers: 1,
|
||||
DialQueueSize: 10,
|
||||
})
|
||||
}
|
||||
fx.StreamPool = s
|
||||
require.NoError(t, fx.StreamPool.Run(context.Background()))
|
||||
return fx
|
||||
}
|
||||
|
||||
|
@ -228,7 +233,7 @@ type fixture struct {
|
|||
}
|
||||
|
||||
func (fx *fixture) Finish(t *testing.T) {
|
||||
require.NoError(t, fx.Close())
|
||||
require.NoError(t, fx.Close(context.Background()))
|
||||
}
|
||||
|
||||
type testHandler struct {
|
||||
|
@ -237,6 +242,14 @@ type testHandler struct {
|
|||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (t *testHandler) Init(a *app.App) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testHandler) Name() (name string) {
|
||||
return streamhandler.CName
|
||||
}
|
||||
|
||||
func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
|
||||
if t.streamOpenDelay > 0 {
|
||||
time.Sleep(t.streamOpenDelay)
|
||||
|
|
|
@ -5,7 +5,7 @@ package testservice
|
|||
|
||||
import (
|
||||
fmt "fmt"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
proto "github.com/anyproto/protobuf/proto"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
|
@ -47,6 +47,14 @@ func (m *StreamMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error
|
|||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *StreamMessage) XXX_MarshalAppend(b []byte, newLen int) ([]byte, error) {
|
||||
b = b[:newLen]
|
||||
_, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
func (m *StreamMessage) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_StreamMessage.Merge(m, src)
|
||||
}
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
bytes "bytes"
|
||||
context "context"
|
||||
errors "errors"
|
||||
jsonpb "github.com/gogo/protobuf/jsonpb"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
jsonpb "github.com/anyproto/protobuf/jsonpb"
|
||||
proto "github.com/anyproto/protobuf/proto"
|
||||
drpc "storj.io/drpc"
|
||||
drpcerr "storj.io/drpc/drpcerr"
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue