1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-4727 fixes

This commit is contained in:
Roman Khafizianov 2025-01-30 19:12:50 +01:00
parent 28946c650f
commit b77014e27b
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
4 changed files with 128 additions and 3 deletions

View file

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strings"
"time"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
@ -13,6 +12,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/storestate"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/util/timeid"
)
type ChatHandler struct {
@ -47,7 +47,7 @@ func (d ChatHandler) BeforeCreate(ctx context.Context, ch storestate.ChangeOp) (
} else {
msg.setRead(false)
}
msg.setAddedAt(time.Now().UnixMilli())
msg.setAddedAt(timeid.NewNano())
model := msg.toModel()
model.OrderId = ch.Change.Order
d.subscription.add(model)

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"time"
@ -205,6 +206,10 @@ func (s *storeObject) markReadMessages(ids []string) {
ctx := txn.Context()
idsModified := make([]string, 0, len(ids))
for _, id := range ids {
if id == s.Id() {
// skip tree root
continue
}
res, err := coll.UpdateId(ctx, id, query.MustParseModifier(`{"$set":{"`+readKey+`":true}}`))
if err != nil {
log.With(zap.Error(err)).With(zap.String("id", id)).With(zap.String("chatId", s.Id())).Error("markReadMessages: update message")
@ -242,7 +247,7 @@ func (s *storeObject) markReadMessages(ids []string) {
newOldestOrderId = val.Value().GetObject(orderKey).Get("id").GetString()
}
}
log.Debug(fmt.Sprintf("markReadMessages: new oldest unread message: %s", s.subscription.chatState.Messages.OldestOrderId))
log.Debug(fmt.Sprintf("markReadMessages: new oldest unread message: %s", newOldestOrderId))
s.subscription.chatState.Messages.OldestOrderId = newOldestOrderId
s.subscription.updateReadStatus(idsModified, true)
s.onUpdate()
@ -271,6 +276,11 @@ func (s *storeObject) GetLastAddedMessageInOrderRange(ctx context.Context, after
return nil, fmt.Errorf("get collection: %w", err)
}
if lastAddedMessageTimestamp < 0 {
// todo: remove this
// for testing purposes
lastAddedMessageTimestamp = math.MaxInt64
}
iter, err := coll.Find(
query.And{
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpGte, afterOrderId)},

33
util/timeid/timeid.go Normal file
View file

@ -0,0 +1,33 @@
package timeid
import (
"sync/atomic"
"time"
)
var lastUsed int64
// NewNano generates a new ID based on the current time in nanoseconds + atomic counter in the lower bits in case of a collision.
// Within the app lifetime it's guaranteed to be unique and strictly increasing, even across multiple goroutines.
func NewNano() int64 {
for {
// Snapshot the current global 64-bit value
old := atomic.LoadInt64(&lastUsed)
// Construct the new value: timestamp (in ms) << SHIFT
now := time.Now().UnixNano()
// If old already >= now, we are in the same or “earlier” millisecond.
// Just bump old by 1 to ensure strictly increasing.
newVal := now
if old >= now {
newVal = old + 1
}
// Try to swap. If successful, return the new value.
if atomic.CompareAndSwapInt64(&lastUsed, old, newVal) {
return newVal
}
// Otherwise, loop and try again.
}
}

View file

@ -0,0 +1,82 @@
package timeid
import (
"fmt"
"sort"
"sync"
"testing"
"time"
)
// TestAtomicCounterSequential checks that repeated calls in a single goroutine produce strictly increasing values.
func TestNewNanoSequential(t *testing.T) {
const calls = 10
fmt.Println(time.Now().Unix())
fmt.Println(time.Now().UnixMilli())
fmt.Println(time.Now().UnixNano())
prev := NewNano()
for i := 1; i < calls; i++ {
val := NewNano()
fmt.Println(val)
if val <= prev {
t.Fatalf("Value did not strictly increase. got=%d, prev=%d", val, prev)
}
prev = val
}
}
// TestAtomicCounterConcurrent checks that calls from multiple goroutines produce unique, strictly increasing values overall.
func TestNewNanoConcurrent(t *testing.T) {
const (
goroutines = 10
callsPerG = 100
totalCalls = goroutines * callsPerG
)
var wg sync.WaitGroup
results := make([]int64, totalCalls)
// We'll start goroutines, each generating callsPerG IDs.
// Each goroutine fills its portion of the `results` slice.
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func(gid int) {
defer wg.Done()
offset := gid * callsPerG
for i := 0; i < callsPerG; i++ {
results[offset+i] = NewNano()
}
}(g)
}
// Wait for all goroutines to finish
wg.Wait()
// Now we sort all the IDs and ensure they are strictly increasing.
sort.Slice(results, func(i, j int) bool {
return results[i] < results[j]
})
for i := 1; i < totalCalls; i++ {
if results[i] <= results[i-1] {
t.Fatalf("Duplicate or non-increasing value at index %d: %d <= %d",
i, results[i], results[i-1])
}
}
}
// (Optional) TestAtomicCounterTimeDrift checks behavior if we sleep between calls
// to ensure the timestamp portion also changes. This is mostly to illustrate timing
// but not strictly necessary for correctness.
func TestNewNanoTimeDrift(t *testing.T) {
// Get one value
first := NewNano()
// Sleep to ensure a new millisecond passes
time.Sleep(time.Millisecond)
second := NewNano()
// second should definitely be greater
if second <= first {
t.Fatalf("Value did not increase across millisecond boundary. first=%d, second=%d", first, second)
}
}