mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-11 18:20:33 +09:00
GO-2911: Fix a couple of data races
This commit is contained in:
parent
9eb074837b
commit
07358343ad
2 changed files with 9 additions and 6 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
|
@ -138,12 +139,14 @@ func (e *export) Export(ctx context.Context, req pb.RpcObjectListExportRequest)
|
|||
succeed = e.exportGraphJson(ctx, req, docs, succeed, wr, queue)
|
||||
} else {
|
||||
tasks := make([]process.Task, 0, len(docs))
|
||||
succeed, tasks = e.exportDocs(ctx, req, docs, wr, queue, succeed, tasks)
|
||||
var succeedAsync int64
|
||||
tasks = e.exportDocs(ctx, req, docs, wr, queue, &succeedAsync, tasks)
|
||||
err := queue.Wait(tasks...)
|
||||
if err != nil {
|
||||
e.cleanupFile(wr)
|
||||
return "", 0, err
|
||||
}
|
||||
succeed += int(succeedAsync)
|
||||
}
|
||||
if err = queue.Finalize(); err != nil {
|
||||
e.cleanupFile(wr)
|
||||
|
@ -156,19 +159,19 @@ func (e *export) Export(ctx context.Context, req pb.RpcObjectListExportRequest)
|
|||
return wr.Path(), succeed, nil
|
||||
}
|
||||
|
||||
func (e *export) exportDocs(ctx context.Context, req pb.RpcObjectListExportRequest, docs map[string]*types.Struct, wr writer, queue process.Queue, succeed int, tasks []process.Task) (int, []process.Task) {
|
||||
func (e *export) exportDocs(ctx context.Context, req pb.RpcObjectListExportRequest, docs map[string]*types.Struct, wr writer, queue process.Queue, succeed *int64, tasks []process.Task) []process.Task {
|
||||
for docId := range docs {
|
||||
did := docId
|
||||
task := func() {
|
||||
if werr := e.writeDoc(ctx, req.Format, wr, docs, queue, did, req.IncludeFiles, req.IsJson); werr != nil {
|
||||
log.With("objectID", did).Warnf("can't export doc: %v", werr)
|
||||
} else {
|
||||
succeed++
|
||||
atomic.AddInt64(succeed, 1)
|
||||
}
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
return succeed, tasks
|
||||
return tasks
|
||||
}
|
||||
|
||||
func (e *export) exportGraphJson(ctx context.Context, req pb.RpcObjectListExportRequest, docs map[string]*types.Struct, succeed int, wr writer, queue process.Queue) int {
|
||||
|
|
|
@ -148,10 +148,10 @@ func (c *client) sendNextBatch(info amplitude.AppInfoProvider, batcher *mb.MB[am
|
|||
}
|
||||
|
||||
func (c *client) recordAggregatedData() {
|
||||
c.lock.RLock()
|
||||
c.lock.Lock()
|
||||
toSend := c.aggregatableMap
|
||||
c.aggregatableMap = make(map[string]SamplableEvent)
|
||||
c.lock.RUnlock()
|
||||
c.lock.Unlock()
|
||||
// итерейтим сразу старую мапу и скармливаем ГЦ
|
||||
for _, ev := range toSend {
|
||||
c.send(ev)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue