mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Fixes for message exchange
This commit is contained in:
parent
c211ccf26e
commit
a89b0e5012
3 changed files with 16 additions and 10 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
anystore "github.com/anyproto/any-store"
|
||||
"github.com/anyproto/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
|
@ -156,7 +157,7 @@ func (s *storage) Set(ctx context.Context, key string, value []byte) error {
|
|||
AclId: headId,
|
||||
ReadKeyId: readKeyId,
|
||||
Value: innerstorage.Value{
|
||||
Value: value,
|
||||
Value: innerBytes,
|
||||
PeerSignature: peerSig,
|
||||
IdentitySignature: identitySig,
|
||||
},
|
||||
|
@ -182,7 +183,7 @@ func (s *storage) SetRaw(ctx context.Context, keyValue ...*spacesyncproto.StoreK
|
|||
}
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
keyValues := make([]innerstorage.KeyValue, len(keyValue))
|
||||
keyValues := make([]innerstorage.KeyValue, 0, len(keyValue))
|
||||
for _, kv := range keyValue {
|
||||
innerKv, err := innerstorage.KeyValueFromProto(kv, true)
|
||||
if err != nil {
|
||||
|
@ -197,10 +198,10 @@ func (s *storage) SetRaw(ctx context.Context, keyValue ...*spacesyncproto.StoreK
|
|||
s.aclList.RUnlock()
|
||||
return err
|
||||
}
|
||||
for _, kv := range keyValues {
|
||||
kv.ReadKeyId, err = state.ReadKeyForAclId(kv.AclId)
|
||||
for i := range keyValues {
|
||||
keyValues[i].ReadKeyId, err = state.ReadKeyForAclId(keyValues[i].AclId)
|
||||
if err != nil {
|
||||
kv.KeyPeerId = ""
|
||||
keyValues[i].KeyPeerId = ""
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -274,7 +275,12 @@ func (s *storage) decrypt(kv innerstorage.KeyValue) (value []byte, err error) {
|
|||
if key == nil {
|
||||
return nil, fmt.Errorf("no read key for %s", kv.ReadKeyId)
|
||||
}
|
||||
value, err = key.Decrypt(kv.Value.Value)
|
||||
msg := &spacesyncproto.StoreKeyInner{}
|
||||
err = proto.Unmarshal(kv.Value.Value, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
value, err = key.Decrypt(msg.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -68,6 +68,10 @@ func (s *syncClient) Broadcast(ctx context.Context, objectId string, keyValue ..
|
|||
inner := &innerUpdate{
|
||||
keyValues: keyValue,
|
||||
}
|
||||
err := inner.Prepare()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headUpdate := &objectmessages.HeadUpdate{
|
||||
Meta: objectmessages.ObjectMeta{
|
||||
ObjectId: objectId,
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/commonspace/peermanager"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestate"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
"github.com/anyproto/any-sync/metric"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
|
@ -118,9 +117,6 @@ func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error
|
|||
if !ok {
|
||||
return ErrUnexpectedMessage
|
||||
}
|
||||
if idMsg.ObjectType() == spacesyncproto.ObjectType_KeyValue {
|
||||
return nil
|
||||
}
|
||||
objectId := idMsg.ObjectId()
|
||||
err := s.receiveQueue.Add(ctx, objectId, msgCtx{
|
||||
ctx: ctx,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue