diff --git a/consensus/cmd/testclient/consensustestclient.go b/consensus/cmd/testclient/consensustestclient.go deleted file mode 100644 index 45d31152..00000000 --- a/consensus/cmd/testclient/consensustestclient.go +++ /dev/null @@ -1,367 +0,0 @@ -package main - -import ( - "bytes" - "context" - "flag" - "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/dialer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" - "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient" - "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr" - "go.uber.org/zap" - "gopkg.in/mgo.v2/bson" - "math/rand" - "net/http" - _ "net/http/pprof" - "os" - "os/signal" - "sync" - "syscall" - "time" -) - -var log = logger.NewNamed("main") - -var ( - flagConfigFile = flag.String("c", "etc/consensus-config.yml", "path to config file") - flagVersion = flag.Bool("v", false, "show version and exit") - flagHelp = flag.Bool("h", false, "show help and exit") -) - -func main() { - flag.Parse() - rand.Seed(time.Now().UnixNano()) - if *flagVersion { - fmt.Println(app.VersionDescription()) - return - } - if *flagHelp { - flag.PrintDefaults() - return - } - - if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" { - go func() { - http.ListenAndServe(debug, nil) - }() - } - - // create app - ctx := context.Background() - a := new(app.App) - - // open config file - conf, err := config.NewFromFile(*flagConfigFile) - if err != nil { - log.Fatal("can't open config file", zap.Error(err)) - } - - // bootstrap components - a.Register(conf) - Bootstrap(a) - - // start app - if err := a.Start(ctx); err != nil { - log.Fatal("can't start app", zap.Error(err)) - } - log.Info("app started", zap.String("version", a.Version())) - - if err := testClient(a.MustComponent(consensusclient.CName).(consensusclient.Service)); err != nil { - log.Fatal("test error", zap.Error(err)) - } else { - log.Info("test success!") - } - - b := &bench{service: a.MustComponent(consensusclient.CName).(consensusclient.Service)} - b.run() - - // wait exit signal - exit := make(chan os.Signal, 1) - signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) - sig := <-exit - log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig))) - - // close app - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - if err := a.Close(ctx); err != nil { - log.Fatal("close error", zap.Error(err)) - } else { - log.Info("goodbye!") - } - time.Sleep(time.Second / 3) -} - -func Bootstrap(a *app.App) { - a.Register(account.New()). - Register(secure.New()). - Register(nodeconf.New()). - Register(dialer.New()). - Register(pool.New()). - Register(consensusclient.New()) -} - -func testClient(service consensusclient.Service) (err error) { - if err = testCreateLogAndRecord(service); err != nil { - return err - } - if err = testStream(service); err != nil { - return err - } - return -} - -func testCreateLogAndRecord(service consensusclient.Service) (err error) { - ctx := context.Background() - - // create log - newLogId := []byte(bson.NewObjectId()) - st := time.Now() - lastRecId := []byte(bson.NewObjectId()) - err = service.AddLog(ctx, &consensusproto.Log{ - Id: newLogId, - Records: []*consensusproto.Record{ - { - Id: lastRecId, - Payload: []byte("test"), - CreatedUnix: uint64(time.Now().Unix()), - }, - }, - }) - if err != nil { - return err - } - log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) - - // create log with same id - st = time.Now() - - err = service.AddLog(ctx, &consensusproto.Log{ - Id: newLogId, - Records: []*consensusproto.Record{ - { - Id: lastRecId, - Payload: []byte("test"), - CreatedUnix: uint64(time.Now().Unix()), - }, - }, - }) - if err != consensuserr.ErrLogExists { - return fmt.Errorf("unexpected error: '%v' want LogExists", zap.Error(err)) - } - err = nil - log.Info("log duplicate checked", zap.Duration("dur", time.Since(st))) - - // create record - st = time.Now() - recId := []byte(bson.NewObjectId()) - err = service.AddRecord(ctx, newLogId, &consensusproto.Record{ - Id: []byte(bson.NewObjectId()), - PrevId: lastRecId, - CreatedUnix: uint64(time.Now().Unix()), - }) - if err != nil { - return err - } - lastRecId = recId - log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st))) - - // record conflict - st = time.Now() - err = service.AddRecord(ctx, newLogId, &consensusproto.Record{ - Id: []byte(bson.NewObjectId()), - PrevId: []byte(bson.NewObjectId()), - CreatedUnix: uint64(time.Now().Unix()), - }) - if err != consensuserr.ErrConflict { - return fmt.Errorf("unexpected error: '%v' want Conflict", zap.Error(err)) - } - err = nil - log.Info("conflict record checked", zap.Duration("dur", time.Since(st))) - - return -} - -func testStream(service consensusclient.Service) (err error) { - ctx := context.Background() - - // create log - newLogId := []byte(bson.NewObjectId()) - st := time.Now() - lastRecId := []byte(bson.NewObjectId()) - err = service.AddLog(ctx, &consensusproto.Log{ - Id: newLogId, - Records: []*consensusproto.Record{ - { - Id: lastRecId, - Payload: []byte("test"), - CreatedUnix: uint64(time.Now().Unix()), - }, - }, - }) - if err != nil { - return err - } - log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) - - stream, err := service.WatchLog(ctx) - if err != nil { - return err - } - defer stream.Close() - - st = time.Now() - if err = stream.WatchIds([][]byte{newLogId}); err != nil { - return err - } - log.Info("watch", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) - - sr := readStream(stream) - for i := 0; i < 10; i++ { - st = time.Now() - recId := []byte(bson.NewObjectId()) - err = service.AddRecord(ctx, newLogId, &consensusproto.Record{ - Id: recId, - PrevId: lastRecId, - CreatedUnix: uint64(time.Now().Unix()), - }) - if err != nil { - return err - } - lastRecId = recId - log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st))) - } - sr.validate() - return nil -} - -func readStream(stream consensusclient.Stream) *streamReader { - sr := &streamReader{stream: stream, logs: map[string]*consensusproto.Log{}} - go sr.read() - return sr -} - -type streamReader struct { - stream consensusclient.Stream - logs map[string]*consensusproto.Log -} - -func (sr *streamReader) read() { - for { - recs := sr.stream.WaitLogs() - if len(recs) == 0 { - return - } - for _, rec := range recs { - if el, ok := sr.logs[string(rec.Id)]; !ok { - sr.logs[string(rec.Id)] = &consensusproto.Log{ - Id: rec.Id, - Records: rec.Records, - } - } else { - el.Records = append(rec.Records, el.Records...) - sr.logs[string(rec.Id)] = el - } - } - } -} - -func (sr *streamReader) validate() { - var lc, rc int - for _, log := range sr.logs { - lc++ - rc += len(log.Records) - validateLog(log) - } - fmt.Println("logs valid; log count:", lc, "records:", rc) -} - -func validateLog(log *consensusproto.Log) { - var prevId []byte - for _, rec := range log.Records { - if len(prevId) != 0 { - if !bytes.Equal(prevId, rec.Id) { - panic(fmt.Sprintf("invalid log: %+v", log)) - } - } - prevId = rec.PrevId - } -} - -type bench struct { - service consensusclient.Service - stream consensusclient.Stream -} - -func (b *bench) run() { - var err error - b.stream, err = b.service.WatchLog(context.Background()) - if err != nil { - panic(err) - } - defer b.stream.Close() - sr := readStream(b.stream) - var wg sync.WaitGroup - for i := 0; i < 1000; i++ { - time.Sleep(time.Second / 100) - wg.Add(1) - go func() { - defer wg.Done() - b.client() - }() - fmt.Println("total streams:", i+1) - } - wg.Wait() - sr.validate() -} - -func (b *bench) client() { - ctx := context.Background() - - // create log - newLogId := []byte(bson.NewObjectId()) - lastRecId := []byte(bson.NewObjectId()) - err := b.service.AddLog(ctx, &consensusproto.Log{ - Id: newLogId, - Records: []*consensusproto.Record{ - { - Id: lastRecId, - Payload: []byte("test"), - CreatedUnix: uint64(time.Now().Unix()), - }, - }, - }) - for i := 0; i < 5; i++ { - fmt.Println("watch", bson.ObjectId(newLogId).Hex()) - if err = b.stream.WatchIds([][]byte{newLogId}); err != nil { - panic(err) - } - for i := 0; i < rand.Intn(20); i++ { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(10000))) - recId := []byte(bson.NewObjectId()) - err = b.service.AddRecord(ctx, newLogId, &consensusproto.Record{ - Id: recId, - PrevId: lastRecId, - Payload: []byte("some payload 1 2 3 4 5 6 6 7 oijoj"), - CreatedUnix: uint64(time.Now().Unix()), - }) - if err != nil { - panic(err) - } - lastRecId = recId - } - if err = b.stream.UnwatchIds([][]byte{newLogId}); err != nil { - panic(err) - } - fmt.Println("unwatch", bson.ObjectId(newLogId).Hex()) - time.Sleep(time.Minute * 1) - } -} diff --git a/consensus/consensus.go b/consensus/consensus.go index 7098984e..02371b1b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,7 +5,7 @@ import "time" type Log struct { Id []byte `bson:"_id"` Records []Record `bson:"records"` - Err error + Err error `bson:"-"` } type Record struct {