大家好,今天小热关注到一个比较有意思的话题,就是关于AbstractMessage的问题,于是小编就整理了2个相关介绍AbstractMessage的解答,让我们一起看看吧。
文章目录:
一、SpringBoot 实战:国际化组件 MessageSource 的执行逻辑与源码_百度知 ...
SpringBoot 的国际化组件 MessageSource 在配置文件中发挥基础作用,其执行逻辑包含两部分:ResourceBundleMessageSource 和 ReloadableResourceBundleMessageSource。默认情况下,ResourceBundleMessageSource 仅加载配置文件一次,而 ReloadableResourceBundleMessageSource 则根据文件修改时间判断是否重新加载。流程图展示了这些组件的执行逻辑。
MessageSource 组件提供多个抽象化实现,其核心逻辑在 AbstractMessageSource 类中体现。`getMessage` 方法提供了三种使用场景:第一种可传入默认值,默认值在所有 basename 配置文件中不存在时返回;第二种根据 `useCodeAsDefaultMessage` 配置,若设置为 `true`,在不存在指定值时返回 code;若设置为 `false`,则抛出异常。第三种接受 MessageSourceResolvable 接口对象,以多种方式查找 code,若最终未找到,同样抛出异常。
源码中的内存缓存应用是学习优秀编程方式的重要环节。Spring 源码中使用内存缓存常见于性能优化,例如,通过创建一个包含 basename 和 locale 的对象 BasenameLocale,并以 `Map` 实现缓存,简化了逻辑判断。记录类 BasenameLocale 在 Java 16 的新特性中有所介绍。
本文总结了 MessageSource 的配置项、执行逻辑和内存缓存实现,最后预告了后续内容将扩展 MessageSource 功能,实现从 Nacos 加载配置内容及动态修改配置内容。通过本文学习,读者将对 SpringBoot 国际化组件的底层原理有更深入理解,为开发实践提供坚实基础。
二、netty系列:一行简单的writeAndFlush都做了哪些事
前言对于使用Netty的小伙伴来说,我们想通过服务端往客户端发送数据,通常我们会调用ctx.writeAndFlush(数据)的方式。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。
源码分析下面的这个方法是用来接收客户端发送过来的数据,通常会使用ctx.writeAndFlush(数据)来向客户端发送数据。
@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{System.out.println("接收到消息:"+msg);Stringstr="服务端收到:"+newDate()+msg;ctx.writeAndFlush(str);}ctx.writeAndFlush的逻辑privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}从上述源码我们可以知道,WriteAndFlush()相对于Write(),它的flush字段是true。
write:将需要写的ByteBuff存储到ChannelOutboundBuffer中。
flush:从ChannelOutboundBuffer中将需要发送的数据读出来,并通过Channel发送出去。
writeAndFlush源码publicChannelFuturewriteAndFlush(Objectmsg){returnthis.writeAndFlush(msg,this.newPromise());}publicChannelPromisenewPromise(){returnnewDefaultChannelPromise(this.channel(),this.executor());}writeAndFlush方法里提供了一个默认的newPromise()作为参数传递。在Netty中发送消息是一个异步操作,那么可以通过往hannelPromise中注册回调监听listener来得到该操作是否成功。
在发送消息时添加监听
ctx.writeAndFlush(str,ctx.newPromise().addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturechannelFuture)throwsException{channelFuture.isSuccess();}}));继续向下一层跟进代码,AbstractChannelHandlerContext中的invokeWriteAndFlush的源码。
privatevoidinvokeWriteAndFlush(Objectmsg,ChannelPromisepromise){if(this.invokeHandler()){this.invokeWrite0(msg,promise);this.invokeFlush0();}else{this.writeAndFlush(msg,promise);}}从上述源码我们可以能够知道:
1、首先通过invokeHandler()判断通道处理器是否已添加到管道中。
2、执行消息处理invokeWrite0方法:
首先将消息内容放入输出缓冲区中invokeFlush0;
然后将输出缓冲区中的数据通过socket发送到网络中。
分析invokeWrite0执行的内容,源码如下:
privatevoidinvokeWrite0(Objectmsg,ChannelPromisepromise){try{((ChannelOutboundHandler)this.handler()).write(this,msg,promise);}catch(Throwablevar4){notifyOutboundHandlerException(var4,promise);}}((ChannelOutboundHandler)this.handler()).write是一个出站事件ChannelOutboundHandler,会由ChannelOutboundHandlerAdapter处理。
@Skippublicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{ctx.write(msg,promise);}接下来会走到ChannelPipeline中,来执行网络数据发送;我们来看DefaultChannelPipeline中HeadContext的write方法源码
publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise){this.unsafe.write(msg,promise);}unsafe是构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。
我们跟进HenderContext的write(),而HenderContext的中依赖的是unsafe.wirte()。所以直接去AbstractChannel的Unsafe源码如下:
publicfinalvoidwrite(Objectmsg,ChannelPromisepromise){this.assertEventLoop();ChannelOutboundBufferoutboundBuffer=this.outboundBuffer;if(outboundBuffer==null){//缓存写进来的bufferthis.safeSetFailure(promise,this.newWriteException(AbstractChannel.this.initialCloseCause));ReferenceCountUtil.release(msg);}else{intsize;try{//bufferDirct化,(我们查看AbstractNioByteBuf的实现)msg=AbstractChannel.this.filterOutboundMessage(msg);size=AbstractChannel.this.pipeline.estimatorHandle().size(msg);if(size<0){size=0;}}catch(Throwablevar6){this.safeSetFailure(promise,var6);ReferenceCountUtil.release(msg);return;}//插入写队列将msg插入到outboundBuffer//outboundBuffer这个对象是ChannelOutBoundBuff类型的,它的作用就是起到一个容器的作用//下面看,是如何将msg添加进ChannelOutBoundBuff中的outboundBuffer.addMessage(msg,size,promise);}}从上述源码中,我们可以看出,首先调用assertEventLoop确保该方法的调用是在reactor线程中;然后,调用filterOutboundMessage()方法,将待写入的对象过滤。下面我们来看看filterOutboundMessage方法的源码。
protectedfinalObjectfilterOutboundMessage(Objectmsg){if(msginstanceofByteBuf){ByteBufbuf=(ByteBuf)msg;returnbuf.isDirect()?msg:this.newDirectBuffer(buf);}elseif(msginstanceofFileRegion){returnmsg;}else{thrownewUnsupportedOperationException("unsupportedmessagetype:"+StringUtil.simpleClassName(msg)+EXPECTED_TYPES);}}从上述源码可以看出,只有ByteBuf以及FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。
在发送数据时,我们需要估算出需要写入的ByteBuf的size,我们来看看DefaultMessageSizeEstimator的HandleImpl类中的size()方法。
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}0通过ByteBuf.readableBytes()判断消息内容大小,估计待发送消息数据的大小,如果是FileRegion的话直接返回0,否则返回ByteBuf中可读取字节数。
接下来我们来看看是如何将msg添加进ChannelOutBoundBuff中的。
ChannelOutBoundBuff类ChannelOutboundBuffer类主要用于存储其待处理的出站写请求的内部数据。当Netty调用write时数据不会真正地去发送而是写入到ChannelOutboundBuffer缓存队列,直到调用flush方法Netty才会从ChannelOutboundBuffer取数据发送。每个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每个客户端连接上服务端都会创建一个ChannelOutboundBuffer绑定客户端Channel。
观察ChannelOutBoundBuff源码,可以看到以下四个属性:
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}1flushedEntry:指针表示第一个被写到操作系统Socket缓冲区中的节点;
unFlushedEntry:指针表示第一个未被写入到操作系统Socket缓冲区中的节点;
tailEntry:指针表示ChannelOutboundBuffer缓冲区的最后一个节点。
flushed:表示待发送数据个数。
下面分别是三个指针的作用,示意图如下:
unFlushedEntry指针表示第一个未被写入到操作系统Socket缓冲区中的节点;
tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点。
初次调用addMessage之后,各个指针的情况为:
ChannelOutboundBuffer主要提供了以下方法:
addMessage方法:添加数据到对列的队尾;
addFlush方法:准备待发送的数据,在flush前需要调用;
nioBuffers方法:用于获取待发送的数据。在发送数据的时候,需要调用该方法以便拿到数据;
removeBytes方法:发送完成后需要调用该方法来删除已经成功写入TCP缓存的数据。
addMessage方法addMessage方法是系统调用write方法时调用,源码如下。
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}2上述源码流程如下:
将消息数据包装成Entry对象;
如果对列为空,直接设置尾结点为当前节点,否则将新节点放尾部;
unflushedEntry为空说明不存在暂时不需要发送的节点,当前节点就是第一个暂时不需要发送的节点;
将消息添加到未刷新的数组后,增加挂起的节点。
这里需要重点看看第一步将消息数据包装成Entry对象的方法。
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}3其中Recycler类是基于线程本地堆栈的轻量级对象池。这意味着调用newInstance方法时,并不是直接创建了一个Entry实例,而是通过对象池获取的。
下面我们看看incrementPendingOutboundBytes方法的源码。
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}4在每次添加新的节点后都调用incrementPendingOutboundBytes((long)entry.pendingSize,false)方法,这个方法的作用是设置写状态,设置怎样的状态呢?我们看它的源码,可以看到,它会记录下累计的ByteBuf的容量,一旦超出了阈值,就会传播channel不可写的事件。
addFlush方法addFlush方法是在系统调用flush方法时调用的,addFlush方法的源码如下。
privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(
到此,以上就是小编对于AbstractMessage的问题就介绍到这了,希望介绍关于AbstractMessage的2点解答对大家有用。
郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。