
Building a High-Performance RPC Server with Netty

Published: 2016-10-29 00:51:41 | Category: Tutorials | Source: 站长网
Overview: This article designs and implements a high-performance RPC server on top of Netty. Clients issue RPC call requests concurrently; the server accepts the connections, decodes each request, and hands it to a dedicated business thread pool that can be sized to match business needs. In a benchmark of 10,000 concurrent calls, every request was answered successfully in a total time of close to 11 seconds.

So how do you set about building a high-performance RPC server yourself? If you have worked with distributed systems, you have very likely used one already. At its core, RPC (remote procedure call) means invoking an object that runs on a remote computer as if it were local. Besides raw performance, another yardstick for an RPC framework is its choice of transport protocol; and if your framework has no cross-language requirement, an IDL need not be included at all.

At this point the key parts of NettyRPC, the server-side and client-side modules, are implemented entirely with Netty, and concurrent requests are answered and processed successfully. On the client side, a dynamic proxy is generated for the service interface (hence the new Class<?>[]{rpcInterface} array passed to the proxy factory). On the server side, each decoded request is handed to a dedicated RPC business thread pool, a ThreadPoolExecutor named "RpcThreadPool" built with a NamedThreadFactory and sized by threads and queues parameters, so that Netty's handler threads are never blocked (of course, if the business logic is simple enough, the hand-off could be skipped). The receiving handler keeps a Map<String, Object> handlerMap from interface names to implementation objects and is constructed as MessageRecvHandler(Map<String, Object> handlerMap).
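A minimal, JDK-only sketch of such a pool factory, following the ThreadPoolExecutor fragments above. The queue-selection policy (hand-off when queues == 0, unbounded when negative, bounded otherwise) is an assumption for illustration, and the NamedThreadFactory the article installs is omitted to stay JDK-only:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcThreadPoolSketch {

    // Fixed pool of 'threads' workers; 'queues' picks the work queue:
    // 0 -> direct hand-off, negative -> unbounded queue, positive -> bounded.
    // (This selection policy is an assumption; the article additionally
    // installs a NamedThreadFactory, omitted here to stay JDK-only.)
    public static Executor getExecutor(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                            : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                          : new LinkedBlockingQueue<Runnable>(queues)));
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) getExecutor(2, 16);
        for (int i = 0; i < 8; i++) {
            pool.execute(new Runnable() {
                public void run() { /* business work would run here */ }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("completed=" + pool.getCompletedTaskCount()); // prints: completed=8
    }
}
```

Keeping the pool separate from Netty's event loops is the point: slow business calls queue up here instead of stalling I/O threads.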

Having said all this, here is the core server-side implementation of message encoding, decoding, and handling. (A note on transport first: mainstream open-source RPC frameworks are based on TCP or HTTP; by the look of it, none adopt UDP as the primary transport protocol.) The server pipeline installs an ObjectDecoder backed by a weak-caching concurrent class resolver, followed by the request handler:

    pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    pipeline.addLast(new MessageRecvHandler(handlerMap));

The handler receives a decoded MessageRequest carrying the remote class name ("className") and method name ("methodName"), looks up the concrete implementation object mapped to the client's interface, and hands the work to the dedicated RPC business thread pool for centralized processing:

    /**
     * @filename:MessageRecvHandler.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: RPC server message handler
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.netty.rpc.core;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import java.util.Map;
    import newlandframework.netty.rpc.model.MessageRequest;
    import newlandframework.netty.rpc.model.MessageResponse;

    public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

        private final Map<String, Object> handlerMap;

        public MessageRecvHandler(Map<String, Object> handlerMap) {
            this.handlerMap = handlerMap;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            MessageRequest request = (MessageRequest) msg;
            MessageResponse response = new MessageResponse();
            // Do not block the Netty handler thread: wrap the call in a task
            // and submit it to the RPC business thread pool.
            MessageRecvInitializeTask task =
                    new MessageRecvInitializeTask(request, response, handlerMap, ctx);
            MessageRecvExecutor.submit(task);
        }
    }

Out of performance considerations, I also replaced heavyweight synchronized blocks with lightweight ReentrantLock-based conditional locking on the client side. Compared with my earlier design, throughput is much, much higher; the benchmark screenshot in the original post (not reproduced here) shows the result, and it is quite impressive. There are surely still many places to improve, for example: 1. object serialization could support today's mainstream serialization frameworks: protobuf, JBoss Marshalling, Avro, and so on; 2. the Netty server's threading model could be made selectable: single-threaded, or multi-threaded (one thread responsible for accepting client connections, with the work handed off once the connection succeeds).

The main design approach of those two articles is as follows.
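As a self-contained illustration of the client-side design idea, a JDK dynamic proxy can stand in for the RPC stub. Everything here except the new Class<?>[]{rpcInterface} shape is hypothetical: Calculate and fakeRemoteCall are made-up stand-ins, and a real stub would marshal the call into a MessageRequest and write it to the Netty channel:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyStubSketch {

    public interface Calculate {
        int add(int a, int b);
    }

    // Hypothetical stand-in for the network hop: a real stub would marshal
    // (class name, method name, args) into a MessageRequest, send it over
    // the Netty channel, and wait for the MessageResponse.
    static Object fakeRemoteCall(String className, String methodName, Object[] args) {
        if ("add".equals(methodName)) {
            return (Integer) args[0] + (Integer) args[1];
        }
        throw new UnsupportedOperationException(className + "#" + methodName);
    }

    @SuppressWarnings("unchecked")
    public static <T> T createStub(final Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(),
                new Class<?>[]{rpcInterface},   // same shape as the article's proxy creation
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) {
                        return fakeRemoteCall(rpcInterface.getName(), method.getName(), args);
                    }
                });
    }

    public static void main(String[] args) {
        Calculate calc = createStub(Calculate.class);
        System.out.println("3 + 4 = " + calc.add(3, 4)); // prints: 3 + 4 = 7
    }
}
```

The proxy is what lets the caller invoke a remote object through an ordinary interface method, which is the essence of RPC as described above.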

The package newlandframework.netty.rpc.config defines NettyRPC's server-side configuration properties, among them the RPC thread factory (threadRpcFactory), so the thread pool can be tuned through configuration to match business needs. Note also how the benchmark below is constructed: 10,000 threads each concurrently issue a single request; it is not one set of threads looping to send requests. How is all of this implemented, and on what principles? How does it perform under concurrency? Please read on.
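The thread factory referenced by this configuration can be sketched as follows. This is a JDK-only guess at its shape based on the NamedThreadFactory(name, true) usage seen in the pool code; the prefix-numbering scheme is my assumption:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactorySketch implements ThreadFactory {

    private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
    private final AtomicInteger threadNum = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    // Matches the (prefix, daemon) constructor seen in the fragments;
    // the generated name pattern "<prefix>-<pool>-thread-<n>" is assumed.
    public NamedThreadFactorySketch(String prefix, boolean daemon) {
        this.prefix = prefix + "-" + POOL_SEQ.getAndIncrement() + "-thread-";
        this.daemon = daemon;
    }

    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, prefix + threadNum.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) {
        NamedThreadFactorySketch f = new NamedThreadFactorySketch("RpcThreadPool", true);
        System.out.println(f.newThread(new Runnable() {
            public void run() { }
        }).getName());
    }
}
```

Naming pool threads this way makes thread dumps of a busy RPC server far easier to read than the default pool-N-thread-M names.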

The test case below exercises the RPC server with a parallelism of 10,000: the requests are fired concurrently and answered successfully:

    /**
     * @filename:RpcParallelTest.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: RPC concurrency test code
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.netty.rpc.servicebean;

    import java.util.concurrent.CountDownLatch;
    import newlandframework.netty.rpc.core.MessageSendExecutor;
    import org.apache.commons.lang.time.StopWatch;

    public class RpcParallelTest {
        public static void main(String[] args) throws Exception {
            final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
            // degree of parallelism: 10000
            int parallel = 10000;
            // start timing
            StopWatch sw = new StopWatch();
            sw.start();
            CountDownLatch signal = new CountDownLatch(1);
            CountDownLatch finish = new CountDownLatch(parallel);
            for (int index = 0; index < parallel; index++) {
                CalcParallelRequestThread client =
                        new CalcParallelRequestThread(executor, signal, finish, index);
                new Thread(client).start();
            }
            signal.countDown();
            finish.await();
            sw.stop();
            System.out.printf("RPC calls finished, time cost: %d ms\n", sw.getTime());
        }
    }

The message model classes encapsulate the RPC request and response wire structures: the remote class name, method name, parameter structure, and parameter values. The server-side execution module is crucial: it bootstraps the Netty server, registers the Spring-declared service beans into handlerMap, and reports business-pool statistics in formats such as "Task: %d (completed: %d)". This article uses Netty version 4.0. The implementation begins as follows:

    /**
     * @filename:MessageRecvExecutor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: RPC server execution module
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.netty.rpc.core;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.logging.Level;
    import newlandframework.netty.rpc.model.MessageKeyVal;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;

    public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
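The start-gate / completion-latch pattern used by the parallel test can be tried on its own. In this JDK-only sketch an atomic counter stands in for the real RPC call, so no server is needed:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchHarnessSketch {

    // Runs 'parallel' worker threads against a start gate and returns how
    // many simulated calls completed; incrementing the counter stands in
    // for issuing one real RPC request.
    public static int run(int parallel) throws InterruptedException {
        final CountDownLatch signal = new CountDownLatch(1);        // start gate
        final CountDownLatch finish = new CountDownLatch(parallel); // completion latch
        final AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < parallel; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        signal.await();               // every worker blocks on the gate
                        completed.incrementAndGet();  // "issue" one RPC call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        finish.countDown();
                    }
                }
            }).start();
        }

        signal.countDown();  // release all workers at once
        finish.await();      // wait until every call has finished
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed=" + run(100)); // prints: completed=100
    }
}
```

The single-count gate is what makes the measurement honest: all workers are created first, then released simultaneously, so the timed window covers only the concurrent calls.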
Mastering raw Java NIO takes considerable technical depth, which is why the mainstream NIO frameworks, chiefly Netty and Mina, are the practical foundation; for further decoupling you could of course also consider a JMS message queue. In MessageRecvExecutor, the server parses its host:port address, binds, and reports success:

    String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
    if (ipAddr.length == 2) {
        String host = ipAddr[0];
        int port = Integer.parseInt(ipAddr[1]);
        ChannelFuture future = bootstrap.bind(host, port).sync();
        System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
    }

On the client side, the pipeline frames responses with a length-field decoder:

    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
            0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0,
            MessageSendChannelInitializer.MESSAGE_LENGTH));

The NamedThreadFactory provides both NamedThreadFactory(String prefix) and NamedThreadFactory(String prefix, boolean daemon) constructors, and the pool's saturation policy logs state such as "max: %d" and "isTerminated:%s" (via e.getLargestPoolSize() and friends) before rejecting. The task submitted for each request performs the reflective invocation and writes the response back asynchronously, so the NIO thread is never blocked:

    public void run() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
        }
        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("RPC Server Send message-id response: " + request.getMessageId());
            }
        });
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }

Further points could be opened up for configuration: 4. the business thread pool's startup parameters and its choice of concurrent blocking-queue model; 5. the multi-threaded, high-concurrency scenarios an RPC server must serve. I believe that by now, having patiently read this far, you have a clear picture of how to implement a high-performance RPC server with Netty; thank you, the reader in front of this page. Because my own skill and understanding are limited, there are surely still many places that can be improved.


