grpc使用自定义业务线程池

  • grpc默认的业务线程池是无限线程,大流量下容易线程爆炸
@Slf4j
@Component
public class GrpcServer {
    @Resource
    private Config config;
    @Resource
    private GrpcMetadataHandler metadataHandler;

    private ThreadPoolExecutor executorService;
    private Server server;

    // executorService拒绝任务时服务端会有错误日志,返回给客户端 http2.0 internal error,客户端会抛异常
    @PostConstruct
    private void init() {
        log.info("GrpcServer init");
        executorService = new ThreadPoolExecutor(
                config.getGrpcThreadCount(),
                config.getGrpcThreadCount(),
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(config.getGrpcThreadQueue()),
                new CustomizableThreadFactory("GrpcServer"));
        server = NettyServerBuilder.forPort(config.getGrpcPort())
//                .addService(metadataHandler)
                .executor(executorService)
                .maxConnectionIdle(10, TimeUnit.MINUTES)
                .build();
        GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(() -> log.info("GrpcServer queue count:{}", executorService.getQueue().size()), 0, 1, TimeUnit.SECONDS);
    }

    @PreDestroy
    private void destroy() throws InterruptedException {
        log.info("GrpcServer shutdown ...");
        // 会阻塞到所有任务执行完,除非和客户端的连接断了,断了的话不会执行线程池中未完成的任务
        server.shutdown().awaitTermination();
        executorService.shutdown();
        while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            log.error("GrpcServer shutdown wait queue:{}", executorService.getQueue().size());
        }
        log.info("GrpcServer shutdown finished");
    }

    public void start() throws IOException {
        server.start();
    }
}
  • 异常信息
2021-05-19 22:24:01.868  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Exception in onHeadersRead()

java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

2021-05-19 22:24:01.873  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error

io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:167)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.newStreamException(NettyServerHandler.java:774)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:465)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
    ... 31 common frames omitted

2021-05-19 22:24:01.874 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] OUTBOUND RST_STREAM: streamId=13 errorCode=2
2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xe5f50b5a, L:/127.0.0.1:54061 - R:/127.0.0.1:10051] INBOUND RST_STREAM: streamId=13 errorCode=2
2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] INBOUND DATA: streamId=13 padding=0 endStream=true length=50 bytes=000000002d0a0868615f6461696c7912214d515f494e53545f313937333238313236393636313136305f4742746439557878
2021-05-19 22:24:01.876  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error

io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an unknown stream 13
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容