1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add IdentityRepo client method to coordinator client

This commit is contained in:
Sergey 2024-01-11 21:07:31 +01:00
parent ed0baeb6c8
commit 0bc7b2505a
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6

View file

@ -9,6 +9,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
"github.com/anyproto/any-sync/identityrepo/identityrepoproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
@ -38,6 +39,9 @@ type CoordinatorClient interface {
FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (response *coordinatorproto.FileLimitCheckResponse, err error)
NetworkConfiguration(ctx context.Context, currentId string) (*coordinatorproto.NetworkConfigurationResponse, error)
DeletionLog(ctx context.Context, lastRecordId string, limit int) (records []*coordinatorproto.DeletionLogRecord, err error)
IdentityRepoPut(ctx context.Context, identity string, data []*identityrepoproto.Data) (err error)
IdentityRepoGet(ctx context.Context, identities []string, kinds []string) (res []*identityrepoproto.DataWithIdentity, err error)
app.Component
}
@ -225,19 +229,66 @@ func (c *coordinatorClient) NetworkConfiguration(ctx context.Context, currentId
return
}
func (c *coordinatorClient) IdentityRepoPut(ctx context.Context, identity string, data []*identityrepoproto.Data) (err error) {
err = c.doIdentityRepoClient(ctx, func(cl identityrepoproto.DRPCIdentityRepoClient) error {
_, err := cl.DataPut(ctx, &identityrepoproto.DataPutRequest{
Identity: identity,
Data: data,
})
if err != nil {
return rpcerr.Unwrap(err)
}
return nil
})
return
}
func (c *coordinatorClient) IdentityRepoGet(ctx context.Context, identities, kinds []string) (res []*identityrepoproto.DataWithIdentity, err error) {
err = c.doIdentityRepoClient(ctx, func(cl identityrepoproto.DRPCIdentityRepoClient) error {
resp, err := cl.DataPull(ctx, &identityrepoproto.DataPullRequest{
Identities: identities,
Kinds: kinds,
})
if err != nil {
return rpcerr.Unwrap(err)
}
res = resp.GetData()
return nil
})
return
}
func (c *coordinatorClient) doClient(ctx context.Context, f func(cl coordinatorproto.DRPCCoordinatorClient) error) error {
p, err := c.pool.GetOneOf(ctx, c.nodeConf.CoordinatorPeers())
p, err := c.getPeer(ctx)
if err != nil {
return err
}
pubKey, err := peer.CtxPubKey(p.Context())
if err != nil {
return ErrPubKeyMissing
}
if pubKey.Network() != c.nodeConf.Configuration().NetworkId {
return ErrNetworkMismatched
}
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(coordinatorproto.NewDRPCCoordinatorClient(conn))
})
}
func (c *coordinatorClient) doIdentityRepoClient(ctx context.Context, f func(cl identityrepoproto.DRPCIdentityRepoClient) error) error {
p, err := c.getPeer(ctx)
if err != nil {
return err
}
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(identityrepoproto.NewDRPCIdentityRepoClient(conn))
})
}
func (c *coordinatorClient) getPeer(ctx context.Context) (peer.Peer, error) {
p, err := c.pool.GetOneOf(ctx, c.nodeConf.CoordinatorPeers())
if err != nil {
return nil, err
}
pubKey, err := peer.CtxPubKey(p.Context())
if err != nil {
return nil, ErrPubKeyMissing
}
if pubKey.Network() != c.nodeConf.Configuration().NetworkId {
return nil, ErrNetworkMismatched
}
return p, nil
}