??斗地主捕鱼电竞提现秒到 广告位招租 - 15元/月全站展示
??支付宝搜索579087183领大额红包 ??伍彩集团官网直营彩票
??好待遇→招代理 ??伍彩集团官网直营彩票
??络茄网 广告位招租 - 15元/月全站展示
Java编写基于netty的RPC框架

转载   沙漏半杯   2018-11-15   浏览量:12


一 简单概念

RPC: ( Remote Procedure Call),远程调用过程,是通过网络调用远程计算机的进程中某个方法,从而获取到想要的数据,过程如同调用本地的方法一样.

阻塞IO :当阻塞I/O在调用InputStream.read()方法是阻塞的,一直等到数据到来时才返回,同样ServerSocket.accept()方法时,也是阻塞,直到有客户端连接才返回,I/O通信模式如下:

Java编写基于netty的RPC框架

图片描述(最多50字)

缺点:当客户端多时,会创建大量的处理线程,并且为每一个线程分配一定的资源;阻塞可能带来频繁切换上下文,这时引入NIO

NIO : jdk1.4引入的(NEW Input/Output),是基于通过和缓存区的I/O方式,(插入一段题外话,学的多忘得也多,之前有认真研究过NIO,后来用到的时候,忘得一干二净,所以学习一些东西,经常返回看看),NIO是一种非阻塞的IO模型,通过不断轮询IO事件是否就绪,非阻塞是指线程在等待IO的时候,可以做其他的任务,同步的核心是Selector,Selector代替线程本省的轮询IO事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心是通道和缓存区,当IO事件的就绪时,可以将缓存区的数据写入通道

Java编写基于netty的RPC框架

图片描述(最多50字)

其工作原理:

1 由专门的线程来处理所有的IO事件,并且负责转发

2 事件驱动机制:事件到的时候才触发,而不是同步监视

3 线程通讯:线程之间通讯通过wait,notify等方式通讯,保证每次上下文切换都是有意义的,减少没必要的线程切换

通道 : 是对原I/O包中流的模拟,所有数据必须通过Channel对象,常见的通道FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP协议向网络连接的两端读写数据)

Java编写基于netty的RPC框架

图片描述(最多50字)

Buffer缓存区 :实际上是一个容器,一个连续的数组,任何读写的数据都经过Buffer

Java编写基于netty的RPC框架

图片描述(最多50字)

Netty :是由JBOSS提供的一个java开源框架,是一个高性能,异步事件驱动的NIO框架,基于JAVA NIO提供的API实现,他提供了TCP UDP和文件传输的支持,,所有操作都是异步非阻塞的.通过Futrue-Listener机制,本质就是Reactor模式的现实,Selector作为多路复用器,EventLoop作为转发器,而且,netty对NIO中buffer做优化,大大提高了性能

二 Netty 客户端和服务端的

Netty中Bootstrap和Channel的生命周期

Bootstrap简介

Bootstarp:引导程序,将ChannelPipeline,ChannelHandler,EventLoop进行整体关联

Java编写基于netty的RPC框架

图片描述(最多50字)

Bootstrap具体分为两个实现

ServerBootstrap:用于服务端,使用一个ServerChannel接收客户端的连接,并创建对应的子Channel

Bootstrap:用于客户端,只需要一个单独的Channel,配置整个Netty程序,串联起各个组件

二者的主要区别:

1 ServerBootstrap用于Server端,通过调用bind()绑定一个端口监听连接,Bootstrap用于Client端,需要调用connect()方法来连接服务器端,我们也可以调用bind()方法接收返回ChannelFuture中Channel

2 客户端的Bootstrap一般用一个EventLoopGroup,而服务器的ServerBootstrap会用两个第一个EventLoopGroup专门负责绑定到端口监听连接事件,而第二个EventLoopGroup专门用来处处理每个接收的连接,这样大大提高了并发量

public class Server {
public static void main(String[] args) throws Exception {
// 1 创建线两个事件循环组
// 一个是用于处理服务器端接收客户端连接的
// 一个是进行网络通信的(网络读写的)
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
// 2 创建辅助工具类ServerBootstrap,用于服务器通道的一系列配置
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup) // 绑定俩个线程组
.channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区
.option(ChannelOption.SO_SNDBUF, 32 1024) // 设置发送缓冲大小
.option(ChannelOption.SO_RCVBUF, 32
1024) // 这是接收缓冲大小
.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {br/>@Override
protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel建立连接后的管道
// 3 在这里配置 通信数据的处理逻辑, 可以addLast多个...
sc.pipeline().addLast(new ServerHandler());
}
});
// 4 绑定端口, bind返回future(异步), 加上sync阻塞在获取连接处
ChannelFuture cf1 = b.bind(8765).sync();
//ChannelFuture cf2 = b.bind(8764).sync(); //可以绑定多个端口
// 5 等待关闭, 加上sync阻塞在关闭请求处
cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}

public class ServerHandler extends ChannelHandlerAdapter {br/>@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");br/>}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = "返回给客户端的响应:" + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
// future完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过server端关闭. 直接关闭用方法ctx[.channel()].close()br/>//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.flush();
}
@Override
br/>System.out.println("读完了");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.close();
}
}

public class Client {
public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  protected void initChannel(SocketChannel sc) throws Exception { 
    sc.pipeline().addLast(new ClientHandler());
  }
});

ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可以使用多个端口
//发送消息, Buffer类型. write需要flush才发送, 可用writeFlush代替
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));

cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
group.shutdownGracefully();

}
}

public class ClientHandler extends ChannelHandlerAdapter{br/>@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {br/>}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
} finally {
// 记得释放xxxHandler里面的方法的msg参数: 写(write)数据, msg引用将被自动释放不用手动处理; 但只读数据时,!必须手动释放引用数br/>ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {br/>}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
其他组件:

Handle: 为了支持各种协议和处理数据的方式,可以是连接,数据接收,异常,数据格式转换等

ChannelHandler

ChannelInboundHandler :最常用的Handler,作用是处理接收数据的事件,来处理我们的核心业务逻辑。

ChannelInitializer :,当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。

ChannelPipeline :一个Netty应用基于ChannelPipeline机制,这种机制依赖EventLoop和EventLoopGroup,这三个都和事件或者事件处理相关

EventLoop : 为Channel处理IO操作,一个EventLoop可以为多个Channel服务

EventLoopGroup :包含多个EventLoop

Channel :代表一个Socket连接

Future :在Netty中所有的IO操作都是异步的,,因此我们不知道,过来的请求是否被处理了,所以我们注册一个监听,当操作执行成功或者失败时监听自动触发,所有操作都会返回一个ChannelFutrue

ChannelFuture

Netty 是一个非阻塞的,事件驱动的,网络编程框架,我们通过一张图理解一下,Channel,EventLoop以及EventLoopGroup之间的关系

Java编写基于netty的RPC框架

图片描述(最多50字)

解释一下,当一个连接过来,Netty首先会注册一个channel,然后EventLoopGroup会分配一个EventLoop绑定到这个channel,在这个channel的整个生命周期过程中,这个EventLoop一直为他服务,这个玩意就是一个线程

Java编写基于netty的RPC框架

图片描述(最多50字)

这下聊一下Netty如何处理数据?

前面有讲到,handler数据处理核心,,而ChannelPipeline负责安排Handler的顺序和执行,我们可以这样理解,数据在ChannelPipeline中流动,其中ChannelHandler就是一个个阀门,这些数据都会经过每一个ChannelHandler并且被他处理,其中ChannelHandler的两个子类ChannelOutboundHandler和ChannelInboundHandler,根据不同的流向,选择不同的Handler

Java编写基于netty的RPC框架

图片描述(最多50字)

由图可以看出,一个数据流进入ChannelPipeline时,一个一个handler挨着执行,各个handler的数据传递,这需要调用方法中ChannelHandlerContext来操作,而这个ChannelHandlerContext可以用来读写Netty中的数据流

三 Netty中的业务处理

netty中会有很多Handler.具体哪一种Handler还要看继承是InboundAdapter还是OutboundAdapter,Netty中提供一系列的Adapter来帮助我们简化开发,在ChannelPipeline中的每一个handler都负责把Event传递个洗下一个handler,有这些adapter,这些工作可以自动完成,,我们只需覆盖我们真正实现的部分即可,接下来比较常用的三种ChannelHandler

Encoders和Decodeers

我们在网络传输只能传输字节流,在发送数据时,把我们的message转换成bytes这个过程叫Encode(编码),相反,接收数据,需要把byte转换成message,这个过程叫Decode(解码)

Domain Logic

我们真正关心的如何处理解码以后的数据,我们真正的业务逻辑便是接收处理的数据,Netty提供一个常用的基类就是SimpleChannelInboundHandler<T>,其中T就是Handler处理的数据类型,消息到达这个Handler,会自动调用这个Handler中的channelRead0(ChannelHandlerContext,T)方法,T就是传过来的数据对象

四 基于netty实现的Rpc的例子

这是我的github上项目的位置

https://github.com/developerxiaofeng/rpcByNetty

项目目录结构如下

Java编写基于netty的RPC框架

图片描述(最多50字)

详细的项目细节看类中的注释,很详细哦!!!

转载自://blog.51cto.com/14028890/2317363

招聘 不方便扫码就复制添加关注:程序员招聘谷,微信号:jobs1024



深入理解分布式系统中的缓存架构 下
承接上一篇《理解分布式系统中的缓存架构(上)》,介绍了大型分布式系统中缓存的相关理论,常见的缓存组件以及应用场景,本文主要介绍缓存架构设计常见问题以及解决方案,业界案例。1分层缓存架构设计2缓存带来的复杂度问题常见的问题主要包括数据一致性缓存穿透缓存雪崩缓存高可用缓存热点下面逐一介绍分析这些问题以及相应的解决方案。数据一致性因为缓存属于持久化数据的一个副本,因此不可避免的会出现数据不一致问题。导致
Java笔试面试题(三)
每日一句:想,都是问题;做,才是答案。构造器Constructor是否可被Override?构造器Constructor不能被继承,因此不能重写Override,但是可以被重载Overload。接口是否可继承接口?抽象类是否可实现(implement)接口?抽象类是否可继承具体类(concreteclass)?抽象类中是否可以有静态的类方法?接口可以继承接口。抽象类可以实现接口。抽象类可以继承具体
使用反射对bean的collection属性赋值
反射对bean的collection属性赋值对collection使用反射创建时会遇到不知道具体实现类型而烦恼比如:classSchool{privateListclasses;//班级列表publicvoidsetClasses(Listclasses){this.classes=classes;}pulbicListgetClasses(){returnthisclasses;}}Class&
springcloud与dubbo的性能对比
在长期使用dubbo的团队中推行springcloud确实是个难题,巨大理由之一就是rpc调用效率远远高于http,故此做一个相关性能测试,所用到的dubbo与spring系均采用官方最新版(dubbo2.5.7、springboot1.5.9、springcloudedgware)。springcloud测试代码地址?https://github.com/liuchengts/
浅谈Spring cloud与Dubbo各自的优势与劣势
有关微服务架构的讨论最近一直很火。近期也看到一些分享SpringCloud的相关实施经验,这对于最近正在整理SpringCloud相关套件内容与实例应用的我而言,还是有不少激励的。目前,SpringCloud在国内的知名度并不高,与一些互联网公司的架构师、技术VP或者CTO在交流时,有些甚至还不知道该项目的存在。这也许与国内的开源服务治理框架Dubbo有一定的关系,除了Dubbo本身较为完善...
基于springCloud构建微云架构技术分享
一,什么是微服务微服务英文名称Microservice,Microservice架构模式就是将整个Web应用组织为一系列小的Web服务。这些小的Web服务可以独立地编译及部署,并通过各自暴露的API接口相互通讯。它们彼此相互协作,作为一个整体为用户提供功能,却可以独立地进行扩。微服务架构需要的功能或使用场景1:我们把整个系统根据业务拆分成几个子系统。2:每个子系统可以部署多个应用,多个应用之间使用...
基于springCloud的分布式架构体系
SpringCloud作为一套微服务治理的框架,几乎考虑到了微服务治理的方方面面,之前也写过一些关于SpringCloud文章,主要偏重各组件的使用,本次分享主要解答这两个问题:SpringCloud在微服务的架构中都做了哪些事情?SpringCloud提供的这些功能对微服务的架构提供了怎样的便利??我们先来简单回顾一下,我们以往互联网架构的发展情况:?传统架构发展史?
Spring Cloud 微服务框架技术标准分析
本系列文章主要介绍这些技术中的翘楚——SpringCloud。这是序篇,主要讲述我们为什么选择SpringCloud和它的技术概览。1为什么微服务架构需要SpringCloud简单来说,服务化的核心就是将传统的一站式应用根据业务拆分成一个一个的服务,而微服务在这个基础上要更彻底地去耦合(不再共享DB、KV,去掉重量级ESB),并且强调DevOps和快速演化。这就要求我们
详细全面的 SpringBoot 文件上传
这些天忙着刷题,又怕遗忘了springboot,所以抽出一点时间折腾折腾,加深点印象。springboot的文件上传与springmvc的文件上传基本一致,只需注意一些配置即可?;肪骋螅篠pringBootv1.5.1.RELEASE+jdk1.7+myeclipse1).引入thymeleaf,支持页面跳转
基于Redis实现分布式锁实战
大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。