1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00
anytype-heart/cmd/grpcserver/grpc.go
2025-05-20 14:29:01 +02:00

313 lines
8.5 KiB
Go

//go:build !nogrpcserver && !_test
// +build !nogrpcserver,!_test
package main
import (
"bufio"
"context"
"errors"
"fmt"
"net"
"net/http"
// nolint: gosec
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"
"github.com/anyproto/any-sync/app"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"github.com/anyproto/anytype-heart/core"
"github.com/anyproto/anytype-heart/core/api"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pb/service"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/util/conc"
"github.com/anyproto/anytype-heart/util/grpcprocess"
"github.com/anyproto/anytype-heart/util/vcs"
)
const (
	// defaultAddr is the gRPC listen address main falls back to when neither
	// a command-line argument nor ANYTYPE_GRPC_ADDR provides one.
	defaultAddr = "127.0.0.1:31007"

	// defaultWebAddr is the gRPC-Web listen address main falls back to when
	// neither a command-line argument nor ANYTYPE_GRPCWEB_ADDR provides one.
	defaultWebAddr = "127.0.0.1:31008"

	// do not change this, js client relies on this msg to ensure that server is up
	grpcWebStartedMessagePrefix = "gRPC Web proxy started at: "
)

var (
	// log is the logger used by this binary.
	log = logging.Logger("anytype-grpc-server")

	// commonOSSignals lists the OS signals shared by all platforms.
	// NOTE(review): presumably combined into the `signals` list passed to
	// signal.Notify by platform-specific code that is not visible here.
	commonOSSignals = []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGINT}
)
// main starts the middleware gRPC server together with its gRPC-Web proxy and
// blocks until a shutdown request arrives.
//
// Listen addresses are resolved in this order: command-line arguments
// (os.Args[1] for gRPC, os.Args[2] for gRPC-Web), then the ANYTYPE_GRPC_ADDR /
// ANYTYPE_GRPCWEB_ADDR environment variables, then defaultAddr /
// defaultWebAddr.
//
// Other environment variables read here:
//   - ANYPROF: when non-empty, the address of a debug HTTP server serving
//     http.DefaultServeMux.
//   - ANYTYPE_GRPC_NO_DEBUG_TIMEOUT: "1" skips metrics.LongMethodsInterceptor.
//   - ANYTYPE_GRPC_LOG: >0 logs calls, >1 also logs unary payloads, >2 also
//     logs stream payloads.
//   - ANYTYPE_GRPC_TRACE: >0 enables Jaeger tracing, >1 also logs unary
//     payloads, >2 also logs stream payloads.
//
// Shutdown is triggered by a "shutdown" line on stdin (Windows only) or by an
// OS signal for which shouldSaveStack reports false.
func main() {
	var addr string
	var webaddr string

	app.StartWarningAfter = time.Second * 5
	fmt.Printf("mw grpc: %s\n", vcs.GetVCSInfo().Description())

	// Command-line arguments take precedence over the environment.
	if len(os.Args) > 1 {
		addr = os.Args[1]
		if len(os.Args) > 2 {
			webaddr = os.Args[2]
		}
	}

	if addr == "" {
		if env := os.Getenv("ANYTYPE_GRPC_ADDR"); env != "" {
			addr = env
		} else {
			addr = defaultAddr
		}
	}

	if webaddr == "" {
		if env := os.Getenv("ANYTYPE_GRPCWEB_ADDR"); env != "" {
			webaddr = env
		} else {
			webaddr = defaultWebAddr
		}
	}

	if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
		fmt.Printf("Running GO debug HTTP server at: %s\n", debug)
		go func() {
			// ListenAndServe always returns a non-nil error; report it rather
			// than silently running without the debug endpoint.
			if err := http.ListenAndServe(debug, nil); err != nil {
				log.Errorf("debug HTTP server: %v", err)
			}
		}()
	}

	metrics.Service.InitWithKeys(metrics.DefaultInHouseKey)

	// Subscribe to signals before the servers start so none are missed.
	var signalChan = make(chan os.Signal, 2)
	signal.Notify(signalChan, signals...)

	var mw = core.New()
	mw.SetEventSender(event.NewGrpcSender())

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// Re-read the address: it differs from the request when port 0 was given.
	addr = lis.Addr().String()

	webLis, err := net.Listen("tcp", webaddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	webaddr = webLis.Addr().String()

	var (
		unaryInterceptors  []grpc.UnaryServerInterceptor
		streamInterceptors []grpc.StreamServerInterceptor
	)

	if metrics.Enabled {
		unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
	}
	unaryInterceptors = append(unaryInterceptors, metrics.UnaryTraceInterceptor)
	unaryInterceptors = append(unaryInterceptors, func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
		resp, err = mw.Authorize(ctx, req, info, handler)
		if err != nil {
			log.Errorf("authorize: %s", err)
		}
		return resp, err
	})

	// todo: we may want to change it to the opposite check with a public release
	if os.Getenv("ANYTYPE_GRPC_NO_DEBUG_TIMEOUT") != "1" {
		unaryInterceptors = append(unaryInterceptors, metrics.LongMethodsInterceptor)
	}

	// An unset or non-numeric value parses as 0, which leaves logging disabled.
	grpcDebug, _ := strconv.Atoi(os.Getenv("ANYTYPE_GRPC_LOG"))
	if grpcDebug > 0 {
		decider := func(_ context.Context, _ string, _ any) bool {
			return true
		}

		grpcLogger := logging.LoggerNotSugared("grpc")

		unaryInterceptors = append(unaryInterceptors, grpc_zap.UnaryServerInterceptor(grpcLogger))
		streamInterceptors = append(streamInterceptors, grpc_zap.StreamServerInterceptor(grpcLogger))
		if grpcDebug > 1 {
			unaryInterceptors = append(unaryInterceptors, grpc_zap.PayloadUnaryServerInterceptor(grpcLogger, decider))
		}
		if grpcDebug > 2 {
			streamInterceptors = append(streamInterceptors, grpc_zap.PayloadStreamServerInterceptor(grpcLogger, decider))
		}
	}

	// An unset or non-numeric value parses as 0, which leaves tracing disabled.
	grpcTrace, _ := strconv.Atoi(os.Getenv("ANYTYPE_GRPC_TRACE"))
	if grpcTrace > 0 {
		jLogger := jaeger.StdLogger

		cfg, err := jaegercfg.FromEnv()
		if err != nil {
			log.Fatal(err)
		}
		if cfg.ServiceName == "" {
			cfg.ServiceName = "mw"
		}
		// Initialize tracer with a logger and a metrics factory
		tracer, closer, err := cfg.NewTracer(jaegercfg.Logger(jLogger))
		if err != nil {
			log.Fatal(err)
		}
		// Runs when main returns, i.e. after shutdown() on the normal paths.
		defer closer.Close()

		var (
			unaryOptions  []otgrpc.Option
			streamOptions []otgrpc.Option
		)

		// Set the singleton opentracing.Tracer with the Jaeger tracer.
		opentracing.SetGlobalTracer(tracer)
		if grpcTrace > 1 {
			unaryOptions = append(unaryOptions, otgrpc.LogPayloads())
		}
		if grpcTrace > 2 {
			streamOptions = append(streamOptions, otgrpc.LogPayloads())
		}

		unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingServerInterceptor(tracer, unaryOptions...))
		streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamServerInterceptor(tracer, streamOptions...))
	}

	unaryInterceptors = appendInterceptor(unaryInterceptors, mw)
	unaryInterceptors = append(unaryInterceptors, grpcprocess.ProcessInfoInterceptor(
		"/anytype.ClientCommands/AccountLocalLinkNewChallenge",
	))

	server := grpc.NewServer(grpc.MaxRecvMsgSize(20*1024*1024),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
	)

	service.RegisterClientCommandsServer(server, mw)
	if metrics.Enabled {
		grpc_prometheus.EnableHandlingTimeHistogram()
		// grpc_prometheus.Register(server)
	}

	// Every origin is accepted for both plain gRPC-Web and websocket requests.
	webrpc := grpcweb.WrapServer(
		server,
		grpcweb.WithOriginFunc(func(origin string) bool {
			return true
		}),
		grpcweb.WithWebsockets(true),
		grpcweb.WithWebsocketOriginFunc(func(req *http.Request) bool {
			return true
		}))

	proxy := &http.Server{
		Addr: webaddr,
	}

	// Only gRPC-Web, acceptable CORS and gRPC websocket requests are forwarded;
	// this handler writes nothing for any other request.
	proxy.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if webrpc.IsGrpcWebRequest(r) ||
			webrpc.IsAcceptableGrpcCorsRequest(r) ||
			webrpc.IsGrpcWebSocketRequest(r) {
			webrpc.ServeHTTP(w, r)
		}
	})

	go func() {
		// A non-nil error here means serving failed; previously it was dropped.
		if err := server.Serve(lis); err != nil {
			log.Errorf("grpc server error: %v", err)
		}
	}()
	fmt.Println("gRPC server started at: " + addr)

	go func() {
		if err := proxy.Serve(webLis); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("proxy error: %v", err)
		}
	}()

	startReportMemory(mw)
	api.SetMiddlewareParams(mw)

	// shutdown stops both servers and then asks the middleware to shut down.
	shutdown := func() {
		server.Stop()
		proxy.Close()
		mw.AppShutdown(context.Background(), &pb.RpcAppShutdownRequest{})
	}

	// do not change this, js client relies on this msg to ensure that server is up and parse address
	fmt.Println(grpcWebStartedMessagePrefix + webaddr)

	if runtime.GOOS == "windows" {
		// On Windows a "shutdown" line on stdin triggers shutdown. When stdin
		// ends without that line, control falls through to the signal loop.
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			message := scanner.Text()
			if message == "shutdown" {
				fmt.Println("[anytype-heart] Shutdown: received shutdown msg, closing components...")
				// Perform cleanup or exit
				shutdown()
				return
			}
		}
		// Scan returns false on both EOF and read errors; report the latter.
		if err := scanner.Err(); err != nil {
			log.Errorf("failed to read stdin: %v", err)
		}
	}

	for {
		sig := <-signalChan
		if shouldSaveStack(sig) {
			// This signal only requests a goroutine dump; keep running.
			if err := mw.SaveGoroutinesStack(""); err != nil {
				log.Errorf("failed to save stack of goroutines: %s", err)
			}
			continue
		}
		fmt.Printf("[anytype-heart] Shutdown: received OS signal (%s), closing components...\n", sig.String())
		shutdown()
		return
	}
}
// appendInterceptor returns unaryInterceptors extended with a panic-recovering
// unary interceptor.
//
// When the wrapped handler panics, the interceptor replaces the response with
// a generic error response: onNotLoggedInError for a panic value that is an
// error matching core.ErrNotLoggedIn, onDefaultError for anything else. The
// returned err is left as it was at the time of the panic.
func appendInterceptor(
	unaryInterceptors []grpc.UnaryServerInterceptor,
	mw *core.Middleware,
) []grpc.UnaryServerInterceptor {
	recoverer := func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (resp interface{}, err error) {
		defer func() {
			panicValue := recover()
			if panicValue == nil {
				return
			}
			// Only a panic carrying core.ErrNotLoggedIn gets the dedicated response.
			if panicErr, isErr := panicValue.(error); isErr && errors.Is(panicErr, core.ErrNotLoggedIn) {
				resp = onNotLoggedInError(resp, panicErr)
				return
			}
			resp = onDefaultError(mw, panicValue, resp)
		}()
		return handler(ctx, req)
	}
	return append(unaryInterceptors, recoverer)
}
// onDefaultError passes the recovered panic value r to conc.OnPanic and
// returns a generic UNKNOWN_ERROR response described as "panic recovered".
// The mw and resp arguments are not read.
func onDefaultError(mw *core.Middleware, r any, resp interface{}) interface{} {
	conc.OnPanic(r)

	genericErr := &pb.RpcGenericErrorResponseError{
		Code:        pb.RpcGenericErrorResponseError_UNKNOWN_ERROR,
		Description: "panic recovered",
	}
	return &pb.RpcGenericErrorResponse{Error: genericErr}
}
// onNotLoggedInError returns a generic UNKNOWN_ERROR response whose
// description is the message of rerr. The resp argument is not read.
func onNotLoggedInError(resp interface{}, rerr error) interface{} {
	genericErr := &pb.RpcGenericErrorResponseError{
		Code:        pb.RpcGenericErrorResponseError_UNKNOWN_ERROR,
		Description: rerr.Error(),
	}
	return &pb.RpcGenericErrorResponse{Error: genericErr}
}