mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-08 05:47:07 +09:00
313 lines
8.5 KiB
Go
313 lines
8.5 KiB
Go
//go:build !nogrpcserver && !_test
|
|
// +build !nogrpcserver,!_test
|
|
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
// nolint: gosec
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"strconv"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
|
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
|
|
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
|
|
"github.com/improbable-eng/grpc-web/go/grpcweb"
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/uber/jaeger-client-go"
|
|
jaegercfg "github.com/uber/jaeger-client-go/config"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/anyproto/anytype-heart/core"
|
|
"github.com/anyproto/anytype-heart/core/api"
|
|
"github.com/anyproto/anytype-heart/core/event"
|
|
"github.com/anyproto/anytype-heart/metrics"
|
|
"github.com/anyproto/anytype-heart/pb"
|
|
"github.com/anyproto/anytype-heart/pb/service"
|
|
"github.com/anyproto/anytype-heart/pkg/lib/logging"
|
|
"github.com/anyproto/anytype-heart/util/conc"
|
|
"github.com/anyproto/anytype-heart/util/grpcprocess"
|
|
"github.com/anyproto/anytype-heart/util/vcs"
|
|
)
|
|
|
|
const defaultAddr = "127.0.0.1:31007"
|
|
const defaultWebAddr = "127.0.0.1:31008"
|
|
|
|
// do not change this, js client relies on this msg to ensure that server is up
|
|
const grpcWebStartedMessagePrefix = "gRPC Web proxy started at: "
|
|
|
|
var log = logging.Logger("anytype-grpc-server")
|
|
var commonOSSignals = []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGINT}
|
|
|
|
func main() {
|
|
var addr string
|
|
var webaddr string
|
|
app.StartWarningAfter = time.Second * 5
|
|
fmt.Printf("mw grpc: %s\n", vcs.GetVCSInfo().Description())
|
|
if len(os.Args) > 1 {
|
|
addr = os.Args[1]
|
|
if len(os.Args) > 2 {
|
|
webaddr = os.Args[2]
|
|
}
|
|
}
|
|
|
|
if addr == "" {
|
|
if env := os.Getenv("ANYTYPE_GRPC_ADDR"); env != "" {
|
|
addr = env
|
|
} else {
|
|
addr = defaultAddr
|
|
}
|
|
}
|
|
|
|
if webaddr == "" {
|
|
if env := os.Getenv("ANYTYPE_GRPCWEB_ADDR"); env != "" {
|
|
webaddr = env
|
|
} else {
|
|
webaddr = defaultWebAddr
|
|
}
|
|
}
|
|
|
|
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
|
|
fmt.Printf("Running GO debug HTTP server at: %s\n", debug)
|
|
go func() {
|
|
http.ListenAndServe(debug, nil)
|
|
}()
|
|
}
|
|
metrics.Service.InitWithKeys(metrics.DefaultInHouseKey)
|
|
|
|
var signalChan = make(chan os.Signal, 2)
|
|
signal.Notify(signalChan, signals...)
|
|
|
|
var mw = core.New()
|
|
mw.SetEventSender(event.NewGrpcSender())
|
|
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
log.Fatalf("failed to listen: %v", err)
|
|
}
|
|
addr = lis.Addr().String()
|
|
|
|
webLis, err := net.Listen("tcp", webaddr)
|
|
if err != nil {
|
|
log.Fatalf("failed to listen: %v", err)
|
|
}
|
|
webaddr = webLis.Addr().String()
|
|
var (
|
|
unaryInterceptors []grpc.UnaryServerInterceptor
|
|
streamInterceptors []grpc.StreamServerInterceptor
|
|
)
|
|
|
|
if metrics.Enabled {
|
|
unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
|
|
}
|
|
unaryInterceptors = append(unaryInterceptors, metrics.UnaryTraceInterceptor)
|
|
unaryInterceptors = append(unaryInterceptors, func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
|
resp, err = mw.Authorize(ctx, req, info, handler)
|
|
if err != nil {
|
|
log.Errorf("authorize: %s", err)
|
|
}
|
|
return
|
|
})
|
|
|
|
// todo: we may want to change it to the opposite check with a public release
|
|
if os.Getenv("ANYTYPE_GRPC_NO_DEBUG_TIMEOUT") != "1" {
|
|
unaryInterceptors = append(unaryInterceptors, metrics.LongMethodsInterceptor)
|
|
}
|
|
|
|
grpcDebug, _ := strconv.Atoi(os.Getenv("ANYTYPE_GRPC_LOG"))
|
|
if grpcDebug > 0 {
|
|
decider := func(_ context.Context, _ string, _ interface{}) bool {
|
|
return true
|
|
}
|
|
|
|
grpcLogger := logging.LoggerNotSugared("grpc")
|
|
|
|
unaryInterceptors = append(unaryInterceptors, grpc_zap.UnaryServerInterceptor(grpcLogger))
|
|
streamInterceptors = append(streamInterceptors, grpc_zap.StreamServerInterceptor(grpcLogger))
|
|
if grpcDebug > 1 {
|
|
unaryInterceptors = append(unaryInterceptors, grpc_zap.PayloadUnaryServerInterceptor(grpcLogger, decider))
|
|
}
|
|
if grpcDebug > 2 {
|
|
streamInterceptors = append(streamInterceptors, grpc_zap.PayloadStreamServerInterceptor(grpcLogger, decider))
|
|
}
|
|
}
|
|
|
|
grpcTrace, _ := strconv.Atoi(os.Getenv("ANYTYPE_GRPC_TRACE"))
|
|
if grpcTrace > 0 {
|
|
jLogger := jaeger.StdLogger
|
|
|
|
cfg, err := jaegercfg.FromEnv()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
if cfg.ServiceName == "" {
|
|
cfg.ServiceName = "mw"
|
|
}
|
|
// Initialize tracer with a logger and a metrics factory
|
|
tracer, closer, err := cfg.NewTracer(jaegercfg.Logger(jLogger))
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer closer.Close()
|
|
|
|
var (
|
|
unaryOptions []otgrpc.Option
|
|
streamOptions []otgrpc.Option
|
|
)
|
|
|
|
// Set the singleton opentracing.Tracer with the Jaeger tracer.
|
|
opentracing.SetGlobalTracer(tracer)
|
|
if grpcTrace > 1 {
|
|
unaryOptions = append(unaryOptions, otgrpc.LogPayloads())
|
|
}
|
|
if grpcTrace > 2 {
|
|
streamOptions = append(streamOptions, otgrpc.LogPayloads())
|
|
}
|
|
|
|
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingServerInterceptor(tracer, unaryOptions...))
|
|
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamServerInterceptor(tracer, streamOptions...))
|
|
}
|
|
|
|
unaryInterceptors = appendInterceptor(unaryInterceptors, mw)
|
|
unaryInterceptors = append(unaryInterceptors, grpcprocess.ProcessInfoInterceptor(
|
|
"/anytype.ClientCommands/AccountLocalLinkNewChallenge",
|
|
))
|
|
|
|
server := grpc.NewServer(grpc.MaxRecvMsgSize(20*1024*1024),
|
|
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
|
|
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
|
|
)
|
|
|
|
service.RegisterClientCommandsServer(server, mw)
|
|
if metrics.Enabled {
|
|
grpc_prometheus.EnableHandlingTimeHistogram()
|
|
// grpc_prometheus.Register(server)
|
|
}
|
|
|
|
webrpc := grpcweb.WrapServer(
|
|
server,
|
|
grpcweb.WithOriginFunc(func(origin string) bool {
|
|
return true
|
|
}),
|
|
grpcweb.WithWebsockets(true),
|
|
grpcweb.WithWebsocketOriginFunc(func(req *http.Request) bool {
|
|
return true
|
|
}))
|
|
|
|
proxy := &http.Server{
|
|
Addr: webaddr,
|
|
}
|
|
|
|
proxy.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if webrpc.IsGrpcWebRequest(r) ||
|
|
webrpc.IsAcceptableGrpcCorsRequest(r) ||
|
|
webrpc.IsGrpcWebSocketRequest(r) {
|
|
webrpc.ServeHTTP(w, r)
|
|
}
|
|
})
|
|
|
|
go func() {
|
|
server.Serve(lis)
|
|
}()
|
|
fmt.Println("gRPC server started at: " + addr)
|
|
|
|
go func() {
|
|
if err := proxy.Serve(webLis); err != nil && err != http.ErrServerClosed {
|
|
log.Fatalf("proxy error: %v", err)
|
|
}
|
|
}()
|
|
|
|
startReportMemory(mw)
|
|
api.SetMiddlewareParams(mw)
|
|
|
|
shutdown := func() {
|
|
server.Stop()
|
|
proxy.Close()
|
|
mw.AppShutdown(context.Background(), &pb.RpcAppShutdownRequest{})
|
|
}
|
|
// do not change this, js client relies on this msg to ensure that server is up and parse address
|
|
fmt.Println(grpcWebStartedMessagePrefix + webaddr)
|
|
if runtime.GOOS == "windows" {
|
|
scanner := bufio.NewScanner(os.Stdin)
|
|
for scanner.Scan() {
|
|
message := scanner.Text()
|
|
if message == "shutdown" {
|
|
fmt.Println("[anytype-heart] Shutdown: received shutdown msg, closing components...")
|
|
// Perform cleanup or exit
|
|
shutdown()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
for {
|
|
sig := <-signalChan
|
|
if shouldSaveStack(sig) {
|
|
if err = mw.SaveGoroutinesStack(""); err != nil {
|
|
log.Errorf("failed to save stack of goroutines: %s", err)
|
|
}
|
|
continue
|
|
}
|
|
fmt.Printf("[anytype-heart] Shutdown: received OS signal (%s), closing components...\n", sig.String())
|
|
shutdown()
|
|
return
|
|
}
|
|
}
|
|
|
|
func appendInterceptor(
|
|
unaryInterceptors []grpc.UnaryServerInterceptor,
|
|
mw *core.Middleware,
|
|
) []grpc.UnaryServerInterceptor {
|
|
return append(unaryInterceptors, func(
|
|
ctx context.Context,
|
|
req interface{},
|
|
info *grpc.UnaryServerInfo,
|
|
handler grpc.UnaryHandler,
|
|
) (resp interface{}, err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
if rerr, ok := r.(error); ok && errors.Is(rerr, core.ErrNotLoggedIn) {
|
|
resp = onNotLoggedInError(resp, rerr)
|
|
} else {
|
|
resp = onDefaultError(mw, r, resp)
|
|
}
|
|
}
|
|
}()
|
|
|
|
resp, err = handler(ctx, req)
|
|
return resp, err
|
|
})
|
|
}
|
|
|
|
func onDefaultError(mw *core.Middleware, r any, resp interface{}) interface{} {
|
|
conc.OnPanic(r)
|
|
resp = &pb.RpcGenericErrorResponse{
|
|
Error: &pb.RpcGenericErrorResponseError{
|
|
Code: pb.RpcGenericErrorResponseError_UNKNOWN_ERROR,
|
|
Description: "panic recovered",
|
|
},
|
|
}
|
|
return resp
|
|
}
|
|
|
|
func onNotLoggedInError(resp interface{}, rerr error) interface{} {
|
|
resp = &pb.RpcGenericErrorResponse{
|
|
Error: &pb.RpcGenericErrorResponseError{
|
|
Code: pb.RpcGenericErrorResponseError_UNKNOWN_ERROR,
|
|
Description: rerr.Error(),
|
|
},
|
|
}
|
|
return resp
|
|
}
|