1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-07 21:37:04 +09:00

GO-3886 optimisations

This commit is contained in:
Roman Khafizianov 2024-09-02 21:23:25 +02:00
parent 718a8bb215
commit 85cfc9c46a
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
8 changed files with 106 additions and 59 deletions

View file

@ -34,7 +34,7 @@ type NetInterfaceWithAddrCache struct {
}
type InterfacesAddrs struct {
Interfaces []NetInterfaceWithAddrCache
Addrs []net.Addr // addrs without attachment to specific interface. Used as a fallback mechanism
Addrs []net.Addr // addrs without attachment to specific interface. Used as cheap(1 syscall) way to check if smth has changed
}
func WrapInterface(iface net.Interface) NetInterfaceWithAddrCache {
@ -77,6 +77,25 @@ func (i NetInterfaceWithAddrCache) GetAddr() []net.Addr {
return i.cachedAddrs
}
func NetAddrsEqualUnordered(a, b []net.Addr) bool {
if len(a) != len(b) {
return false
}
for _, addr := range a {
found := false
for _, addr2 := range b {
if addr.String() == addr2.String() {
found = true
break
}
}
if !found {
return false
}
}
return true
}
func (i InterfacesAddrs) Equal(other InterfacesAddrs) bool {
if len(other.Interfaces) != len(i.Interfaces) {
return false
@ -316,14 +335,3 @@ func (i InterfacesAddrs) findInterfacePosByIP(ip net.IP) (pos int, equal bool) {
}
return -1, false
}
func filterInterfaces(ifaces []NetInterfaceWithAddrCache) []NetInterfaceWithAddrCache {
return slice.Filter(ifaces, func(iface NetInterfaceWithAddrCache) bool {
if iface.Flags&net.FlagUp != 0 && iface.Flags&net.FlagMulticast != 0 && iface.Flags&net.FlagLoopback == 0 {
if len(iface.GetAddr()) > 0 {
return true
}
}
return false
})
}

View file

@ -5,7 +5,6 @@ package addrs
import (
"net"
"slices"
)
func SetInterfaceAddrsGetter(getter InterfaceAddrsGetter) {}
@ -30,12 +29,6 @@ func GetInterfacesAddrs() (iAddrs InterfacesAddrs, err error) {
if err != nil {
return
}
iAddrs.Interfaces = filterInterfaces(WrapInterfaces(ifaces))
iAddrs.Interfaces = WrapInterfaces(ifaces)
return
}
func IsLoopBack(interfaces []net.Interface) bool {
return len(interfaces) == 1 && slices.ContainsFunc(interfaces, func(n net.Interface) bool {
return n.Flags&net.FlagLoopback != 0
})
}

View file

@ -78,6 +78,5 @@ func GetInterfacesAddrs() (addrs InterfacesAddrs, err error) {
addrs.Interfaces = append(addrs.Interfaces, ifaceWrapped)
}
addrs.Interfaces = filterInterfaces(addrs.Interfaces)
return
}

View file

@ -1,8 +1,13 @@
package localdiscovery
import (
gonet "net"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/anytype-heart/net/addrs"
"github.com/anyproto/anytype-heart/util/slice"
)
const (
@ -33,3 +38,17 @@ type LocalDiscovery interface {
Start() error // Start the local discovery. Used when automatic start is disabled.
app.ComponentRunnable
}
// filterMulticastInterfaces filters out interfaces that doesn't make sense to use for multicast discovery.
// Also filters out loopback interfaces to make less mess.
// Please note: this call do a number of underlying syscalls to get addrs for each interface, but they will be cached after first call.
func filterMulticastInterfaces(ifaces []addrs.NetInterfaceWithAddrCache) []addrs.NetInterfaceWithAddrCache {
return slice.Filter(ifaces, func(iface addrs.NetInterfaceWithAddrCache) bool {
if iface.Flags&gonet.FlagUp != 0 && iface.Flags&gonet.FlagMulticast != 0 && iface.Flags&gonet.FlagLoopback == 0 {
if len(iface.GetAddr()) > 0 {
return true
}
}
return false
})
}

View file

@ -7,9 +7,9 @@ import (
"context"
"fmt"
gonet "net"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/accountservice"
@ -58,10 +58,9 @@ type localDiscovery struct {
notifier Notifier
m sync.Mutex
anythingDiscovered atomic.Bool
hookMu sync.Mutex
hookState DiscoveryPossibility
hooks []HookCallback
hookMu sync.Mutex
hookState DiscoveryPossibility
hooks []HookCallback
}
func New() LocalDiscovery {
@ -76,7 +75,7 @@ func (l *localDiscovery) Init(a *app.App) (err error) {
l.manualStart = a.MustComponent(config.CName).(*config.Config).DontStartLocalNetworkSyncAutomatically
l.nodeConf = a.MustComponent(config.CName).(*config.Config).GetNodeConf()
l.peerId = a.MustComponent(accountservice.CName).(accountservice.Service).Account().PeerId
l.periodicCheck = periodicsync.NewPeriodicSync(5, 0, l.checkAddrs, log)
l.periodicCheck = periodicsync.NewPeriodicSync(5, 0, l.refreshInterfaces, log)
l.drpcServer = app.MustComponent[clientserver.ClientServer](a)
return
}
@ -154,16 +153,23 @@ func (l *localDiscovery) RegisterDiscoveryPossibilityHook(hook func(state Discov
l.hooks = append(l.hooks, hook)
}
func (l *localDiscovery) checkAddrs(ctx context.Context) (err error) {
func (l *localDiscovery) refreshInterfaces(ctx context.Context) (err error) {
newAddrs, err := addrs.GetInterfacesAddrs()
if !addrs.NetAddrsEqualUnordered(l.interfacesAddrs.Addrs, newAddrs.Addrs) {
// only replace existing interface structs in case if we have a different set of addresses
// this optimization allows to save syscalls to get addrs for every iface, as we have a cache
newAddrs.Interfaces = filterMulticastInterfaces(newAddrs.Interfaces)
newAddrs.SortInterfacesWithPriority(interfacesSortPriority)
fmt.Printf("#p2p local discovery: new interfaces(%d) %v\n", len(newAddrs.Interfaces), newAddrs.NetInterfaces())
newAddrs.SortInterfacesWithPriority(interfacesSortPriority)
l.notifyP2PPossibilityState(l.getP2PPossibility(newAddrs))
if newAddrs.Equal(l.interfacesAddrs) && l.server != nil {
return
}
l.notifyP2PPossibilityState(l.getP2PPossibility(newAddrs))
if newAddrs.Equal(l.interfacesAddrs) && l.server != nil {
// we do additional check after we filter and sort multicast interfaces
// so this equal check is more precise
return
}
l.interfacesAddrs = newAddrs
if l.server != nil {
l.cancel()
@ -223,9 +229,9 @@ func (l *localDiscovery) startServer() (err error) {
l.ipv4, // do not include ipv6 addresses, because they are disabled
nil,
l.interfacesAddrs.NetInterfaces(),
zeroconf.TTL(60),
zeroconf.TTL(3600), // big ttl because we don't have re-broadcasting
zeroconf.ServerSelectIPTraffic(zeroconf.IPv4), // disable ipv6 for now
zeroconf.WriteTimeout(time.Second*1),
zeroconf.WriteTimeout(time.Second*3),
)
return
}
@ -265,44 +271,50 @@ func (l *localDiscovery) readAnswers(ch chan *zeroconf.ServiceEntry) {
func (l *localDiscovery) browse(ctx context.Context, ch chan *zeroconf.ServiceEntry) {
defer l.closeWait.Done()
newAddrs, err := addrs.GetInterfacesAddrs()
if err != nil {
return
}
newAddrs.SortInterfacesWithPriority(interfacesSortPriority)
if err := zeroconf.Browse(ctx, serviceName, mdnsDomain, ch,
zeroconf.ClientWriteTimeout(time.Second*1),
zeroconf.SelectIfaces(newAddrs.NetInterfaces()),
zeroconf.ClientWriteTimeout(time.Second*3),
zeroconf.SelectIfaces(l.interfacesAddrs.NetInterfaces()),
zeroconf.SelectIPTraffic(zeroconf.IPv4)); err != nil {
log.Error("browsing failed", zap.Error(err))
}
}
func (l *localDiscovery) GetOwnAddresses() OwnAddresses {
return OwnAddresses{
Addrs: l.ipv4,
Port: l.port,
}
}
func (l *localDiscovery) getP2PPossibility(newAddrs addrs.InterfacesAddrs) DiscoveryPossibility {
// get wlan or eth interfaces
// some sophisticated logic for ios, because of possible Local Network Restrictions
var err error
interfaces := newAddrs.Interfaces
for _, iface := range interfaces {
if runtime.GOOS == "ios" {
// on ios we have to check only en interfaces
if !strings.HasPrefix(iface.Name, "en") {
// en1 used for wifi
// en2 used for wired connection
continue
}
}
addrs := iface.GetAddr()
if len(addrs) == 0 {
continue
}
if strings.HasPrefix(iface.Name, "wlan") || strings.HasPrefix(iface.Name, "eth") || strings.HasPrefix(iface.Name, "en") {
for _, addr := range addrs {
if ip, ok := addr.(*gonet.IPNet); ok {
if ip.IP.To4() == nil {
continue
}
ipv4 := ip.IP.To4()
err = testSelfConnection(ipv4.String())
if err != nil {
log.Warn(fmt.Sprintf("self connection via %s to %s failed: %v", iface.Name, ipv4.String(), err))
} else {
return DiscoveryPossible
}
break
for _, addr := range addrs {
if ip, ok := addr.(*gonet.IPNet); ok {
ipv4 := ip.IP.To4()
if ipv4 == nil {
continue
}
err = testSelfConnection(ipv4.String())
if err != nil {
log.Warn(fmt.Sprintf("self connection via %s to %s failed: %v", iface.Name, ipv4.String(), err))
} else {
return DiscoveryPossible
}
break
}
}
}

View file

@ -66,6 +66,7 @@ func (l *localDiscovery) PeerDiscovered(peer DiscoveredPeer, own OwnAddresses) {
}
// TODO: move this to android side
newAddrs, err := addrs.GetInterfacesAddrs()
newAddrs = filterMulticastInterfaces(newAddrs)
l.notifyPeerToPeerStatus(newAddrs)
if err != nil {

View file

@ -8,7 +8,7 @@ import (
"time"
)
const expectedMessage = "Test message"
const expectedMessage = "test"
func handleConnection(conn net.Conn) error {
defer conn.Close()

View file

@ -0,0 +1,15 @@
package localdiscovery
import (
"testing"
"github.com/stretchr/testify/require"
)
func Test_testSelfConnection(t *testing.T) {
err := testSelfConnection("127.0.0.1")
require.NoError(t, err)
err = testSelfConnection("11.11.11.11")
require.Error(t, err)
}