diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index 3b50a64c..7a450d8f 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -72,6 +72,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. if err == nil || !errors.Is(err, treestorage.ErrUnknownTreeId) { return } + storageErr := err status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId) if err != nil { @@ -84,6 +85,9 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. collector, peerId, err := t.treeRequestLoop(ctx) if err != nil { + if errors.Is(err, peer.ErrPeerIdNotFoundInContext) { + err = storageErr + } return } diff --git a/go.mod b/go.mod index a5a35c52..ab87a580 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/anyproto/go-chash v0.1.0 github.com/anyproto/go-slip10 v1.0.0 github.com/anyproto/go-slip21 v1.0.0 - github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e + github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5 github.com/btcsuite/btcd v0.22.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cespare/xxhash v1.1.0 diff --git a/go.sum b/go.sum index b7b64c36..9661b400 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/anyproto/go-slip21 v1.0.0 h1:CI7lUqTIwmPOEGVAj4jyNLoICvueh++0U2HoAi3m github.com/anyproto/go-slip21 v1.0.0/go.mod h1:gbIJt7HAdr5DuT4f2pFTKCBSUWYsm/fysHBNqgsuxT0= github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e h1:BitG6mK/JctFt57+AJO9A6tF06g8Pun/OUjODB/v8z4= github.com/anyproto/protobuf v1.3.3-0.20240801222536-5596d71a739e/go.mod h1:5+PHE01DgsDPkralb8MYmGg2sPQahsqEJ9ue7ciDHKg= +github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5 h1:aY7tBzQ+z8Hr/v8vOL4/EaKwPZx+J/ClZ1WuD6sqPvE= +github.com/anyproto/protobuf v1.3.3-0.20240814124528-72b8c7e0e0f5/go.mod h1:5+PHE01DgsDPkralb8MYmGg2sPQahsqEJ9ue7ciDHKg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/net/streampool/encoding_test.go b/net/streampool/encoding_test.go index 9bcf4579..694aae17 100644 --- a/net/streampool/encoding_test.go +++ b/net/streampool/encoding_test.go @@ -1,10 +1,13 @@ package streampool import ( - "github.com/anyproto/any-sync/net/streampool/testservice" + "crypto/rand" + "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" + + "github.com/anyproto/any-sync/net/streampool/testservice" ) func TestProtoEncoding(t *testing.T) { @@ -14,11 +17,29 @@ func TestProtoEncoding(t *testing.T) { err = EncodingProto.Unmarshal(nil, "sss") assert.Error(t, err) }) - t.Run("encode", func(t *testing.T) { + t.Run("encode marshal", func(t *testing.T) { data, err := EncodingProto.Marshal(&testservice.StreamMessage{ReqData: "1"}) require.NoError(t, err) msg := &testservice.StreamMessage{} require.NoError(t, EncodingProto.Unmarshal(data, msg)) assert.Equal(t, "1", msg.ReqData) }) + t.Run("encode marshal append empty buf", func(t *testing.T) { + data, err := EncodingProto.(protoEncoding).MarshalAppend(nil, &testservice.StreamMessage{ReqData: "1"}) + require.NoError(t, err) + msg := &testservice.StreamMessage{} + require.NoError(t, EncodingProto.Unmarshal(data, msg)) + assert.Equal(t, "1", msg.ReqData) + }) + t.Run("encode marshal append non-empty buf", func(t *testing.T) { + buf := make([]byte, 150) + _, err := rand.Read(buf) + require.NoError(t, err) + data, err := EncodingProto.(protoEncoding).MarshalAppend(buf, &testservice.StreamMessage{ReqData: "1"}) + require.NoError(t, err) + msg := &testservice.StreamMessage{} + require.NoError(t, EncodingProto.Unmarshal(data[150:], msg)) + require.Equal(t, buf, data[:150]) + assert.Equal(t, "1", msg.ReqData) + }) } diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 8c6b65ce..2b17983b 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -13,9 +13,11 @@ import ( "golang.org/x/net/context" "storj.io/drpc" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/debugstat" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/rpc/rpctest" + "github.com/anyproto/any-sync/net/streampool/streamhandler" "github.com/anyproto/any-sync/net/streampool/testservice" ) @@ -210,13 +212,16 @@ func newFixture(t *testing.T) *fixture { fx.tsh = &testServerHandler{receiveCh: make(chan *testservice.StreamMessage, 100)} require.NoError(t, testservice.DRPCRegisterTest(fx.ts, fx.tsh)) fx.th = &testHandler{} - s := New() - s.(*service).debugStat = debugstat.NewNoOp() - fx.StreamPool = s.NewStreamPool(fx.th, StreamConfig{ + s := New().(*streamPool) + s.handler = fx.th + s.statService = debugstat.NewNoOp() + s.streamConfig = StreamConfig{ SendQueueSize: 10, DialQueueWorkers: 1, DialQueueSize: 10, - }) + } + fx.StreamPool = s + require.NoError(t, fx.StreamPool.Run(context.Background())) return fx } @@ -228,7 +233,7 @@ type fixture struct { } func (fx *fixture) Finish(t *testing.T) { - require.NoError(t, fx.Close()) + require.NoError(t, fx.Close(context.Background())) } type testHandler struct { @@ -237,6 +242,14 @@ type testHandler struct { mu sync.Mutex } +func (t *testHandler) Init(a *app.App) (err error) { + return nil +} + +func (t *testHandler) Name() (name string) { + return streamhandler.CName +} + func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) { if t.streamOpenDelay > 0 { time.Sleep(t.streamOpenDelay) diff --git a/net/streampool/testservice/testservice.pb.go b/net/streampool/testservice/testservice.pb.go index 498700f3..96128ff2 100644 --- a/net/streampool/testservice/testservice.pb.go +++ b/net/streampool/testservice/testservice.pb.go @@ -5,7 +5,7 @@ package testservice import ( fmt "fmt" - proto "github.com/gogo/protobuf/proto" + proto "github.com/anyproto/protobuf/proto" io "io" math "math" math_bits "math/bits" @@ -47,6 +47,14 @@ func (m *StreamMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error return b[:n], nil } } +func (m *StreamMessage) XXX_MarshalAppend(b []byte, newLen int) ([]byte, error) { + b = b[:newLen] + _, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b, nil +} func (m *StreamMessage) XXX_Merge(src proto.Message) { xxx_messageInfo_StreamMessage.Merge(m, src) } diff --git a/net/streampool/testservice/testservice_drpc.pb.go b/net/streampool/testservice/testservice_drpc.pb.go index 61586be4..84e1feff 100644 --- a/net/streampool/testservice/testservice_drpc.pb.go +++ b/net/streampool/testservice/testservice_drpc.pb.go @@ -8,8 +8,8 @@ import ( bytes "bytes" context "context" errors "errors" - jsonpb "github.com/gogo/protobuf/jsonpb" - proto "github.com/gogo/protobuf/proto" + jsonpb "github.com/anyproto/protobuf/jsonpb" + proto "github.com/anyproto/protobuf/proto" drpc "storj.io/drpc" drpcerr "storj.io/drpc/drpcerr" )