mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
add requestId to separate request/reply flows
This commit is contained in:
parent
a2765a5233
commit
f95123fc43
4 changed files with 132 additions and 80 deletions
|
@ -59,13 +59,13 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
newCounter := s.counter.Add(1)
|
||||
msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
||||
log.InfoCtx(ctx, "mpool sendSync", zap.String("replyId", msg.ReplyId))
|
||||
msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
||||
log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId))
|
||||
s.waitersMx.Lock()
|
||||
waiter := responseWaiter{
|
||||
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
||||
}
|
||||
s.waiters[msg.ReplyId] = waiter
|
||||
s.waiters[msg.RequestId] = waiter
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
err = s.SendPeer(ctx, peerId, msg)
|
||||
|
@ -75,10 +75,10 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
s.waitersMx.Lock()
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
delete(s.waiters, msg.RequestId)
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).WarnCtx(ctx, "time elapsed when waiting")
|
||||
log.With(zap.String("requestId", msg.RequestId)).WarnCtx(ctx, "time elapsed when waiting")
|
||||
err = fmt.Errorf("sendSync context error: %v", ctx.Err())
|
||||
case reply = <-waiter.ch:
|
||||
// success
|
||||
|
@ -108,7 +108,7 @@ func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *s
|
|||
if s.stopWaiter(msg) {
|
||||
return
|
||||
}
|
||||
log.DebugCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId))
|
||||
log.WarnCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId))
|
||||
}
|
||||
return s.messageHandler(ctx, senderId, msg)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue