From fe6b71059d3361c05fd9cfc41f08f9fab94e00f5 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 8 Sep 2022 21:56:32 +0300 Subject: [PATCH] package moving + commonaspace and nidespace services --- Makefile | 2 +- app/app.go | 4 +- app/app_test.go | 2 +- cmd/node/node.go | 20 +- {service => common}/account/service.go | 3 +- common/commonspace/periodicsync.go | 58 +++++ .../commonspace}/remotediff/remotediff.go | 24 +- .../remotediff/remotediff_test.go | 4 +- common/commonspace/rpchandler.go | 25 ++ common/commonspace/service.go | 49 ++++ common/commonspace/space.go | 115 +++++++++ .../spacesyncproto}/protos/spacesync.proto | 10 +- .../spacesyncproto}/spacesync.pb.go | 242 +++++++++++++++--- .../spacesyncproto/spacesync_drpc.pb.go | 188 ++++++++++++++ {service => common}/net/dialer/dialer.go | 6 +- {service => common}/net/peer/peer.go | 0 {service => common}/net/pool/pool.go | 18 +- {service => common}/net/pool/pool_test.go | 6 +- .../net/rpc/server/drpcserver.go | 16 +- {service => common}/net/rpc/server/util.go | 0 .../net/rpc/server/util_windows.go | 0 {service => common}/net/secure/context.go | 0 {service => common}/net/secure/listener.go | 0 {service => common}/net/secure/service.go | 2 +- .../nodeconf}/configuration.go | 8 +- .../nodeconf}/service.go | 9 +- config/config.go | 3 +- node/nodespace/rpchandler.go | 30 +++ node/nodespace/service.go | 71 +++++ pkg/ocache/ocache.go | 31 ++- service/api/service.go | 2 +- service/document/service.go | 4 +- service/node/service.go | 2 +- service/space/context.go | 22 -- service/space/rpc.go | 18 -- service/space/service.go | 76 ------ service/space/space.go | 142 ---------- service/space/spacesync/spacesync_drpc.pb.go | 113 -------- service/sync/message/service.go | 7 +- service/sync/requesthandler/requesthandler.go | 4 +- service/treecache/service.go | 112 ++++---- 41 files changed, 896 insertions(+), 552 deletions(-) rename {service => common}/account/service.go (95%) create mode 100644 common/commonspace/periodicsync.go rename {service/space => common/commonspace}/remotediff/remotediff.go (66%) rename {service/space => common/commonspace}/remotediff/remotediff_test.go (84%) create mode 100644 common/commonspace/rpchandler.go create mode 100644 common/commonspace/service.go create mode 100644 common/commonspace/space.go rename {service/space/spacesync => common/commonspace/spacesyncproto}/protos/spacesync.proto (83%) rename {service/space/spacesync => common/commonspace/spacesyncproto}/spacesync.pb.go (81%) create mode 100644 common/commonspace/spacesyncproto/spacesync_drpc.pb.go rename {service => common}/net/dialer/dialer.go (90%) rename {service => common}/net/peer/peer.go (100%) rename {service => common}/net/pool/pool.go (81%) rename {service => common}/net/pool/pool_test.go (95%) rename {service => common}/net/rpc/server/drpcserver.go (84%) rename {service => common}/net/rpc/server/util.go (100%) rename {service => common}/net/rpc/server/util_windows.go (100%) rename {service => common}/net/secure/context.go (100%) rename {service => common}/net/secure/listener.go (100%) rename {service => common}/net/secure/service.go (96%) rename {service/configuration => common/nodeconf}/configuration.go (88%) rename {service/configuration => common/nodeconf}/service.go (87%) create mode 100644 node/nodespace/rpchandler.go create mode 100644 node/nodespace/service.go delete mode 100644 service/space/context.go delete mode 100644 service/space/rpc.go delete mode 100644 service/space/service.go delete mode 100644 service/space/space.go delete mode 100644 service/space/spacesync/spacesync_drpc.pb.go diff --git a/Makefile b/Makefile index c7282f4f..2480115f 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ protos: $(GOGO_START) protoc --gogofaster_out=:. $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.proto; mv $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.go $(P_PLAINTEXT_CHANGES_PATH_PB) $(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES)) $(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto - $(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. service/space/spacesync/protos/*.proto + protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. common/commonspace/spacesyncproto/protos/*.proto build: @$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app)) diff --git a/app/app.go b/app/app.go index ef0d46bd..718b24b6 100644 --- a/app/app.go +++ b/app/app.go @@ -27,7 +27,7 @@ var ( type Component interface { // Init will be called first // When returned error is not nil - app start will be aborted - Init(ctx context.Context, a *App) (err error) + Init(a *App) (err error) // Name must return unique service name Name() (name string) } @@ -157,7 +157,7 @@ func (app *App) Start(ctx context.Context) (err error) { } for i, s := range app.components { - if err = s.Init(ctx, app); err != nil { + if err = s.Init(app); err != nil { closeServices(i) return fmt.Errorf("can't init service '%s': %v", s.Name(), err) } diff --git a/app/app_test.go b/app/app_test.go index b4a93007..cdc52445 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -131,7 +131,7 @@ type testComponent struct { ids testIds } -func (t *testComponent) Init(ctx context.Context, a *App) error { +func (t *testComponent) Init(a *App) error { t.ids.initId = t.seq.New() return t.err } diff --git a/cmd/node/node.go b/cmd/node/node.go index c73a164c..6f4cdf88 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -6,15 +6,16 @@ import ( "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" + "github.com/anytypeio/go-anytype-infrastructure-experiments/node/nodespace" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space" "go.uber.org/zap" "net/http" _ "net/http/pprof" @@ -90,11 +91,12 @@ func main() { func Bootstrap(a *app.App) { a.Register(account.New()). Register(node.New()). - Register(configuration.New()). + Register(nodeconf.New()). Register(secure.New()). Register(dialer.New()). Register(pool.New()). - Register(space.New()). + Register(nodespace.New()). + Register(commonspace.New()). Register(server.New()) //Register(document.New()). diff --git a/service/account/service.go b/common/account/service.go similarity index 95% rename from service/account/service.go rename to common/account/service.go index f617fdd4..7b0cc621 100644 --- a/service/account/service.go +++ b/common/account/service.go @@ -1,7 +1,6 @@ package account import ( - "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account" @@ -33,7 +32,7 @@ func New() app.Component { return &service{} } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { cfg := a.MustComponent(config.CName).(*config.Config) // decoding our keys diff --git a/common/commonspace/periodicsync.go b/common/commonspace/periodicsync.go new file mode 100644 index 00000000..219f953e --- /dev/null +++ b/common/commonspace/periodicsync.go @@ -0,0 +1,58 @@ +package commonspace + +import ( + "context" + "go.uber.org/zap" + "time" +) + +func newPeriodicSync(periodSeconds int, sync func(ctx context.Context) error, l *zap.Logger) *periodicSync { + ctx, cancel := context.WithCancel(context.Background()) + ps := &periodicSync{ + log: l, + sync: sync, + syncCtx: ctx, + syncCancel: cancel, + syncLoopDone: make(chan struct{}), + } + go ps.syncLoop(periodSeconds) + return ps +} + +type periodicSync struct { + log *zap.Logger + sync func(ctx context.Context) error + syncCtx context.Context + syncCancel context.CancelFunc + syncLoopDone chan struct{} +} + +func (p *periodicSync) syncLoop(periodSeconds int) { + period := time.Duration(periodSeconds) * time.Second + defer close(p.syncLoopDone) + doSync := func() { + ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute) + defer cancel() + if err := p.sync(ctx); err != nil { + p.log.Warn("periodic sync error", zap.Error(err)) + } + } + doSync() + if period > 0 { + ticker := time.NewTicker(period) + defer ticker.Stop() + for { + select { + case <-p.syncCtx.Done(): + return + case <-ticker.C: + doSync() + } + } + } +} + +func (p *periodicSync) Close() { + p.syncCancel() + <-p.syncLoopDone +} diff --git a/service/space/remotediff/remotediff.go b/common/commonspace/remotediff/remotediff.go similarity index 66% rename from service/space/remotediff/remotediff.go rename to common/commonspace/remotediff/remotediff.go index 2ba6769a..1a01814c 100644 --- a/service/space/remotediff/remotediff.go +++ b/common/commonspace/remotediff/remotediff.go @@ -2,12 +2,12 @@ package remotediff import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" ) type Client interface { - HeadSync(ctx context.Context, in *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) + HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) } func NewRemoteDiff(spaceId string, client Client) ldiff.Remote { @@ -24,15 +24,15 @@ type remote struct { func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { results = resBuf[:0] - pbRanges := make([]*spacesync.HeadSyncRange, 0, len(ranges)) + pbRanges := make([]*spacesyncproto.HeadSyncRange, 0, len(ranges)) for _, rg := range ranges { - pbRanges = append(pbRanges, &spacesync.HeadSyncRange{ + pbRanges = append(pbRanges, &spacesyncproto.HeadSyncRange{ From: rg.From, To: rg.To, Limit: uint32(rg.Limit), }) } - req := &spacesync.HeadSyncRequest{ + req := &spacesyncproto.HeadSyncRequest{ SpaceId: r.spaceId, Ranges: pbRanges, } @@ -60,7 +60,7 @@ func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff return } -func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesync.HeadSyncRequest) (resp *spacesync.HeadSyncResponse, err error) { +func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { ranges := make([]ldiff.Range, 0, len(req.Ranges)) for _, reqRange := range req.Ranges { ranges = append(ranges, ldiff.Range{ @@ -74,21 +74,21 @@ func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesync.HeadS return } - resp = &spacesync.HeadSyncResponse{ - Results: make([]*spacesync.HeadSyncResult, 0, len(res)), + resp = &spacesyncproto.HeadSyncResponse{ + Results: make([]*spacesyncproto.HeadSyncResult, 0, len(res)), } for _, rangeRes := range res { - var elements []*spacesync.HeadSyncResultElement + var elements []*spacesyncproto.HeadSyncResultElement if len(rangeRes.Elements) > 0 { - elements = make([]*spacesync.HeadSyncResultElement, 0, len(rangeRes.Elements)) + elements = make([]*spacesyncproto.HeadSyncResultElement, 0, len(rangeRes.Elements)) for _, el := range rangeRes.Elements { - elements = append(elements, &spacesync.HeadSyncResultElement{ + elements = append(elements, &spacesyncproto.HeadSyncResultElement{ Id: el.Id, Head: el.Head, }) } } - resp.Results = append(resp.Results, &spacesync.HeadSyncResult{ + resp.Results = append(resp.Results, &spacesyncproto.HeadSyncResult{ Hash: rangeRes.Hash, Elements: elements, Count: uint32(rangeRes.Count), diff --git a/service/space/remotediff/remotediff_test.go b/common/commonspace/remotediff/remotediff_test.go similarity index 84% rename from service/space/remotediff/remotediff_test.go rename to common/commonspace/remotediff/remotediff_test.go index e1cf96ed..d209b753 100644 --- a/service/space/remotediff/remotediff_test.go +++ b/common/commonspace/remotediff/remotediff_test.go @@ -3,8 +3,8 @@ package remotediff import ( "context" "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "testing" @@ -36,6 +36,6 @@ type mockClient struct { l ldiff.Diff } -func (m *mockClient) HeadSync(ctx context.Context, in *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { +func (m *mockClient) HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { return HandlerRangeRequest(ctx, m.l, in) } diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go new file mode 100644 index 00000000..3ce71e1a --- /dev/null +++ b/common/commonspace/rpchandler.go @@ -0,0 +1,25 @@ +package commonspace + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" +) + +type RpcHandler interface { + HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) + Stream(stream spacesyncproto.DRPCSpace_StreamStream) error +} + +type rpcHandler struct { + s *space +} + +func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { + return remotediff.HandlerRangeRequest(ctx, r.s.diff, req) +} + +func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { + return fmt.Errorf("not implemented") +} diff --git a/common/commonspace/service.go b/common/commonspace/service.go new file mode 100644 index 00000000..85dd8ae9 --- /dev/null +++ b/common/commonspace/service.go @@ -0,0 +1,49 @@ +package commonspace + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" +) + +const CName = "common.commonspace" + +var log = logger.NewNamed(CName) + +func New() Service { + return &service{} +} + +type Service interface { + CreateSpace(ctx context.Context, id string) (sp Space, err error) + app.Component +} + +type service struct { + config config.Space + configurationService nodeconf.Service +} + +func (s *service) Init(a *app.App) (err error) { + s.config = a.MustComponent(config.CName).(*config.Config).Space + s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) + return nil +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { + sp := &space{ + id: id, + nconf: s.configurationService.GetLast(), + conf: s.config, + } + if err := sp.Init(ctx); err != nil { + return nil, err + } + return sp, nil +} diff --git a/common/commonspace/space.go b/common/commonspace/space.go new file mode 100644 index 00000000..4f83dd97 --- /dev/null +++ b/common/commonspace/space.go @@ -0,0 +1,115 @@ +package commonspace + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" + "go.uber.org/zap" + "math/rand" + "sync" + "time" +) + +type Space interface { + Id() string + + SpaceSyncRpc() RpcHandler + + Close() error +} + +type space struct { + id string + nconf nodeconf.Configuration + conf config.Space + diff ldiff.Diff + rpc *rpcHandler + periodicSync *periodicSync + mu sync.RWMutex +} + +func (s *space) Id() string { + return s.id +} + +func (s *space) Init(ctx context.Context) error { + s.diff = ldiff.New(16, 16) + s.periodicSync = newPeriodicSync(s.conf.SyncPeriod, s.sync, log.With(zap.String("spaceId", s.id))) + s.rpc = &rpcHandler{s: s} + s.testFill() + return nil +} + +func (s *space) SpaceSyncRpc() RpcHandler { + return s.rpc +} + +func (s *space) testFill() { + var n = 1000 + var els = make([]ldiff.Element, 0, n) + rand.Seed(time.Now().UnixNano()) + for i := 0; i < n; i++ { + if rand.Intn(n) > 2 { + id := fmt.Sprintf("%s.%d", s.id, i) + head := "head." + id + if rand.Intn(n) > n-10 { + head += ".modified" + } + el := ldiff.Element{ + Id: id, + Head: head, + } + els = append(els, el) + } + } + s.diff.Set(els...) +} + +func (s *space) sync(ctx context.Context) error { + st := time.Now() + peers, err := s.getPeers(ctx) + if err != nil { + return err + } + for _, p := range peers { + if err := s.syncWithPeer(ctx, p); err != nil { + log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) + } + } + log.Info("synced", zap.String("spaceId", s.id), zap.Duration("dur", time.Since(st))) + return nil +} + +func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { + cl := spacesyncproto.NewDRPCSpaceClient(p) + rdiff := remotediff.NewRemoteDiff(s.id, cl) + newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff) + if err != nil { + return nil + } + log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds))) + return +} + +func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { + if s.nconf.IsResponsible(s.id) { + return s.nconf.AllPeers(ctx, s.id) + } else { + var p peer.Peer + p, err = s.nconf.OnePeer(ctx, s.id) + if err != nil { + return nil, err + } + return []peer.Peer{p}, nil + } +} + +func (s *space) Close() error { + s.periodicSync.Close() + return nil +} diff --git a/service/space/spacesync/protos/spacesync.proto b/common/commonspace/spacesyncproto/protos/spacesync.proto similarity index 83% rename from service/space/spacesync/protos/spacesync.proto rename to common/commonspace/spacesyncproto/protos/spacesync.proto index f0b68b11..0a1e434d 100644 --- a/service/space/spacesync/protos/spacesync.proto +++ b/common/commonspace/spacesyncproto/protos/spacesync.proto @@ -1,16 +1,24 @@ syntax = "proto3"; package anySpace; -option go_package = "service/space/spacesync"; +option go_package = "common/commonspace/spacesyncproto"; enum ErrCodes { Unexpected = 0; } + service Space { // HeadSync compares all objects and their hashes in a space rpc HeadSync(HeadSyncRequest) returns (HeadSyncResponse); + rpc Stream( stream Msg) returns (stream Msg); } +// TODO: temporary mock message +message Msg { + string spaceId = 1; +} + + // HeadSyncRange presenting a request for one range message HeadSyncRange { diff --git a/service/space/spacesync/spacesync.pb.go b/common/commonspace/spacesyncproto/spacesync.pb.go similarity index 81% rename from service/space/spacesync/spacesync.pb.go rename to common/commonspace/spacesyncproto/spacesync.pb.go index 92772dc4..9db560cb 100644 --- a/service/space/spacesync/spacesync.pb.go +++ b/common/commonspace/spacesyncproto/spacesync.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: service/space/spacesync/protos/spacesync.proto +// source: common/commonspace/spacesyncproto/protos/spacesync.proto -package spacesync +package spacesyncproto import ( fmt "fmt" @@ -41,7 +41,52 @@ func (x ErrCodes) String() string { } func (ErrCodes) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{0} + return fileDescriptor_5855f4ef9cf24cdb, []int{0} +} + +// TODO: temporary mock message +type Msg struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *Msg) Reset() { *m = Msg{} } +func (m *Msg) String() string { return proto.CompactTextString(m) } +func (*Msg) ProtoMessage() {} +func (*Msg) Descriptor() ([]byte, []int) { + return fileDescriptor_5855f4ef9cf24cdb, []int{0} +} +func (m *Msg) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Msg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Msg.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Msg) XXX_Merge(src proto.Message) { + xxx_messageInfo_Msg.Merge(m, src) +} +func (m *Msg) XXX_Size() int { + return m.Size() +} +func (m *Msg) XXX_DiscardUnknown() { + xxx_messageInfo_Msg.DiscardUnknown(m) +} + +var xxx_messageInfo_Msg proto.InternalMessageInfo + +func (m *Msg) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" } // HeadSyncRange presenting a request for one range @@ -55,7 +100,7 @@ func (m *HeadSyncRange) Reset() { *m = HeadSyncRange{} } func (m *HeadSyncRange) String() string { return proto.CompactTextString(m) } func (*HeadSyncRange) ProtoMessage() {} func (*HeadSyncRange) Descriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{0} + return fileDescriptor_5855f4ef9cf24cdb, []int{1} } func (m *HeadSyncRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -116,7 +161,7 @@ func (m *HeadSyncResult) Reset() { *m = HeadSyncResult{} } func (m *HeadSyncResult) String() string { return proto.CompactTextString(m) } func (*HeadSyncResult) ProtoMessage() {} func (*HeadSyncResult) Descriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{1} + return fileDescriptor_5855f4ef9cf24cdb, []int{2} } func (m *HeadSyncResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -176,7 +221,7 @@ func (m *HeadSyncResultElement) Reset() { *m = HeadSyncResultElement{} } func (m *HeadSyncResultElement) String() string { return proto.CompactTextString(m) } func (*HeadSyncResultElement) ProtoMessage() {} func (*HeadSyncResultElement) Descriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{2} + return fileDescriptor_5855f4ef9cf24cdb, []int{3} } func (m *HeadSyncResultElement) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -229,7 +274,7 @@ func (m *HeadSyncRequest) Reset() { *m = HeadSyncRequest{} } func (m *HeadSyncRequest) String() string { return proto.CompactTextString(m) } func (*HeadSyncRequest) ProtoMessage() {} func (*HeadSyncRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{3} + return fileDescriptor_5855f4ef9cf24cdb, []int{4} } func (m *HeadSyncRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -281,7 +326,7 @@ func (m *HeadSyncResponse) Reset() { *m = HeadSyncResponse{} } func (m *HeadSyncResponse) String() string { return proto.CompactTextString(m) } func (*HeadSyncResponse) ProtoMessage() {} func (*HeadSyncResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_11d78c2fb7c80384, []int{4} + return fileDescriptor_5855f4ef9cf24cdb, []int{5} } func (m *HeadSyncResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -319,6 +364,7 @@ func (m *HeadSyncResponse) GetResults() []*HeadSyncResult { func init() { proto.RegisterEnum("anySpace.ErrCodes", ErrCodes_name, ErrCodes_value) + proto.RegisterType((*Msg)(nil), "anySpace.Msg") proto.RegisterType((*HeadSyncRange)(nil), "anySpace.HeadSyncRange") proto.RegisterType((*HeadSyncResult)(nil), "anySpace.HeadSyncResult") proto.RegisterType((*HeadSyncResultElement)(nil), "anySpace.HeadSyncResultElement") @@ -327,35 +373,66 @@ func init() { } func init() { - proto.RegisterFile("service/space/spacesync/protos/spacesync.proto", fileDescriptor_11d78c2fb7c80384) + proto.RegisterFile("common/commonspace/spacesyncproto/protos/spacesync.proto", fileDescriptor_5855f4ef9cf24cdb) } -var fileDescriptor_11d78c2fb7c80384 = []byte{ - // 369 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xbf, 0x6e, 0xe2, 0x40, - 0x10, 0xc6, 0x6d, 0xf3, 0xcf, 0xcc, 0x1d, 0x1c, 0x5a, 0xdd, 0x09, 0x1f, 0x85, 0x0f, 0xb9, 0x42, - 0x57, 0x18, 0x85, 0x94, 0x54, 0x49, 0x44, 0x14, 0x52, 0x2e, 0x4a, 0x13, 0xa5, 0x71, 0xec, 0x49, - 0xb0, 0x04, 0xbb, 0x8e, 0x77, 0x49, 0xc2, 0x5b, 0xe4, 0xb1, 0x52, 0x52, 0xa6, 0x8c, 0xe0, 0x45, - 0x22, 0x0f, 0x31, 0x24, 0x92, 0x69, 0xac, 0x99, 0xf1, 0x7c, 0xdf, 0xfc, 0x76, 0x34, 0xe0, 0x2b, - 0x4c, 0x1f, 0xe3, 0x10, 0xfb, 0x2a, 0x09, 0xf2, 0xaf, 0x5a, 0x8a, 0xb0, 0x9f, 0xa4, 0x52, 0x4b, - 0xb5, 0x2f, 0xf8, 0x54, 0x60, 0x76, 0x20, 0x96, 0x93, 0xac, 0xe6, 0x8d, 0xa1, 0x71, 0x81, 0x41, - 0x34, 0x59, 0x8a, 0x90, 0x07, 0xe2, 0x1e, 0x19, 0x83, 0xf2, 0x5d, 0x2a, 0xe7, 0x8e, 0xd9, 0x35, - 0x7b, 0x65, 0x4e, 0x31, 0x6b, 0x82, 0xa5, 0xa5, 0x63, 0x51, 0xc5, 0xd2, 0x92, 0xfd, 0x86, 0xca, - 0x2c, 0x9e, 0xc7, 0xda, 0x29, 0x75, 0xcd, 0x5e, 0x83, 0x6f, 0x13, 0xef, 0x09, 0x9a, 0x3b, 0x2b, - 0x54, 0x8b, 0x99, 0xce, 0xbc, 0xa6, 0x81, 0x9a, 0x92, 0xd7, 0x4f, 0x4e, 0x31, 0x1b, 0x82, 0x8d, - 0x33, 0x9c, 0xa3, 0xd0, 0xca, 0xb1, 0xba, 0xa5, 0xde, 0x8f, 0xc1, 0x3f, 0x3f, 0xa7, 0xf1, 0xbf, - 0xeb, 0x47, 0xdb, 0x3e, 0xbe, 0x13, 0x64, 0x83, 0x43, 0xb9, 0x10, 0xbb, 0xc1, 0x94, 0x78, 0x43, - 0xf8, 0x53, 0x28, 0xcc, 0xb8, 0xe3, 0x88, 0xa6, 0xd7, 0xb9, 0x15, 0x47, 0xc4, 0x83, 0x41, 0x44, - 0x2f, 0xa9, 0x73, 0x8a, 0xbd, 0x1b, 0xf8, 0xb5, 0x17, 0x3f, 0x2c, 0x50, 0x69, 0xe6, 0x40, 0x8d, - 0x16, 0x36, 0xce, 0xb5, 0x79, 0xca, 0xfa, 0x50, 0x4d, 0xb3, 0x2d, 0xe5, 0xe8, 0xed, 0x02, 0xf4, - 0xec, 0x3f, 0xff, 0x6c, 0xf3, 0xce, 0xa1, 0xf5, 0x05, 0x2d, 0x91, 0x42, 0x21, 0x1b, 0x40, 0x2d, - 0x25, 0x4c, 0xe5, 0x98, 0xe4, 0xe2, 0x1c, 0x5a, 0x00, 0xcf, 0x1b, 0xff, 0x77, 0xc0, 0x1e, 0xa5, - 0xe9, 0x99, 0x8c, 0x50, 0xb1, 0x26, 0xc0, 0x95, 0xc0, 0xe7, 0x04, 0x43, 0x8d, 0x51, 0xcb, 0x18, - 0x5c, 0x42, 0x85, 0xc4, 0xec, 0x04, 0xec, 0x5c, 0xcf, 0xfe, 0x16, 0x79, 0xd2, 0xf3, 0x3a, 0x9d, - 0xc2, 0x71, 0xc4, 0x76, 0x7a, 0xf4, 0xba, 0x76, 0xcd, 0xd5, 0xda, 0x35, 0xdf, 0xd7, 0xae, 0xf9, - 0xb2, 0x71, 0x8d, 0xd5, 0xc6, 0x35, 0xde, 0x36, 0xae, 0x71, 0xdd, 0x3e, 0x70, 0x62, 0xb7, 0x55, - 0x3a, 0xa9, 0xe3, 0x8f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x09, 0x7e, 0xc8, 0x77, 0x84, 0x02, 0x00, - 0x00, +var fileDescriptor_5855f4ef9cf24cdb = []byte{ + // 400 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0xcd, 0xce, 0xd2, 0x40, + 0x14, 0xed, 0x94, 0xbf, 0x72, 0x15, 0x24, 0x13, 0x8d, 0xb5, 0x8b, 0x82, 0x5d, 0x35, 0x2c, 0xa8, + 0xc1, 0x8d, 0x09, 0x2b, 0x35, 0x18, 0x59, 0xb0, 0x19, 0xe2, 0xc6, 0xb8, 0xa9, 0xed, 0x08, 0x4d, + 0xe8, 0x4c, 0xed, 0x0c, 0x2a, 0x6f, 0xe1, 0x63, 0xb9, 0x64, 0xe9, 0xd2, 0xc0, 0x8b, 0x98, 0xde, + 0x7e, 0x85, 0x8f, 0xa4, 0xe4, 0xdb, 0xcc, 0xfd, 0xe9, 0x39, 0xe7, 0x9e, 0x7b, 0x53, 0x78, 0x13, + 0xc9, 0x34, 0x95, 0x22, 0x28, 0x83, 0xca, 0xc2, 0x88, 0x07, 0xf8, 0xaa, 0xbd, 0x88, 0xb2, 0x5c, + 0x6a, 0x19, 0xe0, 0xab, 0x2e, 0xdd, 0x09, 0x36, 0xa8, 0x15, 0x8a, 0xfd, 0xaa, 0xe8, 0x79, 0x43, + 0x68, 0x2c, 0xd5, 0x9a, 0xda, 0xd0, 0x41, 0xcc, 0x22, 0xb6, 0xc9, 0x88, 0xf8, 0x5d, 0x56, 0x95, + 0xde, 0x02, 0x7a, 0x1f, 0x79, 0x18, 0xaf, 0xf6, 0x22, 0x62, 0xa1, 0x58, 0x73, 0x4a, 0xa1, 0xf9, + 0x2d, 0x97, 0x29, 0xe2, 0x9a, 0x0c, 0x73, 0xda, 0x07, 0x53, 0x4b, 0xdb, 0xc4, 0x8e, 0xa9, 0x25, + 0x7d, 0x0a, 0xad, 0x6d, 0x92, 0x26, 0xda, 0x6e, 0x8c, 0x88, 0xdf, 0x63, 0x65, 0xe1, 0xfd, 0x84, + 0xfe, 0x59, 0x8a, 0xab, 0xdd, 0x56, 0x17, 0x5a, 0x9b, 0x50, 0x6d, 0x50, 0xeb, 0x31, 0xc3, 0x9c, + 0xce, 0xc0, 0xe2, 0x5b, 0x9e, 0x72, 0xa1, 0x95, 0x6d, 0x8e, 0x1a, 0xfe, 0xa3, 0xe9, 0x70, 0x52, + 0xd9, 0x9d, 0x5c, 0xf3, 0xe7, 0x25, 0x8e, 0x9d, 0x09, 0xc5, 0xe0, 0x48, 0xee, 0xc4, 0x79, 0x30, + 0x16, 0xde, 0x0c, 0x9e, 0xd5, 0x12, 0x0b, 0xdf, 0x49, 0xb5, 0xb1, 0x99, 0xc4, 0xe8, 0x87, 0x87, + 0x31, 0x6e, 0xd2, 0x65, 0x98, 0x7b, 0x5f, 0xe0, 0xc9, 0x85, 0xfc, 0x7d, 0xc7, 0x95, 0xbe, 0x7d, + 0x2d, 0x1a, 0x40, 0x3b, 0x2f, 0xae, 0x54, 0x59, 0x7f, 0x5e, 0x63, 0xbd, 0xf8, 0xce, 0xee, 0x60, + 0xde, 0x07, 0x18, 0xdc, 0xb3, 0x96, 0x49, 0xa1, 0x38, 0x9d, 0x42, 0x27, 0x47, 0x9b, 0xca, 0x26, + 0xa8, 0x62, 0xdf, 0x3a, 0x00, 0xab, 0x80, 0x63, 0x07, 0xac, 0x79, 0x9e, 0xbf, 0x97, 0x31, 0x57, + 0xb4, 0x0f, 0xf0, 0x49, 0xf0, 0x5f, 0x19, 0x8f, 0x34, 0x8f, 0x07, 0xc6, 0xf4, 0x07, 0xb4, 0x90, + 0x4c, 0xdf, 0x82, 0x55, 0xf1, 0xe9, 0x8b, 0x3a, 0x4d, 0x5c, 0xcf, 0x71, 0x6a, 0xc7, 0x95, 0xde, + 0xc6, 0xd0, 0x5e, 0xe9, 0x9c, 0x87, 0x29, 0xed, 0x5d, 0x50, 0x4b, 0xb5, 0x76, 0xae, 0x4b, 0x9f, + 0xbc, 0x22, 0xef, 0x66, 0x7f, 0x8e, 0x2e, 0x39, 0x1c, 0x5d, 0xf2, 0xef, 0xe8, 0x92, 0xdf, 0x27, + 0xd7, 0x38, 0x9c, 0x5c, 0xe3, 0xef, 0xc9, 0x35, 0x3e, 0xbf, 0x7c, 0xf0, 0xcf, 0xfd, 0xda, 0xc6, + 0xf0, 0xfa, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5a, 0xf6, 0x94, 0x21, 0xe5, 0x02, 0x00, 0x00, +} + +func (m *Msg) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Msg) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Msg) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil } func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { @@ -574,6 +651,19 @@ func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *Msg) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSpacesync(uint64(l)) + } + return n +} + func (m *HeadSyncRange) Size() (n int) { if m == nil { return 0 @@ -671,6 +761,88 @@ func sovSpacesync(x uint64) (n int) { func sozSpacesync(x uint64) (n int) { return sovSpacesync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *Msg) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Msg: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Msg: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSpacesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSpacesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *HeadSyncRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/common/commonspace/spacesyncproto/spacesync_drpc.pb.go b/common/commonspace/spacesyncproto/spacesync_drpc.pb.go new file mode 100644 index 00000000..b17dc589 --- /dev/null +++ b/common/commonspace/spacesyncproto/spacesync_drpc.pb.go @@ -0,0 +1,188 @@ +// Code generated by protoc-gen-go-drpc. DO NOT EDIT. +// protoc-gen-go-drpc version: v0.0.32 +// source: common/commonspace/spacesyncproto/protos/spacesync.proto + +package spacesyncproto + +import ( + bytes "bytes" + context "context" + errors "errors" + jsonpb "github.com/gogo/protobuf/jsonpb" + proto "github.com/gogo/protobuf/proto" + drpc "storj.io/drpc" + drpcerr "storj.io/drpc/drpcerr" +) + +type drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto struct{} + +func (drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto) Marshal(msg drpc.Message) ([]byte, error) { + return proto.Marshal(msg.(proto.Message)) +} + +func (drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto) Unmarshal(buf []byte, msg drpc.Message) error { + return proto.Unmarshal(buf, msg.(proto.Message)) +} + +func (drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto) JSONMarshal(msg drpc.Message) ([]byte, error) { + var buf bytes.Buffer + err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message)) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error { + return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message)) +} + +type DRPCSpaceClient interface { + DRPCConn() drpc.Conn + + HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) + Stream(ctx context.Context) (DRPCSpace_StreamClient, error) +} + +type drpcSpaceClient struct { + cc drpc.Conn +} + +func NewDRPCSpaceClient(cc drpc.Conn) DRPCSpaceClient { + return &drpcSpaceClient{cc} +} + +func (c *drpcSpaceClient) DRPCConn() drpc.Conn { return c.cc } + +func (c *drpcSpaceClient) HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) { + out := new(HeadSyncResponse) + err := c.cc.Invoke(ctx, "/anySpace.Space/HeadSync", drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}, in, out) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *drpcSpaceClient) Stream(ctx context.Context) (DRPCSpace_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, "/anySpace.Space/Stream", drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}) + if err != nil { + return nil, err + } + x := &drpcSpace_StreamClient{stream} + return x, nil +} + +type DRPCSpace_StreamClient interface { + drpc.Stream + Send(*Msg) error + Recv() (*Msg, error) +} + +type drpcSpace_StreamClient struct { + drpc.Stream +} + +func (x *drpcSpace_StreamClient) Send(m *Msg) error { + return x.MsgSend(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}) +} + +func (x *drpcSpace_StreamClient) Recv() (*Msg, error) { + m := new(Msg) + if err := x.MsgRecv(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcSpace_StreamClient) RecvMsg(m *Msg) error { + return x.MsgRecv(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}) +} + +type DRPCSpaceServer interface { + HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) + Stream(DRPCSpace_StreamStream) error +} + +type DRPCSpaceUnimplementedServer struct{} + +func (s *DRPCSpaceUnimplementedServer) HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) { + return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + +func (s *DRPCSpaceUnimplementedServer) Stream(DRPCSpace_StreamStream) error { + return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + +type DRPCSpaceDescription struct{} + +func (DRPCSpaceDescription) NumMethods() int { return 2 } + +func (DRPCSpaceDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { + switch n { + case 0: + return "/anySpace.Space/HeadSync", drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCSpaceServer). + HeadSync( + ctx, + in1.(*HeadSyncRequest), + ) + }, DRPCSpaceServer.HeadSync, true + case 1: + return "/anySpace.Space/Stream", drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return nil, srv.(DRPCSpaceServer). + Stream( + &drpcSpace_StreamStream{in1.(drpc.Stream)}, + ) + }, DRPCSpaceServer.Stream, true + default: + return "", nil, nil, nil, false + } +} + +func DRPCRegisterSpace(mux drpc.Mux, impl DRPCSpaceServer) error { + return mux.Register(impl, DRPCSpaceDescription{}) +} + +type DRPCSpace_HeadSyncStream interface { + drpc.Stream + SendAndClose(*HeadSyncResponse) error +} + +type drpcSpace_HeadSyncStream struct { + drpc.Stream +} + +func (x *drpcSpace_HeadSyncStream) SendAndClose(m *HeadSyncResponse) error { + if err := x.MsgSend(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}); err != nil { + return err + } + return x.CloseSend() +} + +type DRPCSpace_StreamStream interface { + drpc.Stream + Send(*Msg) error + Recv() (*Msg, error) +} + +type drpcSpace_StreamStream struct { + drpc.Stream +} + +func (x *drpcSpace_StreamStream) Send(m *Msg) error { + return x.MsgSend(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}) +} + +func (x *drpcSpace_StreamStream) Recv() (*Msg, error) { + m := new(Msg) + if err := x.MsgRecv(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcSpace_StreamStream) RecvMsg(m *Msg) error { + return x.MsgRecv(m, drpcEncoding_File_common_commonspace_spacesyncproto_protos_spacesync_proto{}) +} diff --git a/service/net/dialer/dialer.go b/common/net/dialer/dialer.go similarity index 90% rename from service/net/dialer/dialer.go rename to common/net/dialer/dialer.go index ade01444..80d0d6f2 100644 --- a/service/net/dialer/dialer.go +++ b/common/net/dialer/dialer.go @@ -5,9 +5,9 @@ import ( "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "github.com/libp2p/go-libp2p-core/sec" "go.uber.org/zap" "net" @@ -39,7 +39,7 @@ type dialer struct { mu sync.RWMutex } -func (d *dialer) Init(ctx context.Context, a *app.App) (err error) { +func (d *dialer) Init(a *app.App) (err error) { d.transport = a.MustComponent(secure.CName).(secure.Service) nodes := a.MustComponent(config.CName).(*config.Config).Nodes d.peerAddrs = map[string][]string{} diff --git a/service/net/peer/peer.go b/common/net/peer/peer.go similarity index 100% rename from service/net/peer/peer.go rename to common/net/peer/peer.go diff --git a/service/net/pool/pool.go b/common/net/pool/pool.go similarity index 81% rename from service/net/pool/pool.go rename to common/net/pool/pool.go index 6b7cfa67..d4e21bf4 100644 --- a/service/net/pool/pool.go +++ b/common/net/pool/pool.go @@ -5,9 +5,9 @@ import ( "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" "math/rand" "time" ) @@ -40,11 +40,17 @@ type pool struct { cache ocache.OCache } -func (p *pool) Init(ctx context.Context, a *app.App) (err error) { +func (p *pool) Init(a *app.App) (err error) { dialer := a.MustComponent(dialer.CName).(dialer.Dialer) - p.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) { - return dialer.Dial(ctx, id) - }, ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(time.Minute), ocache.WithTTL(time.Minute*5)) + p.cache = ocache.New( + func(ctx context.Context, id string) (value ocache.Object, err error) { + return dialer.Dial(ctx, id) + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Minute*5), + ocache.WithRefCounter(false), + ) return nil } diff --git a/service/net/pool/pool_test.go b/common/net/pool/pool_test.go similarity index 95% rename from service/net/pool/pool_test.go rename to common/net/pool/pool_test.go index efc51d38..a7f4a528 100644 --- a/service/net/pool/pool_test.go +++ b/common/net/pool/pool_test.go @@ -5,8 +5,8 @@ import ( "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "storj.io/drpc" @@ -160,7 +160,7 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) { return } -func (d *dialerMock) Init(ctx context.Context, a *app.App) (err error) { +func (d *dialerMock) Init(a *app.App) (err error) { return } diff --git a/service/net/rpc/server/drpcserver.go b/common/net/rpc/server/drpcserver.go similarity index 84% rename from service/net/rpc/server/drpcserver.go rename to common/net/rpc/server/drpcserver.go index 975c270f..5e6078ff 100644 --- a/service/net/rpc/server/drpcserver.go +++ b/common/net/rpc/server/drpcserver.go @@ -4,9 +4,9 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" + secure2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "github.com/zeebo/errs" "go.uber.org/zap" "io" @@ -33,16 +33,16 @@ type DRPCServer interface { type drpcServer struct { config config.GrpcServer drpcServer *drpcserver.Server - transport secure.Service - listeners []secure.ContextListener + transport secure2.Service + listeners []secure2.ContextListener pool pool.Pool cancel func() *drpcmux.Mux } -func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) { +func (s *drpcServer) Init(a *app.App) (err error) { s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer - s.transport = a.MustComponent(secure.CName).(secure.Service) + s.transport = a.MustComponent(secure2.CName).(secure2.Service) s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -65,7 +65,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { return } -func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) { +func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) { l := log.With(zap.String("localAddr", lis.Addr().String())) l.Info("drpc listener started") defer func() { @@ -89,7 +89,7 @@ func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) { } continue } - if _, ok := err.(secure.HandshakeError); ok { + if _, ok := err.(secure2.HandshakeError); ok { l.Warn("listener handshake error", zap.Error(err)) continue } diff --git a/service/net/rpc/server/util.go b/common/net/rpc/server/util.go similarity index 100% rename from service/net/rpc/server/util.go rename to common/net/rpc/server/util.go diff --git a/service/net/rpc/server/util_windows.go b/common/net/rpc/server/util_windows.go similarity index 100% rename from service/net/rpc/server/util_windows.go rename to common/net/rpc/server/util_windows.go diff --git a/service/net/secure/context.go b/common/net/secure/context.go similarity index 100% rename from service/net/secure/context.go rename to common/net/secure/context.go diff --git a/service/net/secure/listener.go b/common/net/secure/listener.go similarity index 100% rename from service/net/secure/listener.go rename to common/net/secure/listener.go diff --git a/service/net/secure/service.go b/common/net/secure/service.go similarity index 96% rename from service/net/secure/service.go rename to common/net/secure/service.go index 5c93862b..40a30f8c 100644 --- a/service/net/secure/service.go +++ b/common/net/secure/service.go @@ -35,7 +35,7 @@ type service struct { key crypto.PrivKey } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { account := a.MustComponent(config.CName).(*config.Config).Account decoder := signingkey.NewEDPrivKeyDecoder() pkb, err := decoder.DecodeFromStringIntoBytes(account.SigningKey) diff --git a/service/configuration/configuration.go b/common/nodeconf/configuration.go similarity index 88% rename from service/configuration/configuration.go rename to common/nodeconf/configuration.go index c242d79d..5c055bbb 100644 --- a/service/configuration/configuration.go +++ b/common/nodeconf/configuration.go @@ -1,10 +1,10 @@ -package configuration +package nodeconf import ( "context" "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-chash" ) @@ -13,7 +13,7 @@ func New() Service { } type Configuration interface { - // Id returns current configuration id + // Id returns current nodeconf id Id() string // AllPeers returns all peers by spaceId except current account AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) diff --git a/service/configuration/service.go b/common/nodeconf/service.go similarity index 87% rename from service/configuration/service.go rename to common/nodeconf/service.go index 9cd920f7..870223cb 100644 --- a/service/configuration/service.go +++ b/common/nodeconf/service.go @@ -1,16 +1,15 @@ -package configuration +package nodeconf import ( - "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-chash" ) -const CName = "configuration" +const CName = "common.nodeconf" const ( partitionCount = 3000 @@ -32,7 +31,7 @@ type service struct { last Configuration } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { conf := a.MustComponent(config.CName).(*config.Config) s.accountId = conf.Account.PeerId s.pool = a.MustComponent(pool.CName).(pool.Pool) diff --git a/config/config.go b/config/config.go index 4f5ed612..b8a84e34 100644 --- a/config/config.go +++ b/config/config.go @@ -1,7 +1,6 @@ package config import ( - "context" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" @@ -32,7 +31,7 @@ type Config struct { Space Space `yaml:"space"` } -func (c *Config) Init(ctx context.Context, a *app.App) (err error) { +func (c *Config) Init(a *app.App) (err error) { logger.NewNamed("config").Info(fmt.Sprint(c.Space)) return } diff --git a/node/nodespace/rpchandler.go b/node/nodespace/rpchandler.go new file mode 100644 index 00000000..082e7b90 --- /dev/null +++ b/node/nodespace/rpchandler.go @@ -0,0 +1,30 @@ +package nodespace + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" +) + +type rpcHandler struct { + s *service +} + +func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { + sp, err := r.s.GetSpace(ctx, req.SpaceId) + if err != nil { + return nil, err + } + return sp.SpaceSyncRpc().HeadSync(ctx, req) +} + +func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { + msg, err := stream.Recv() + if err != nil { + return err + } + sp, err := r.s.GetSpace(stream.Context(), msg.SpaceId) + if err != nil { + return err + } + return sp.SpaceSyncRpc().Stream(stream) +} diff --git a/node/nodespace/service.go b/node/nodespace/service.go new file mode 100644 index 00000000..27226bc2 --- /dev/null +++ b/node/nodespace/service.go @@ -0,0 +1,71 @@ +package nodespace + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" + "time" +) + +const CName = "node.nodespace" + +var log = logger.NewNamed(CName) + +func New() Service { + return &service{} +} + +type Service interface { + GetSpace(ctx context.Context, id string) (commonspace.Space, error) + app.ComponentRunnable +} + +type service struct { + conf config.Space + cache ocache.OCache + commonSpace commonspace.Service +} + +func (s *service) Init(a *app.App) (err error) { + s.conf = a.MustComponent(config.CName).(*config.Config).Space + s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service) + s.cache = ocache.New( + func(ctx context.Context, id string) (value ocache.Object, err error) { + return s.commonSpace.CreateSpace(ctx, id) + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Duration(s.conf.GCTTL)*time.Second), + ocache.WithRefCounter(false), + ) + return spacesyncproto.DRPCRegisterSpace(a.MustComponent(server.CName).(server.DRPCServer), &rpcHandler{s}) +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) Run(ctx context.Context) (err error) { + go func() { + time.Sleep(time.Second * 5) + _, _ = s.GetSpace(ctx, "testDSpace") + }() + return +} + +func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, error) { + v, err := s.cache.Get(ctx, id) + if err != nil { + return nil, err + } + return v.(commonspace.Space), nil +} + +func (s *service) Close(ctx context.Context) (err error) { + return s.cache.Close() +} diff --git a/pkg/ocache/ocache.go b/pkg/ocache/ocache.go index 4f0cf0ba..44dc9d42 100644 --- a/pkg/ocache/ocache.go +++ b/pkg/ocache/ocache.go @@ -45,6 +45,12 @@ var WithGCPeriod = func(gcPeriod time.Duration) Option { } } +var WithRefCounter = func(enable bool) Option { + return func(cache *oCache) { + cache.noRefCounter = !enable + } +} + func New(loadFunc LoadFunc, opts ...Option) OCache { c := &oCache{ data: make(map[string]*entry), @@ -124,15 +130,16 @@ type OCache interface { } type oCache struct { - mu sync.Mutex - data map[string]*entry - loadFunc LoadFunc - timeNow func() time.Time - ttl time.Duration - gc time.Duration - closed bool - closeCh chan struct{} - log *zap.SugaredLogger + mu sync.Mutex + data map[string]*entry + loadFunc LoadFunc + timeNow func() time.Time + ttl time.Duration + gc time.Duration + closed bool + closeCh chan struct{} + log *zap.SugaredLogger + noRefCounter bool } func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { @@ -155,7 +162,9 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { c.data[id] = e } e.lastUsage = c.timeNow() - e.refCount++ + if !c.noRefCounter { + e.refCount++ + } c.mu.Unlock() if load { @@ -206,7 +215,7 @@ func (c *oCache) Release(id string) bool { return false } if e, ok := c.data[id]; ok { - if e.refCount > 0 { + if !c.noRefCounter && e.refCount > 0 { e.refCount-- return true } diff --git a/service/api/service.go b/service/api/service.go index f1373e28..1b9f9c17 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -30,7 +30,7 @@ type service struct { cfg *config.Config } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { s.treeCache = a.MustComponent(treecache.CName).(treecache.Service) s.documentService = a.MustComponent(document.CName).(document.Service) s.cfg = a.MustComponent(config.CName).(*config.Config) diff --git a/service/document/service.go b/service/document/service.go index 03799c09..b1856b5f 100644 --- a/service/document/service.go +++ b/service/document/service.go @@ -4,11 +4,11 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" @@ -38,7 +38,7 @@ func New() app.Component { return &service{} } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { s.account = a.MustComponent(account.CName).(account.Service) s.messageService = a.MustComponent(message.CName).(message.Service) s.treeCache = a.MustComponent(treecache.CName).(treecache.Service) diff --git a/service/node/service.go b/service/node/service.go index 877c8860..8b86fa9f 100644 --- a/service/node/service.go +++ b/service/node/service.go @@ -44,7 +44,7 @@ type service struct { nodes []*Node } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { cfg := a.MustComponent(config.CName).(*config.Config) signDecoder := signingkey.NewEDPrivKeyDecoder() rsaDecoder := encryptionkey.NewRSAPrivKeyDecoder() diff --git a/service/space/context.go b/service/space/context.go deleted file mode 100644 index 50d076f1..00000000 --- a/service/space/context.go +++ /dev/null @@ -1,22 +0,0 @@ -package space - -import "context" - -type ctxKey int - -const ( - ctxKeySpaceId ctxKey = iota -) - -// CtxSpaceId gets spaceId from id. If spaceId is not found in context - it returns an empty string -func CtxSpaceId(ctx context.Context) (spaceId string) { - if val := ctx.Value(ctxKeySpaceId); val != nil { - return val.(string) - } - return -} - -// CtxWithSpaceId creates new context with spaceId value -func CtxWithSpaceId(ctx context.Context, spaceId string) context.Context { - return context.WithValue(ctx, ctxKeySpaceId, spaceId) -} diff --git a/service/space/rpc.go b/service/space/rpc.go deleted file mode 100644 index db056079..00000000 --- a/service/space/rpc.go +++ /dev/null @@ -1,18 +0,0 @@ -package space - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" -) - -type rpcServer struct { - s *service -} - -func (r rpcServer) HeadSync(ctx context.Context, request *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { - sp, err := r.s.get(ctx, request.SpaceId) - if err != nil { - return nil, err - } - return sp.HeadSync(ctx, request) -} diff --git a/service/space/service.go b/service/space/service.go deleted file mode 100644 index ae9be5a4..00000000 --- a/service/space/service.go +++ /dev/null @@ -1,76 +0,0 @@ -package space - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" - "time" -) - -const CName = "space" - -var log = logger.NewNamed(CName) - -func New() Service { - return new(service) -} - -type Service interface { - app.ComponentRunnable -} - -type service struct { - conf config.Space - cache ocache.OCache - pool pool.Pool - confService configuration.Service -} - -func (s *service) Init(ctx context.Context, a *app.App) (err error) { - s.conf = a.MustComponent(config.CName).(*config.Config).Space - s.pool = a.MustComponent(pool.CName).(pool.Pool) - s.confService = a.MustComponent(configuration.CName).(configuration.Service) - ttlSec := time.Second * time.Duration(s.conf.GCTTL) - s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute), ocache.WithLogger(log.Sugar())) - spacesync.DRPCRegisterSpace(a.MustComponent(server.CName).(server.DRPCServer), rpcServer{s}) - return nil -} - -func (s *service) Name() (name string) { - return CName -} - -func (s *service) Run(ctx context.Context) (err error) { - go func() { - time.Sleep(time.Second * 10) - s.get(ctx, "testSpace") - }() - return -} - -func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) { - // TODO: load from database here - sp := &space{s: s, id: id, conf: s.confService.GetLast()} - if err = sp.Run(ctx); err != nil { - return nil, err - } - return sp, nil -} - -func (s *service) get(ctx context.Context, id string) (Space, error) { - obj, err := s.cache.Get(ctx, id) - if err != nil { - return nil, err - } - return obj.(Space), nil -} - -func (s *service) Close(ctx context.Context) (err error) { - return s.cache.Close() -} diff --git a/service/space/space.go b/service/space/space.go deleted file mode 100644 index 7be85ef4..00000000 --- a/service/space/space.go +++ /dev/null @@ -1,142 +0,0 @@ -package space - -import ( - "context" - "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/remotediff" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" - "go.uber.org/zap" - "math/rand" - "sync" - "time" -) - -type Space interface { - Id() string - - HeadSync(ctx context.Context, req *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) - - Close() error -} - -// - -type space struct { - id string - conf configuration.Configuration - diff ldiff.Diff - diffHandler func() - syncCtx context.Context - syncCancel func() - syncLoopDone chan struct{} - s *service - mu sync.RWMutex -} - -func (s *space) Id() string { - return s.id -} - -func (s *space) Run(ctx context.Context) error { - s.diff = ldiff.New(16, 16) - s.syncCtx, s.syncCancel = context.WithCancel(context.Background()) - s.syncLoopDone = make(chan struct{}) - s.testFill() - go s.syncLoop() - return nil -} - -func (s *space) HeadSync(ctx context.Context, req *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { - return remotediff.HandlerRangeRequest(ctx, s.diff, req) -} - -func (s *space) testFill() { - var n = 1000 - var els = make([]ldiff.Element, 0, n) - rand.Seed(time.Now().UnixNano()) - for i := 0; i < n; i++ { - if rand.Intn(n) > 2 { - id := fmt.Sprintf("%s.%d", s.id, i) - head := "head." + id - if rand.Intn(n) > n-100 { - head += ".modified" - } - el := ldiff.Element{ - Id: id, - Head: head, - } - els = append(els, el) - } - } - s.diff.Set(els...) -} - -func (s *space) syncLoop() { - defer close(s.syncLoopDone) - doSync := func() { - ctx, cancel := context.WithTimeout(s.syncCtx, time.Minute) - defer cancel() - if err := s.sync(ctx); err != nil { - log.Error("periodic sync error", zap.Error(err), zap.String("spaceId", s.id)) - } - } - doSync() - if s.s.conf.SyncPeriod > 0 { - ticker := time.NewTicker(time.Second * time.Duration(s.s.conf.SyncPeriod)) - defer ticker.Stop() - for { - select { - case <-s.syncCtx.Done(): - return - case <-ticker.C: - doSync() - } - } - } -} - -func (s *space) sync(ctx context.Context) error { - peers, err := s.getPeers(ctx) - if err != nil { - return err - } - for _, p := range peers { - if err := s.syncWithPeer(ctx, p); err != nil { - log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) - } - } - return nil -} - -func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { - cl := spacesync.NewDRPCSpaceClient(p) - rdiff := remotediff.NewRemoteDiff(s.id, cl) - newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff) - if err != nil { - return nil - } - log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds))) - return -} - -func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { - if s.conf.IsResponsible(s.id) { - return s.conf.AllPeers(ctx, s.id) - } else { - var p peer.Peer - p, err = s.conf.OnePeer(ctx, s.id) - if err != nil { - return nil, err - } - return []peer.Peer{p}, nil - } -} - -func (s *space) Close() error { - s.syncCancel() - <-s.syncLoopDone - return nil -} diff --git a/service/space/spacesync/spacesync_drpc.pb.go b/service/space/spacesync/spacesync_drpc.pb.go deleted file mode 100644 index a083c7e3..00000000 --- a/service/space/spacesync/spacesync_drpc.pb.go +++ /dev/null @@ -1,113 +0,0 @@ -// Code generated by protoc-gen-go-drpc. DO NOT EDIT. -// protoc-gen-go-drpc version: v0.0.32 -// source: service/space/spacesync/protos/spacesync.proto - -package spacesync - -import ( - bytes "bytes" - context "context" - errors "errors" - jsonpb "github.com/gogo/protobuf/jsonpb" - proto "github.com/gogo/protobuf/proto" - drpc "storj.io/drpc" - drpcerr "storj.io/drpc/drpcerr" -) - -type drpcEncoding_File_service_space_spacesync_protos_spacesync_proto struct{} - -func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) Marshal(msg drpc.Message) ([]byte, error) { - return proto.Marshal(msg.(proto.Message)) -} - -func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) Unmarshal(buf []byte, msg drpc.Message) error { - return proto.Unmarshal(buf, msg.(proto.Message)) -} - -func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) JSONMarshal(msg drpc.Message) ([]byte, error) { - var buf bytes.Buffer - err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message)) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error { - return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message)) -} - -type DRPCSpaceClient interface { - DRPCConn() drpc.Conn - - HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) -} - -type drpcSpaceClient struct { - cc drpc.Conn -} - -func NewDRPCSpaceClient(cc drpc.Conn) DRPCSpaceClient { - return &drpcSpaceClient{cc} -} - -func (c *drpcSpaceClient) DRPCConn() drpc.Conn { return c.cc } - -func (c *drpcSpaceClient) HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) { - out := new(HeadSyncResponse) - err := c.cc.Invoke(ctx, "/anySpace.Space/HeadSync", drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{}, in, out) - if err != nil { - return nil, err - } - return out, nil -} - -type DRPCSpaceServer interface { - HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) -} - -type DRPCSpaceUnimplementedServer struct{} - -func (s *DRPCSpaceUnimplementedServer) HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) { - return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) -} - -type DRPCSpaceDescription struct{} - -func (DRPCSpaceDescription) NumMethods() int { return 1 } - -func (DRPCSpaceDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { - switch n { - case 0: - return "/anySpace.Space/HeadSync", drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{}, - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return srv.(DRPCSpaceServer). - HeadSync( - ctx, - in1.(*HeadSyncRequest), - ) - }, DRPCSpaceServer.HeadSync, true - default: - return "", nil, nil, nil, false - } -} - -func DRPCRegisterSpace(mux drpc.Mux, impl DRPCSpaceServer) error { - return mux.Register(impl, DRPCSpaceDescription{}) -} - -type DRPCSpace_HeadSyncStream interface { - drpc.Stream - SendAndClose(*HeadSyncResponse) error -} - -type drpcSpace_HeadSyncStream struct { - drpc.Stream -} - -func (x *drpcSpace_HeadSyncStream) SendAndClose(m *HeadSyncResponse) error { - if err := x.MsgSend(m, drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{}); err != nil { - return err - } - return x.CloseSend() -} diff --git a/service/sync/message/service.go b/service/sync/message/service.go index 3dff9767..6e1ee015 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + pool2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" @@ -21,7 +22,7 @@ const CName = "MessageService" type service struct { nodes []*node.Node requestHandler requesthandler.RequestHandler - pool pool.Pool + pool pool2.Pool sync.RWMutex } @@ -34,10 +35,10 @@ type Service interface { SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) s.nodes = a.MustComponent(node.CName).(node.Service).Nodes() - s.pool = a.MustComponent(pool.CName).(pool.Pool) + s.pool = a.MustComponent(pool2.CName).(pool2.Pool) s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage) return nil } diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index 2a960104..f7d992e0 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -4,11 +4,11 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" @@ -38,7 +38,7 @@ type MessageSender interface { const CName = "SyncRequestHandler" -func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) { +func (r *requestHandler) Init(a *app.App) (err error) { r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) r.account = a.MustComponent(account.CName).(account.Service) r.messageService = a.MustComponent("MessageService").(MessageSender) diff --git a/service/treecache/service.go b/service/treecache/service.go index 8a440107..0a422049 100644 --- a/service/treecache/service.go +++ b/service/treecache/service.go @@ -2,82 +2,88 @@ package treecache import ( "context" - "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" - aclstorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/storage" "go.uber.org/zap" ) const CName = "treecache" -type ObjFunc = func(obj interface{}) error +// TODO: add context +type ACLTreeFunc = func(tree acltree.ACLTree) error +type ChangeBuildFunc = func(builder acltree.ChangeBuilder) error var log = logger.NewNamed("treecache") type Service interface { - Do(ctx context.Context, id string, f ObjFunc) error - Add(ctx context.Context, id string, payload any) error + Do(ctx context.Context, treeId string, f ACLTreeFunc) error + Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error + Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error } type service struct { - storage storage.Service - account account.Service - cache ocache.OCache + treeProvider treestorage.Provider + account account.Service + cache ocache.OCache } func New() app.ComponentRunnable { return &service{} } -func (s *service) Do(ctx context.Context, treeId string, f ObjFunc) error { +func (s *service) Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error { + acc := s.account.Account() + st, err := acltree.CreateNewTreeStorageWithACL(acc, build, s.treeProvider.CreateTreeStorage) + if err != nil { + return err + } + + id, err := st.TreeID() + if err != nil { + return err + } + + return s.Do(ctx, id, f) +} + +func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error { log. With(zap.String("treeId", treeId)). Debug("requesting tree from cache to perform operation") - t, err := s.cache.Get(ctx, treeId) + tree, err := s.cache.Get(ctx, treeId) defer s.cache.Release(treeId) if err != nil { return err } - return f(t) + aclTree := tree.(acltree.ACLTree) + aclTree.Lock() + defer aclTree.Unlock() + return f(tree.(acltree.ACLTree)) } -func (s *service) Add(ctx context.Context, treeId string, payload any) error { - switch pl := payload.(type) { - case aclstorage.TreeStorageCreatePayload: - log. - With(zap.String("treeId", treeId), zap.Int("len(changes)", len(pl.Changes))). - Debug("adding Tree with changes") - - _, err := s.storage.CreateTreeStorage(payload.(aclstorage.TreeStorageCreatePayload)) - if err != nil { - return err - } - case aclstorage.ACLListStorageCreatePayload: - log. - With(zap.String("treeId", treeId), zap.Int("len(changes)", len(pl.Records))). - Debug("adding ACLList with records") - - _, err := s.storage.CreateACLListStorage(payload.(aclstorage.ACLListStorageCreatePayload)) - if err != nil { - return err - } +func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error { + log. + With(zap.String("treeId", treeId), zap.Int("len(changes)", len(changes))). + Debug("adding tree with changes") + _, err := s.treeProvider.CreateTreeStorage(treeId, header, changes) + if err != nil { + return err } - return nil + return s.Do(ctx, treeId, f) } -func (s *service) Init(ctx context.Context, a *app.App) (err error) { +func (s *service) Init(a *app.App) (err error) { s.cache = ocache.New(s.loadTree) s.account = a.MustComponent(account.CName).(account.Service) - s.storage = a.MustComponent(storage.CName).(storage.Service) + s.treeProvider = treestorage.NewInMemoryTreeStorageProvider() // TODO: for test we should load some predefined keys return nil } @@ -95,33 +101,11 @@ func (s *service) Close(ctx context.Context) (err error) { } func (s *service) loadTree(ctx context.Context, id string) (ocache.Object, error) { - t, err := s.storage.Storage(id) + tree, err := s.treeProvider.TreeStorage(id) if err != nil { return nil, err } - header, err := t.Header() - if err != nil { - return nil, err - } - - switch header.DocType { // handler - case aclpb.Header_ACL: - return list.BuildACLListWithIdentity(s.account.Account(), t.(aclstorage.ListStorage)) - case aclpb.Header_DocTree: - break - default: - return nil, fmt.Errorf("incorrect type") - } - log.Info("got header", zap.String("header", header.String())) - var objTree tree.ObjectTree - err = s.Do(ctx, header.AclListId, func(obj interface{}) error { - aclList := obj.(list.ACLList) - objTree, err = tree.BuildObjectTree(t.(aclstorage.TreeStorage), nil, aclList) - if err != nil { - return err - } - return nil - }) - - return objTree, err + // TODO: should probably accept nil listeners + aclTree, err := acltree.BuildACLTree(tree, s.account.Account(), acltree.NoOpListener{}) + return aclTree, err }