mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
98 lines
2.7 KiB
Go
98 lines
2.7 KiB
Go
//go:generate mockgen -destination mock_nodeclient/mock_nodeclient.go github.com/anyproto/any-sync/node/nodeclient NodeClient
|
|
package nodeclient
|
|
|
|
import (
|
|
"context"
|
|
|
|
"storj.io/drpc"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
|
"github.com/anyproto/any-sync/consensus/consensusproto"
|
|
"github.com/anyproto/any-sync/net/pool"
|
|
"github.com/anyproto/any-sync/net/rpc/rpcerr"
|
|
"github.com/anyproto/any-sync/nodeconf"
|
|
)
|
|
|
|
// CName is the component name that nodeClient reports from its Name method.
const CName = "common.node.nodeclient"
|
|
|
|
func New() NodeClient {
|
|
return &nodeClient{}
|
|
}
|
|
|
|
type NodeClient interface {
|
|
app.Component
|
|
AclGetRecords(ctx context.Context, spaceId, aclHead string) (recs []*consensusproto.RawRecordWithId, err error)
|
|
AclAddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (recWithId *consensusproto.RawRecordWithId, err error)
|
|
}
|
|
|
|
type nodeClient struct {
|
|
pool pool.Service
|
|
nodeConf nodeconf.Service
|
|
}
|
|
|
|
func (c *nodeClient) Init(a *app.App) (err error) {
|
|
c.pool = a.MustComponent(pool.CName).(pool.Service)
|
|
c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
|
return
|
|
}
|
|
|
|
func (c *nodeClient) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (c *nodeClient) AclGetRecords(ctx context.Context, spaceId, aclHead string) (recs []*consensusproto.RawRecordWithId, err error) {
|
|
err = clientDo(c, ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error {
|
|
resp, err := cl.AclGetRecords(ctx, &spacesyncproto.AclGetRecordsRequest{
|
|
SpaceId: spaceId,
|
|
AclHead: aclHead,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
recs = make([]*consensusproto.RawRecordWithId, len(resp.Records))
|
|
for i, rec := range resp.Records {
|
|
recs[i] = &consensusproto.RawRecordWithId{}
|
|
if err = recs[i].Unmarshal(rec); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func (c *nodeClient) AclAddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (recWithId *consensusproto.RawRecordWithId, err error) {
|
|
data, err := rec.Marshal()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = clientDo(c, ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error {
|
|
res, err := cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{
|
|
SpaceId: spaceId,
|
|
Payload: data,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
recWithId = &consensusproto.RawRecordWithId{
|
|
Payload: res.Payload,
|
|
Id: res.RecordId,
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// clientDo holds the method expression (*nodeClient).doClient in a package
// variable; AclGetRecords and AclAddRecord call through it rather than
// invoking doClient directly.
// NOTE(review): presumably a seam that lets tests substitute the transport — confirm.
var clientDo = (*nodeClient).doClient
|
|
|
|
func (c *nodeClient) doClient(ctx context.Context, spaceId string, f func(cl spacesyncproto.DRPCSpaceSyncClient) error) error {
|
|
p, err := c.pool.GetOneOf(ctx, c.nodeConf.NodeIds(spaceId))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
|
|
err := f(spacesyncproto.NewDRPCSpaceSyncClient(conn))
|
|
return rpcerr.Unwrap(err)
|
|
})
|
|
}
|