简

人生短暂,学海无边,而大道至简。


  • 首页

  • 归档

  • 分类

  • 标签

JVM垃圾回收机制

发表于 2018-05-10 | 分类于 java

如何定义垃圾

有两种方式,一种是引用计数(但是无法解决循环引用的问题);另一种就是可达性分析。

引用计数

引用计数法的逻辑是:在堆中存储对象时,在对象头处维护一个counter计数器,如果一个对象增加了一个引用与之相连,则将counter++。如果一个引用关系失效则counter-。如果一个对象的counter变为0,则说明该对象已经被废弃,不处于存活状态。

引用计数法无法解决多种类型引用的问题。但这并不是致命的,因为我们可以通过增加逻辑区分四种引用情况,虽然麻烦一些但还算是引用计数法的变体,真正让引用计数法彻底报废的下面的情况。

如果一个对象A持有对象B,而对象B也持有一个对象A,那发生了类似操作系统中死锁的循环持有,这种情况下A与B的counter恒大于1,会使得GC永远无法回收这两个对象。

可达性分析
这个算法的基本思路就是通过一系列名为GC Roots的对象作为起始点,从这些节点开始向下搜索,搜索所走过的路径称为引用链(Reference Chain),当一个对象到GC Roots没有任何引用链相连时,则证明此对象是不可用的,下图对象object5, object6, object7虽然有互相判断,但它们到GC Roots是不可达的,所以它们将会判定为是可回收对象

20200410180141

判断对象可以回收的情况:

显示的把某个引用置位NULL或者指向别的对象
局部引用指向的对象
弱引用关联的对象

典型的垃圾收集算法

在确定了哪些垃圾可以被回收后,垃圾收集器要做的事情就是开始进行垃圾回收,但是这里面涉及到一个问题是:如何高效地进行垃圾回收。由于Java虚拟机规范并没有对如何实现垃圾收集器做出明确的规定,因此各个厂商的虚拟机可以采用不同的方式来实现垃圾收集器,所以在此只讨论几种常见的垃圾收集算法的核心思想。

1、Mark-Sweep(标记-清除)算法

这是最基础的垃圾回收算法,之所以说它是最基础的是因为它最容易实现,思想也是最简单的。标记-清除算法分为两个阶段:标记阶段和清除阶段。标记阶段的任务是标记出所有需要被回收的对象,清除阶段就是回收被标记的对象所占用的空间。具体过程如下图所示:

20200410180253

从图中可以很容易看出标记-清除算法实现起来比较容易,但是有一个比较严重的问题就是容易产生内存碎片,碎片太多可能会导致后续过程中需要为大对象分配空间时无法找到足够的空间而提前触发新的一次垃圾收集动作。

2、Copying(复制)算法

为了解决Mark-Sweep算法的缺陷,Copying算法就被提了出来。它将可用内存按容量划分为大小相等的两块,每次只使用其中的一块。当这一块的内存用完了,就将还存活着的对象复制到另外一块上面,然后再把已使用的内存空间一次清理掉,这样一来就不容易出现内存碎片的问题。

这种算法虽然实现简单,运行高效且不容易产生内存碎片,但是却对内存空间的使用做出了高昂的代价,因为能够使用的内存缩减到原来的一半。

很显然,Copying算法的效率跟存活对象的数目多少有很大的关系,如果存活对象很多,那么Copying算法的效率将会大大降低。

3、Mark-Compact(标记-整理)算法

为了解决Copying算法的缺陷,充分利用内存空间,提出了Mark-Compact算法。该算法标记阶段和Mark-Sweep一样,但是在完成标记之后,它不是直接清理可回收对象,而是将存活对象都向一端移动,然后清理掉端边界以外的内存。

这种方法可以解决内存碎片问题,但是会增加停顿时间。

4、Generational Collection(分代收集)算法

分代收集算法是目前大部分JVM的垃圾收集器采用的算法。它的核心思想是根据对象存活的生命周期将内存划分为若干个不同的区域。一般情况下将堆区划分为老年代(Tenured Generation)和新生代(Young Generation),老年代的特点是每次垃圾收集时只有少量对象需要被回收,而新生代的特点是每次垃圾回收时都有大量的对象需要被回收,那么就可以根据不同代的特点采取最适合的收集算法。

目前大部分垃圾收集器对于新生代都采取Copying算法,因为新生代中每次垃圾回收都要回收大部分对象,也就是说需要复制的操作次数较少,但是实际中并不是按照1:1的比例来划分新生代的空间的,一般来说是将新生代划分为一块较大的Eden空间和两块较小的Survivor空间,每次使用Eden空间和其中的一块Survivor空间,当进行回收时,将Eden和Survivor中还存活的对象复制到另一块Survivor空间中,然后清理掉Eden和刚才使用过的Survivor空间。

而由于老年代的特点是每次回收都只回收少量对象,一般使用的是Mark-Compact算法。

注意,在堆区之外还有一个代就是永久代(Permanet Generation),它用来存储class类、常量、方法描述等。对永久代的回收主要回收两部分内容:废弃常量和无用的类。

垃圾收集器

垃圾收集算法是 内存回收的理论基础,而垃圾收集器就是内存回收的具体实现。下面介绍一下HotSpot(JDK 7)虚拟机提供的几种垃圾收集器,用户可以根据自己的需求组合出各个年代使用的收集器。

20200410180439

图中展示了7种不同分代的收集器;而它们所处区域,则表明其是属于新生代收集器还是老年代收集器;两个收集器间有连线,表明它们可以搭配使用。

1、Serial/Serial Old

Serial/Serial Old收集器是最基本最古老的收集器,它是一个单线程收集器,并且在它进行垃圾收集时,必须暂停所有用户线程。Serial收集器是针对新生代的收集器,采用的是Copying算法,Serial Old收集器是针对老年代的收集器,采用的是Mark-Compact算法。它的优点是实现简单高效,但是缺点是会给用户带来停顿。

优势: 简单而高效(与其他收集器的单线程比),对于限定单个CPU的环境来说,Serial收集器由于没有线程交互的开销,专心做垃圾收集自然可以获得最高的单线程收集效率。

2、ParNew

ParNew收集器是Serial收集器的多线程版本,使用多个线程进行垃圾收集。

优势: 除了多线程收集以外,跟Serial收集器一样,很重要的原因是:除了Serial收集器外,目前只有它能与CMS收集器配合工作。CMS作为老年代的收集器,却无法与JDK 1.4.0中已经存在的新生代收集器Parallel Scavenge配合工作,所以在JDK 1.5中使用CMS来收集老年代的时候,新生代只能选择ParNew或者Serial收集器中的一个。

3、Parallel Scavenge

Parallel Scavenge收集器是一个新生代的多线程收集器(并行收集器),它在回收期间不需要暂停其他用户线程,其采用的是Copying算法,该收集器与前两个收集器有所不同,它主要是为了达到一个可控的吞吐量。可以高效率地利用CPU时间,尽快完成程序的运算任务,主要适合在后台运算而不需要太多交互的任务。

4、Parallel Old

Parallel Old是Parallel Scavenge收集器的老年代版本(并行收集器),使用多线程和Mark-Compact算法。

5、CMS

CMS(Current Mark Sweep)收集器是一种以获取最短回收停顿时间为目标的收集器,它是一种并发收集器,采用的是Mark-Sweep算法。

目前很大一部分的Java应用集中在互联网站或者B/S系统的服务端上,这类应用尤其重视服务的响应速度,希望系统停顿时间最短,以给用户带来较好的体验。CMS收集器就非常符合这类应用的需求。

由于整个过程中耗时最长的并发标记和并发清除过程收集器线程都可以与用户线程一起工作,所以,从总体上来说,CMS收集器的内存回收过程是与用户线程一起并发执行的。

优点: CMS是一款优秀的收集器,它的主要优点在名字上已经体现出来了:并发收集、低停顿。
缺点: CMS收集器对CPU资源非常敏感

其实,面向并发设计的程序都对CPU资源比较敏感。在并发阶段,它虽然不会导致用户线程停顿,但是会因为占用了一部分线程(或者说CPU资源)而导致应用程序变慢,总吞吐量会降低。
CMS默认启动的回收线程数是(CPU数量+3)/ 4,也就是当CPU在4个以上时,并发回收时垃圾收集线程不少于25%的CPU资源,并且随着CPU数量的增加而下降。但是当CPU不足4个(譬如2个)时,CMS对用户程序的影响就可能变得很大。

6、G1

G1收集器是当今收集器技术发展最前沿的成果,它是一款面向服务端应用的收集器,它能充分利用多CPU、多核环境。因此它是一款并行与并发收集器,并且它能建立可预测的停顿时间模型。

HotSpot开发团队赋予它的使命是未来可以替换掉JDK 1.5中发布的CMS收集器。与其他GC收集器相比,G1具备如下特点。

  • 并行与并发

G1能充分利用多CPU、多核环境下的硬件优势,使用多个CPU来缩短Stop-The-World停顿的时间,部分其他收集器原本需要停顿Java线程执行的GC动作,G1收集器仍然可以通过并发的方式让Java程序继续执行。

  • 分代收集

与其他收集器一样,分代概念在G1中依然得以保留。虽然G1可以不需要其他收集器配合就能独立管理整个GC堆,但它能够采用不同的方式去处理新创建的对象和已经存活了一段时间、熬过多次GC的旧对象以获取更好的收集效果。

  • 空间整合

与CMS的”标记-清理”算法不同,G1从整体来看是基于”标记-整理”算法实现的收集器,从局部(两个Region之间)上来看是基于”复制”算法实现的,但无论如何,这两种算法都意味着G1运作期间不会产生内存空间碎片,收集后能提供规整的可用内存。这种特性有利于程序长时间运行,分配大对象时不会因为无法找到连续内存空间而提前触发下一次GC。

  • 可预测的停顿

这是G1相对于CMS的另一大优势,降低停顿时间是G1和CMS共同的关注点,但G1除了追求低停顿外,还能建立可预测的停顿时间模型,能让使用者明确指定在一个长度为M毫秒的时间片段内,消耗在垃圾收集上的时间不得超过N毫秒。

在G1之前的其他收集器进行收集的范围都是整个新生代或者老年代,而G1不再是这样。使用G1收集器时,Java堆的内存布局就与其他收集器有很大差别,它将整个Java堆划分为多个大小相等的独立区域(Region),虽然还保留有新生代和老年代的概念,但新生代和老年代不再是物理隔离的了,它们都是一部分Region(不需要连续)的集合。

G1收集器之所以能建立可预测的停顿时间模型,是因为它可以有计划地避免在整个Java堆中进行全区域的垃圾收集。G1跟踪各个Region里面的垃圾堆积的价值大小(回收所获得的空间大小以及回收所需时间的经验值),在后台维护一个优先列表,每次根据允许的收集时间,优先回收价值最大的Region(这也就是Garbage-First名称的来由)。这种使用Region划分内存空间以及有优先级的区域回收方式,保证了G1收集器在有限的时间内可以获取尽可能高的收集效率。

执行过程

G1收集器的运作大致可划分为以下几个步骤:

  • 初始标记(Initial Marking)

初始标记阶段仅仅只是标记一下GC Roots能直接关联到的对象,并且修改TAMS(Next Top at Mark Start)的值,让下一阶段用户程序并发运行时,能在正确可用的Region中创建新对象,这阶段需要停顿线程,但耗时很短。

  • 并发标记(Concurrent Marking)

并发标记阶段是从GCRoot开始对堆中对象进行可达性分析,找出存活的对象,这阶段耗时较长,但可与用户程序并发执行。

  • 最终标记(Final Marking)

最终标记阶段是为了修正在并发标记期间因用户程序继续运作而导致标记产生变动的那一部分标记记录,虚拟机将这段时间对象变化记录在线程Remembered Set Logs里面,最终标记阶段需要把Remembered Set Logs的数据合并到Remembered Set中,这阶段需要停顿线程,但是可并行执行。

  • 筛选回收(Live Data Counting and Evacuation)

筛选回收阶段首先对各个Region的回收价值和成本进行排序,根据用户所期望的GC停顿时间来制定回收计划,这个阶段其实也可以做到与用户程序一起并发执行,但是因为只回收一部分Region,时间是用户可控制的,而且停顿用户线程将大幅提高收集效率。

Java7中已经将运行时常量池从永久代移除,在Java 堆(Heap)中开辟了一块区域存放运行时常量池。
Java8中,已经彻底没有了永久代,将方法区直接放在一个与堆不相连的本地内存区域,这个区域被叫做元空间。
JDK8中把存放元数据中的永久内存从堆内存中移到了本地内存(native memory)中,这样永久内存就不再占用堆内存,它可以通过自动增长来避免JDK7以及前期版本中常见的永久内存错误(Java.lang.OutOfMemoryError: PermGen)。

JDK8也提供了一个新的设置Matespace内存大小的参数:-XX:MaxMetaspaceSize=128m

注意:如果不设置JVM将会根据一定的策略自动增加本地元内存空间。如果你设置的元内存空间过小,你的应用程序可能得到以下错误:java.lang.OutOfMemoryError: Metadata space

对象的内存分配,基本上主要是在堆上分配。

默认垃圾回收器

jdk1.7 默认垃圾收集器Parallel Scavenge(新生代)+Parallel Old(老年代)

jdk1.8 默认垃圾收集器Parallel Scavenge(新生代)+Parallel Old(老年代)

jdk1.9 默认垃圾收集器G1

-XX:+PrintCommandLineFlagsjvm参数可查看默认设置收集器类型
-XX:+PrintGCDetails亦可通过打印的GC日志的新生代、老年代名称判断

查看默认GC:java -XX:+PrintCommandLineFlags -version

JVM内存模型详解

发表于 2018-05-10 | 分类于 java

Java字节码是在JRE中运行(JRE: Java 运行时环境)。JVM则是JRE中的核心组成部分,承担分析和执行Java字节码的工作,而Java程序员通常并不需要深入了解JVM运行情况就可以开发出大型应用和类库。

JVM = 类加载器(classloader) + 执行引擎(execution engine) + 运行时数据区域(runtime data area)

JVM的基本特性:

基于栈(Stack-based)的虚拟机: 不同于Intel x86和ARM等比较流行的计算机处理器都是基于寄存器(register)架构,JVM是基于栈执行的。

符号引用(Symbolic reference): 除基本类型外的所有Java类型(类和接口)都是通过符号引用取得关联的,而非显式的基于内存地址的引用。

垃圾回收机制: 类的实例通过用户代码进行显式创建,但却通过垃圾回收机制自动销毁。

通过明确清晰基本类型确保平台无关性: 像C/C++等传统编程语言对于int类型数据在同平台上会有不同的字节长度。JVM却通过明确的定义基本类型的字节长度来维持代码的平台兼容性,从而做到平台无关。

网络字节序(Network byte order): Java class文件的二进制表示使用的是基于网络的字节序(network byte order)。为了在使用小端(little endian)的Intel x86平台和在使用了大端(big endian)的RISC系列平台之间保持平台无关,必须要定义一个固定的字节序。JVM选择了网络传输协议中使用的网络字节序,即基于大端(big endian)的字节序。

每个JVM都有两种机制:

类装载子系统:装载具有适合名称的类或接口
执行引擎:负责执行包含在已装载的类或接口中的指令

每个JVM都包含:

方法区、Java堆、Java栈、本地方法栈、程序计数器

20200410175431

20200410175453

程序计数器(Program Counter Register)

线程私有,它的生命周期与线程相同。可以看做是当前线程所执行的字节码的行号指示器。

Java虚拟机栈(JVM Stacks)

线程私有的,它的生命周期与线程相同。

虚拟机栈描述的是Java方法执行的内存模型:每个方法被执行的时候都会同时创建一个栈帧(Stack Frame)用于存储局部变量表、操作栈、动态链接、方法出口等信息。

每一个方法被调用直至执行完成的过程,就对应着一个栈帧在虚拟机栈中从入栈到出栈的过程。

局部变量表存放了编译期可知的各种基本数据类型(boolean、byte、char、short、int、float、long、double)、对象引用(reference类型),它不等同于对象本身,根据不同的虚拟机实现,它可能是一个指向对象起始地址的引用指针,也可能指向一个代表对象的句柄或者其他与此对象相关的位置)和returnAddress类型(指向了一条字节码指令的地址)。局部变量表所需的内存空间在编译期间完成分配,当进入一个方法时,这个方法需要在帧中分配多大的局部变量空间是完全确定的,在方法运行期间不会改变局部变量表的大小。

该区域可能抛出以下异常:

当线程请求的栈深度超过最大值,会抛出 StackOverflowError 异常;
栈进行动态扩展时如果无法申请到足够内存,会抛出 OutOfMemoryError 异常。

本地方法栈(Native Method Stacks)

与虚拟机栈非常相似,其区别不过是虚拟机栈为虚拟机执行Java方法(也就是字节码)服务,而本地方法栈则是为虚拟机使用到的Native 方法服务。

Java堆(Heap)

被所有线程共享,在虚拟机启动时创建,用来存放对象实例,几乎所有的对象实例都在这里分配内存。

对于大多数应用来说,Java堆(Java Heap)是Java虚拟机所管理的内存中最大的一块。

Java堆是垃圾收集器管理的主要区域,因此很多时候也被称做”GC堆”。如果从内存回收的角度看,由于现在收集器基本都是采用的分代收集算法,所以Java堆中还可以细分为:新生代和老年代;新生代又有Eden空间、From Survivor空间、To Survivor空间三部分。

Java 堆不需要连续内存,并且可以通过动态增加其内存,增加失败会抛出 OutOfMemoryError 异常。

方法区(Method Area)

用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。

和 Java 堆一样不需要连续的内存,并且可以动态扩展,动态扩展失败一样会抛出 OutOfMemoryError 异常。

对这块区域进行垃圾回收的主要目标是对常量池的回收和对类的卸载,但是一般比较难实现,HotSpot 虚拟机把它当成永久代(Permanent Generation)来进行垃圾回收。

运行时常量池(Runtime Constant Pool)

运行时常量池是方法区的一部分。

Class 文件中的常量池(编译器生成的各种字面量和符号引用)会在类加载后被放入这个区域。

除了在编译期生成的常量,还允许动态生成,例如 String 类的 intern()。这部分常量也会被放入运行时常量池。

在 JDK1.7之前,HotSpot 使用永久代实现方法区;从 JDK1.7 开始HotSpot 开始移除永久代。其中符号引用(Symbols)被移动到 Native Heap中,字符串常量和类引用被移动到 Java Heap中。

在 JDK1.8 中,永久代已完全被元空间(Meatspace)所取代。元空间的本质和永久代类似,都是对JVM规范中方法区的实现。不过元空间与永久代之间最大的区别在于:元空间并不在虚拟机中,而是使用本地内存。因此,默认情况下,元空间的大小仅受本地内存限制。

直接内存(Direct Memory)

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用,而且也可能导致OutOfMemoryError 异常出现。

在 JDK 1.4 中新加入了 NIO 类,引入了一种基于通道(Channel)与缓冲区(Buffer)的 I/O方式,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆里的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java 堆和 Native 堆中来回复制数据。

20200410175738

Java中的内存分配:Java程序在运行时,需要在内存中的分配空间。为了提高运算效率,就对数据进行了不同空间的划分,因为每一片区域都有特定的处理数据方式和内存管理方式。具体划分为如下5个内存空间:

  • 栈:存放局部变量
  • 堆:存放所有new出来的东西
  • 方法区:被虚拟机加载的类信息、常量、静态常量等。
  • 程序计数器(和系统相关)
  • 本地方法栈

1、程序计数器:每个线程拥有一个PC寄存器;在线程创建时创建;指向下一条指令的地址;执行本地方法时,PC的值为undefined。

2、方法区:
保存装载的类信息:类型的常量池;字段,方法信息;方法字节码;通常和永久区(Perm)关联在一起。

3、堆内存:和程序开发密切相关,应用系统对象都保存在Java堆中,所有线程共享Java堆;对分代GC来说,堆也是分代的;GC管理的主要区域。

4、栈内存:线程私有,生命周期和线程相同;栈由一系列帧组成(因此Java栈也叫做帧栈);帧保存一个方法的局部变量、操作数栈、常量池指针;每一次方法调用创建一个帧,并压栈。

Java虚拟机栈描述的是Java方法执行的内存模型:每个方法被调用的时候都会创建一个栈帧,用于存储局部变量表、操作栈、动态链接、方法出口等信息。每一个方法被调用直至执行完成的过程就对应着一个栈帧在虚拟机中从入栈到出栈的过程。在Java虚拟机规范中,对这个区域规定了两种异常情况:

1)如果线程请求的栈深度太深,超出了虚拟机所允许的深度,就会出现StackOverFlowError(比如无限递归。因为每一层栈帧都占用一定空间,而 Xss 规定了栈的最大空间,超出这个值就会报错)

2)虚拟机栈可以动态扩展,如果扩展到无法申请足够的内存空间,会出现OOM

深度好文:https://www.cnblogs.com/qianguyihao/p/4748392.html

20200410175929

断点续传和多线程下载原理(转)

发表于 2018-04-29 | 分类于 网络协议

断点续传和多线程下载的实现原理

HTTP协议的GET方法,支持只请求某个资源的某一部分;

  • 206 Partial Content 部分内容响应;
  • Range 请求的资源范围;
  • Content-Range 响应的资源范围;
  • 在连接断开重连时,客户端只请求该资源未下载的部分,而不是重新请求整个资源,来实现断点续传。

分块请求资源实例:

  • Eg1:Range: bytes=306302- :请求这个资源从306302个字节到末尾的部分;
  • Eg2:Content-Range: bytes 306302-604047/604048:响应中指示携带的是该资源的第306302-604047的字节,该资源共604048个字节;

客户端通过并发的请求相同资源的不同片段,来实现对某个资源的并发分块下载。从而达到快速下载的目的。目前流行的FlashGet和迅雷基本都是这个原理。

多线程下载的原理

  • 下载工具开启多个发出HTTP请求的线程;
  • 每个http请求只请求资源文件的一部分:Content-Range: bytes 20000-40000/47000;
  • 合并每个线程下载的文件。

JAVA8 对集合的增强

发表于 2018-04-22 | 分类于 java

Predicate 谓词对象

Java8起为Collection集合新增了一个removeIf(Predicate)方法,该方法将会批量删除符合filter条件的所有元素。该方法需要一个Predicate对象作为参数

removeIf(Predicate pre) 该方法将会批量删除符合条件的所有元素

1
2
3
4
5
6
7
8
9
hashSet.removeIf(new Predicate<String>() {
@Override
public boolean test(String s) {
if (s.equals("hu")) {
return true;
}
return false;
}
});

Predicate 简化集合操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
System.out.println("list中包含a的个数" + total(list, ele -> ele.contains("a")));
System.out.println("长度大于5的元素个数" + total(list, new Predicate<String>() {
@Override
public boolean test(String s) {
if (s.length() > 5) {
return true;
}
return false;
}
}));
System.out.println("集合中包含c的元素个数" + total(list, ele -> ele.contains("c")));
}

public static int total(Collection<String> collection, Predicate<String> p) {
int total = 0;
for (String s : collection) {
if (p.test(s)) {
total++;
}
}
return total;
}

Steam 流式API

Java8新增了Stream、InStream、LongStream、DoubleStream等流式API,这些API代表多个支持串行和并行聚集操作的元素。

Java8为上面每个流式API提供了对应的Builder

Stream 通用接口 Stream.Builder

IntSteam 对应的int类型的流 IntStream.Builder

LongStream 对应的long类型的流 LongStream.Builder

DoubleStream 对应的double类型的流 DoubleStream.Builder

使用Stream步骤

1、使用Stream或XxxStream的builder()类方法创建该Stream对应的Builder

2、重复调用Builder的add()方法向该流中添加多个元素

3、Builder的build()获取对应的Stream

4、调用Stream的聚集方法(Stream提供了大量的聚集方法供用户调用 ) 每个Stream只能调用一次聚集方法,Stream调用聚集方法后流就会关闭,并且不可重用。

  • 中间方法:中间操作允许流保持打开状态,并允许直接调用后续方法。
  • 末端方法:末端方法是对流的最终操作。对某个流操作执行末端方法后,该流将被消耗,且不再可用。例如sum() count() average()等都是末端方法
  • 有状态的方法:这种方法会给流增加一些新的属性。例如元素的唯一性、元素的最大数量、保证元素以排序的方式被处理等。
  • 有状态的方法往往需要更大的性能开销
  • 短路方法:短路方法可以尽早结束对流的操作,不必检查所有的元素

Stream API

中间方法

filter(Predicate predicate) 过滤Stream中所有不符合Predicate的元素(也就是说返回一个符合Predicate条件的流)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
IntStream.Builder builder = IntStream.builder();
builder.add(1);
builder.add(2);
builder.add(3);
builder.add(4);

IntStream stream = builder.build();
IntStream filterStream = stream.filter(new IntPredicate() {
@Override
public boolean test(int value) {
if (value % 2 == 0) {
return false;
}
return true;
}
});
filterStream.forEach(i -> System.out.println(i));
filterStream.close();

mapToXxx(ToXxxFunction mapper): 使用ToXxxFunction对流中的元素执行一对一的转换。该方法返回一个新Stream中包含了ToXxxFunction转换生成的所有元素

1
2
3
4
5
6
7
8
9
10
11
12
LongStream.Builder builder = LongStream.builder();
builder.add(4l);
builder.add(10l);
builder.add(9l);
builder.add(7l);
IntStream intStream = builder.build().mapToInt(new LongToIntFunction() {
@Override
public int applyAsInt(long value) {
return (int) (value + 2);
}
});
intStream.forEach(value -> System.out.println(value));

peek(Cosumer action) 依次对每个元素执行一些操作,返回的流与原有流包含相同的元素

distinct():该方法用于排序流中所有重复的元素(判断重复依据是equels())。返回由该流的不同元素组成的流

sorted()该方法用于排序流中的元素在后续的访问中处于有序状态。是有状态的方法

limit(long maxSize) 该方法用于保证对该流的后续访问中最大允许的访问的元素个数。是一个有状态的、短路的方法。

末端方法

forEach(Consumer action) 遍历流中所有元素,对每个元素执行action

toArray():将流中所有元素转换为一个数组

reduce():用于通过某种操作来合并流中的元素

min():返回流中所有元素的最小值

max():返回流中所有元素的最大值

count():返回流中所有元素的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
IntStream.Builder builder = IntStream.builder();
builder.add(1);
builder.add(2);
builder.add(3);
builder.add(4);
builder.add(5);
IntStream intStream = builder.build();
OptionalInt max = intStream.max();
int min = intStream.min().getAsInt();
int sum = intStream.sum();
long count = intStream.count();
double average = intStream.average().getAsDouble();
System.out.println("最大值"+max.getAsInt());
System.out.println("最小值" + min);
System.out.println("元素个数"+count);
System.out.println("元素总和"+sum);
System.out.println("所有元素的平均值" + average);

anyMatch(Predicate predicate) 判断流中是否至少包含一个元素符合predicate条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
IntStream.Builder builder = IntStream.builder();
builder.add(1);
builder.add(2);
builder.add(3);
builder.add(4);
builder.add(5);
builder.add(6);
IntStream build = builder.build();
boolean b = build.anyMatch(new IntPredicate() {
@Override
public boolean test(int value) {
if (value % 2 == 0) {
return true;
}
return false;
}
});

allMatch(Predicate predicate): 判断流中是否每个元素都符合predicate条件

1
2
3
4
5
6
7
8
9
10
boolean allMatch = build.allMatch(new IntPredicate() {
@Override
public boolean test(int value) {
if (value % 1 == 0) {
return true;
}
return false;
}
});
System.out.println("all"+allMatch);

anyMatch(Predicate predicate):判断流中是否每个元素都符合Predicate条件

1
2
3
4
5
6
7
8
9
 boolean b = build.anyMatch(new IntPredicate() {
@Override
public boolean test(int value) {
if (value % 2 == 0) {
return true;
}
return false;
}
});

noneMatch(Predicate predicate) 判断流中是否所有元素都不符合predicate条件

1
2
3
4
5
6
7
8
9
10
boolean b = build.noneMatch(new IntPredicate() {
@Override
public boolean test(int value) {
if (value / 7 == 0) {
return true;
}
return false;
}
});
System.out.println(b);

findFirst() 返回流中的第一个元素

findAny() 返回流中的任意一个元素

1
2
3
4
5
6
7
8
9
LongStream.Builder builder = LongStream.builder();
builder.add(10);
builder.add(20);
builder.add(30);
builder.add(40);
builder.add(50);
builder.add(60);
LongStream longStream = builder.build();
System.out.println(longStream.findAny().getAsLong());

Stream操作集合

Java8使用流式API操作集合,Collection接口中提供了一个stream()默认方法,该方法可以返回该集合对应的流,接下来即可通过流式API来操作

通过Map获取 Map不能直接获取流,必须通过键的集合、键值对的集合获取Set的流

1
2
3
4
5
6
7
8
Map<Integer,String> map = new HashMap<>();
……
//1.获取“键”的流:
Stream<Integer> keysSteram = map.keySet().stream();
//2.获取“值”的流:
Stream<String> valueStream = map.values().stream();
//3.获取“键值对”的流:
Stream<Map.Entry<Integer,String>> entryStream = map.entrySet().stream();

通过引用类型的数组获取流

1
2
Integer[] arr = {1,2,3,432,3,2,324,32};
Stream<Integer> intStream = Stream.<Integer>of(arr);

通过基本类型的数组获取流

1
2
int[] arr = {1,4,234,32,32,523,432};
IntStream intStream = IntStream.of(arr);

通过零散的数据获取流

1
Stream<Integer> intStream = Stream.of(1,3,2,43,24,325,43,324,325,24,2);
常用方法

filter方法 过滤

1
2
3
LinkedList<String> list = new LinkedList<>();
list.stream().filter(s -> s.startsWith("张"))
.forEach(s -> System.out.println(s));

count方法_统计个数

1
2
3
4
long zCount = list.stream()
.filter(s -> s.startsWith("张"))
.count();
System.out.println("张姓学员数量有"+zCount);

limit (long maxSize) 该方法用于保证对该流的后续访问中最大允许访问的元素个数,短路的有状态的方法

1
2
3
4
list.stream()
.filter(s -> s.startsWith("张"))
.limit(3)
.forEach(s -> System.out.println(s));

skip方法_跳过前几个

1
2
3
4
list.stream()
.filter(s->s.startsWith("张"))
.skip(2)
.forEach(s-> System.out.println(s));

map方法_转换

1
2
3
4
5
LinkedList<String> list = new LinkedList<>();
list.add("10");
list.add("26");
list.stream().map(s -> Integer.parseInt(s) + 5)
.forEach(s -> System.out.println(s));

concat方法_组合

1
2
3
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
Stream.concat(list1.stream(), list2.stream()).forEach(s -> System.out.println(s));

distinct() 用于排除流中所有重复的元素,去重使用,有状态的方法

1
2
3
4
5
LinkedList<String> linkedList = new LinkedList<>();
linkedList.add("s1");
linkedList.add("s1");
linkedList.add("s6");
linkedList.stream().distinct().forEach(s -> System.out.println(s));

sorted() 排序方法

1
2
3
4
5
LinkedList<Integer> integers = new LinkedList<>();
integers.add(1);
integers.add(2);
//使用sorted排序
integers.stream().sorted().forEach(s -> System.out.println(s));

min() 返回流中所有元素的最小值

1
2
3
4
5
6
7
8
9
10
11
LinkedList<Integer> integers = new LinkedList<>();
integers.add(1);
integers.add(2);
integers.add(5000);
Integer minInt = integers.stream().min(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
}).get();
System.out.println("最小值为" + minInt);

max()返回流中所有元素的最大值

1
2
3
4
5
6
Integer minInt = integers.stream().max(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1 - o2;
}
}).get();

anyMatch(Predicate pre) 判断流中是否至少包含一个元素符合Predicate条件,返回true代表有元素包含,返回false代表没有元素包含

1
2
3
4
5
6
7
8
LinkedList<Integer> list = new LinkedList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
boolean match = list.stream().anyMatch(i -> i > 2);
System.out.println(match);

allMatch(Predicate predicate) 判断流中是否每个元素都符合条件

1
2
3
4
5
6
7
8
LinkedList<Integer> list = new LinkedList<>();
list.add(2);
list.add(4);
list.add(6);
list.add(8);
//查看是否元素全部包含
boolean b = list.stream().allMatch(i -> i % 2 == 0);
System.out.println("集合中全部元素是否" + b);

noneMatch(Predicate predicate) 判断流中是否所有元素都不符合Predicate条件

1
2
3
4
5
6
7
8
LinkedList<Integer> integers = new LinkedList<>();
integers.add(1);
integers.add(2);
integers.add(3);
integers.add(4);
integers.add(5);
boolean b = integers.stream().noneMatch(i -> i > 6);
System.out.println(b);

findFirst() 返回流中的第一个元素

1
2
3
4
5
6
7
LinkedList<Integer> integers = new LinkedList<>();
integers.offer(2);
integers.offer(4);
integers.offer(6);
integers.offer(8);
Integer firtst = integers.stream().findFirst().get();
System.out.println(firtst);

findAny() 返回流中的任意一个元素

1
2
3
4
5
6
LinkedList<Integer> list = new LinkedList<>();
list.add(2);
list.add(4);
list.add(6);
list.add(8);
Integer integer = list.stream().findAny().get();

Stream流的结果收集到集合和数组中

1
2
3
collect(Collectors.toList()) 将流的结果转化为list
collect(Collectors.toSet()) 将流的结果转化为Set
toArray() 将流的结果提取到数组

IDEA常用配置和快捷键

发表于 2018-04-21 | 分类于 工具

使用

全局Maven(默认配置)

20200421164519

版本控制Git/Svn (默认配置)

IDEA内置的Git插件灰常好用,尤其是解决冲突性的代码。另外Git客户端推荐SourceTree。
20200421164547

自动导包和智能移除 (默认配置)

20200421164617

Tomcat Server

20200421164637

自动编译

开启自动编译之后,结合Ctrl+Shift+F9 会有热更新效果。

20200421164659

20200421164720

20200421164729

取消大小写敏感

具体步骤:

File | Settings | Editor | General | Code Completion Case | Sensitive Completion = None
取消大小敏感,在编写代码的时候,代码的自动提示将更加全面和丰富。

20200421164816

调整字体

20200421164837

关闭Intellij IDEA自动更新

20200421164919

文件编码设置

20200421164936

类和方法注释模板

1.修改类注释模板
在File->Settings->Editor->File and Code Templates下分别修改Class,Interface,Enum等注释模板,Class模板部分修改如下,其余的举一反三进行修改。

2.方法注释模板修改
在File->Settings->Editor->Live Templates下添加自定义Template Group,并在自定义Template Group下添加自定义Template

生成serialVersionUID

默认情况下Intellij IDEA关闭了继承了Java.io.Serializable的类生成serialVersionUID的警告,如果需要提示生成serialVersionUID,那么需要做以下设置:在File->Settings->Editor->Inspections下勾选中Java->Serialization issues->Serializable class without ‘serialVersionUID’,将光标放到类名上按Atl+Enter键就会提示生成serialVersionUID了

快捷键

1. 按【鼠标中键】快速打开智能提示,取代alt+enter 。

File->Settings-> Keymap-> 搜索 Show Intention Actions -> 添加快捷键为鼠标中键。

2. 按【F2】快速修改文件名,告别双手操作。

File->Settings-> Keymap-> 搜索 Rename -> 将快捷键设置为F2 。

3. 按【F3】直接打开文件所在目录,浏览一步到位。

File->Settings-> Keymap-> 搜索 Show In Explorer -> 将快捷键设置为F3 。

4. 按【Ctrl+右键】直接打开实现类,方便开发查询。

File->Settings-> Keymap-> 搜索 implementation->  Add Mouse Shortcut 将快捷键设置为Ctrl+ 鼠标右键。

Maven自动下载源码包,告别反编译,直接上源码注释!!

File | Settings | Build, Execution, Deployment | Build Tools | Maven | Importing

将Automatically Download  的 Source 勾上


快捷键        介绍
Ctrl + B        进入光标所在的方法/变量的接口或是定义处,等效于Ctrl + 左键单击
Ctrl + D        复制光标所在行或复制选择内容,并把复制内容插入光标位置下面
Ctrl + F        在当前文件进行文本查找
Ctrl + H        查看类的继承结构
Ctrl + N        通过类名定位文件
Ctrl + O        快速重写父类方法
Ctrl + P        方法参数提示
Ctrl + Y        删除光标所在行或删除选中的行
Ctrl + W        递进式选择代码块
Ctrl + Z        撤销
Ctrl + 1,2,3…9        定位到对应数值的书签位置 结合Ctrl + Shift + 1,2,3…9使用
Ctrl + F1        在光标所在的错误代码出显示错误信息
Ctrl + F12        弹出当前文件结构层,可以在弹出的层上直接输入进行筛选
Ctrl + Space        基础代码补全默认在Windows系统上被输入法占用,需要进行修改,建议修改为Ctrl + 逗号
Ctrl + /        注释光标所在行代码,会根据当前不同文件类型使用不同的注释符号
Alt相关
快捷键        介绍
Alt + Q        弹出一个提示,显示当前类的声明/上下文信息
Alt + Enter        根据光标所在问题,提供快速修复选择
Shift相关
快捷键        介绍
Shift + F3        在查找模式下,定位到上一个匹配处
Ctrl+Alt相关
快捷键        介绍
Ctrl + Alt + B        在某个调用的方法名上使用会跳到具体的实现处
Ctrl + Alt + L        格式化代码 可以对当前文件和整个包目录使用
Ctrl + Alt + M        快速抽取方法
Ctrl + Alt + O        优化导入的类和包 可以对当前文件和整个包目录使用
Ctrl + Alt + T        对选中的代码弹出环绕选项弹出层
Ctrl + Alt + V        快速引进变量
Ctrl + Alt + F7        寻找类或是变量被调用的地方,以弹出框的方式显示
Ctrl + Alt + 左方向键        退回到上一个操作的地方
Ctrl + Alt + 右方向键        前进到上一个操作的地方
Ctrl+Shift相关
快捷键        介绍
Ctrl + Shift + F        根据输入内容查找整个项目或指定目录内文件
Ctrl + Shift + H        查看方法的继承结构
Ctrl + Shift + J        自动将下一行合并到当前行末尾
Ctrl + Shift + N        通过文件名定位打开文件/目录,打开目录需要在输入的内容后面多加一个正斜杠
Ctrl + Shift + R        根据输入内容替换对应内容,范围为整个项目或指定目录内文件
Ctrl + Shift + U        对选中的代码进行大/小写轮流转换
Ctrl + Shift + W        递进式取消选择代码块
Ctrl + Shift + Z        取消撤销
Ctrl + Shift + /        代码块注释
Ctrl + Shift + +        展开所有代码
Ctrl + Shift + -        折叠所有代码
Ctrl + Shift + 1,2,3…9        快速添加指定数值的书签
Ctrl + Shift + F7        高亮显示所有该选中文本,按Esc高亮消失
Ctrl + Shift + Space        智能代码提示
Ctrl + Shift + Enter        自动结束代码,行末自动添加分号
Alt+Shift相关
快捷键        介绍

Ctrl+Alt+Shift相关
快捷键        介绍

其他
快捷键        介绍
F2        跳转到下一个高亮错误或警告位置
F3        在查找模式下,定位到下一个匹配处
F4        编辑源

Git安装使用

发表于 2018-04-21 | 分类于 工具

linux安装Git

yum -y install gcc openssl openssl-devel curl curl-devel unzip perl perl-devel expat expat-devel zlib zlib-devel asciidoc xmlto gettext-devel openssh-clients
[root@localhost local]# wget https://github.com/git/git/archive/v2.21.0.tar.gz
[root@localhost local]# cd git-2.21.0/
[root@localhost git-2.21.0]# make prefix=/usr/local/git all
耐心等待编译即可
安装Git至/usr/local/git路径,命令为 make prefix=/usr/local/git install
配置环境变量vim /etc/profile 加入:PATH=$PATH:/usr/local/git/bin
git --version ,查看安装的git版本,校验通过,安装成功。

第二种,直接yum安装

[root@localhost ~]# yum install -y git
git数据交互是基于ssh的,查看是否开启了ssh服务,
[root@localhost ~]# ps -ef|grep sshd
root       1841      1  0 06:17 ?        00:00:00 /usr/sbin/sshd
root       4484   1841  0 07:40 ?        00:00:00 sshd: root@pts/0
root       4517   4488  5 07:48 pts/0    00:00:00 grep sshd
[root@localhost ~]# git --version
git version 1.7.1

服务器端创建 git 用户,用来管理 Git 服务,并为 git 用户设置密码

配置用户名、邮箱、Windows提交到Linux上是否自动转换换行符、字符集
[root@localhost ~]# git config --global user.name "hu"
[root@localhost ~]# git config --global user.email "404914989@qq.com"
[root@localhost ~]# git config --global core.autocrlf false
[root@localhost ~]# git config --global gui.encoding utf-8         
此时$HOME目录下会新建一个.gitconfig文件
创建用户专门管理代码仓库
[root@localhost ~]# useradd -m git
[root@localhost ~]# passwd git
禁止 git 用户 ssh 登录服务器
编辑 /etc/passwd
git:x:502:504::/home/git:/bin/bash修改为git:x:502:504::/home/git:/bin/git-shell

建立一个共享的仓库,只能接受push/pull ,不能本地commit

[root@localhost git]# mkdir data
[root@localhost git]# cd data
[root@localhost data]# git init --bare
Initialized empty Git repository in /home/git/data/
赋权
[root@localhost git]# chown -R git:git data/

客户端clone远程仓库
在客户端中进如一个地址右键Git Bash进入

克隆

$ git clone git@192.168.1.110:/home/git/data
Cloning into 'data'...

当第一次连接到目标 Git 服务器时会得到一个提示:

The authenticity of host '192.168.1.110 (192.168.1.110)' can't be established.
RSA key fingerprint is SHA256:iSLdvstPeZt4ZCsHQ/muhRpPPJmBhJKnB/rqOfRSJEc.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.1.110' (RSA) to the list of known hosts.
git@192.168.1.110's password:
warning: You appear to have cloned an empty repository.
此时 C:\Users\用户名\.ssh 下会多出一个文件 known_hosts,以后在这台电脑上再次连接目标 Git 服务器时不会再提示上面的语句。

后面提示要输入密码,可以采用 SSH 公钥来进行验证。

通过SSH认证

在客户端创建公钥和私钥,例如在Windows上安装git操作,打开Git Bash
$ ssh-keygen -t rsa -C "404914989@qq.com"
Generating public/private rsa key pair.
Enter file in which to save the key (/c/Users/Administrator/.ssh/id_rsa):
/c/Users/Administrator/.ssh/id_rsa already exists.
Overwrite (y/n)? y
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /c/Users/Administrator/.ssh/id_rsa.
Your public key has been saved in /c/Users/Administrator/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:o794/GehcD8LpFmAkhNOQe/sX54NLzKfGoXaSeNJlC0 404914989@qq.com
The key's randomart image is:
+---[RSA 2048]----+
|   .=.           |
|   o + . o       |
|    = o E .      |
|     = . +       |
|      o S +      |
|     . B.@. .    |
|      +.Oo+o .   |
|       +=+oB=    |
|      ..=B*++o   |
+----[SHA256]-----+

此时 C:\Users\用户名.ssh 下会多出两个文件 id_rsa 和 id_rsa.pub

id_rsa 是私钥
id_rsa.pub 是公钥

服务端git开启rsa认证

[root@localhost ~]# vim /etc/ssh/sshd_config

开打如下行:

      RSAAuthentication yes
      PubkeyAuthentication yes
      AuthorizedKeysFile .ssh/authorized_keys
重启:

[root@localhost ~]# /etc/rc.d/init.d/sshd restart
Stopping sshd:                                             [  OK  ]
Starting sshd:                                             [  OK  ]

公钥的存放路径 .ssh/authorized_keys实际上是 $Home/.ssh/authorized_keys,由于管理 Git 服务的用户是 git,所以实际存放公钥的路径是 /home/git/.ssh/authorized_keys。

[root@localhost git]# mkdir .ssh
[root@localhost git]# ls -a
.  ..  .bash_logout  .bash_profile  .bashrc  data  .gnome2  .mozilla  .ssh

修改目录所有者:因为我们创建了git用户管理代码,把.ssh的权限设置为git。用户组也是git。

[root@localhost git]# chown -R git:git .ssh
[root@localhost git]# ll -a
....
drwxr-xr-x. 2 git  git  4096 Apr  6 04:49 .ssh

将客户端公钥导入服务器端 /home/git/.ssh/authorized_keys 文件

在客户端通过Git Bash操作如下:

$ ssh git@192.168.1.110 'cat >> .ssh/authorized_keys' < ~/.ssh/192.168.1.110.git.hub.pub
git@192.168.1.110's password:
需要输入服务端git密码
或者直接创建文件 ,复制公钥保存文件的方式:
[root@localhost git]# cd .ssh/
[root@localhost .ssh]# touch authorized_keys
[root@localhost .ssh]# cd ..
将客户端的公钥id_rsa.pub内容导入服务器authorized_keys文件里
[root@localhost git]# vim .ssh/authorized_keys

回到服务器查看否存在 authorized_keys 文件
[root@localhost git]# ll .ssh/
-rw-rw-r--. 1 git git 742 Apr  6 04:57 authorized_keys
设置权限:
修改 .ssh 目录的权限为 700,修改 .ssh/authorized_keys 文件的权限为 600
[root@localhost git]# chmod 700 .ssh
[root@localhost git]# cd .ssh/
[root@localhost .ssh]# chmod 600 authorized_keys

再次clone测试

$ git clone git@192.168.1.110:/home/git/data
Cloning into 'data'...
warning: You appear to have cloned an empty repository.

也可以通过界面工具clone

上传项目到git

例如windows上安装好git,进入需要上传的项目里,如:G:\Java\idea-maindir\maven-jenkins-test
右键Git Bash进入命令行

1、把项目变成可以提交的项目
$ git init
2、把文件添加到版本库中,使用命令 git add .添加到暂存区里面去,不要忘记后面的小数点".",意为添加文件夹下的所有文件
$ git add .
3、用命令 git commit告诉Git,把文件提交到仓库。引号内为提交说明
$ git commit -m 'first commit'
4、关联到远程库
$ git remote add origin 你的远程库地址
如:$ git remote add origin git@192.168.1.110:/home/git/data
    如果出错
$ git remote add origin git@192.168.1.110:/home/git/data
fatal: remote origin already exists.
    先删除远程 Git 仓库
    $ git remote rm origin
    再添加远程 Git 仓库
    如果执行 git remote rm origin 报错的话,我们可以手动修改gitconfig文件的内容
    $ vi .git/config
    把 [remote "origin"] 那一行删掉就好了。
5、获取远程库与本地同步合并(如果远程库不为空必须做这一步,否则后面的提交会失败)
$ git pull --rebase origin master
6、把本地库的内容推送到远程,使用 git push命令,实际上是把当前分支master推送到远程。
$ git push -u origin master
git@192.168.1.110's password:
Enumerating objects: 19, done.
Counting objects: 100% (19/19), done.
Delta compression using up to 4 threads
Compressing objects: 100% (15/15), done.
Writing objects: 100% (19/19), 5.59 KiB | 817.00 KiB/s, done.
Total 19 (delta 0), reused 0 (delta 0)
To 192.168.1.110:/home/git/data
* [new branch]      master -> master
Branch 'master' set up to track remote branch 'master' from 'origin'.

7、状态查询命令
$ git status

注意:

客户端使用Git,客户机安装git后,创建公钥和私钥,公钥添加到服务器ssh中。

和使用GitHub类似,比如登录GitHub,点击setting>SSH and GPG keys>New SSH Key 将生成的密钥复制到Key中即可

测试是否配置成功ssh -T git@github.com

Hi RoninLee! You’ve successfully authenticated, but GitHub does not provide shell access.

出现这样一段话,即证明配置成功。

代理模式

发表于 2018-04-18 | 分类于 设计模式

代理设计模式:

代理对象 增强后的对象

目标对象 被增强的对象

分为静态代理和动态代理

静态代理

实现方式:继承或聚合(实现接口)

动态代理

模拟的动态代理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.hu.proxy;

import javax.tools.JavaCompiler;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
import java.io.File;
import java.io.FileWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.net.URL;
import java.net.URLClassLoader;

public class Proxy {

private static String LINE = "\n";
private static String TAB = "\t";
private static String PATH = "d:\\proxy\\com\\hu\\";

/**
* 步骤:
* 1、通过拼接字符串生成(io)Java代码文件
* 2、动态编译
* 3、类加载
* 4、反射生成代理对象
* <p>
* 注:此处处理实现一个接口的代理
*/
public static Object newInstance(Object target) throws Exception {
Object proxy = null;
StringBuilder code = new StringBuilder();

Class targetInterface = target.getClass().getInterfaces()[0];
Method methods[] = targetInterface.getDeclaredMethods();

String className = target.getClass().getSimpleName() + "$Proxy";
PATH += className + ".java";

String targetInterfaceName = targetInterface.getSimpleName();
//生成类字符串
code.append("package com.hu;" + LINE);
code.append("import " + targetInterface.getName() + ";" + LINE);
code.append("public class " + className + " implements " + targetInterfaceName + "{" + LINE);

code.append(TAB + "private " + targetInterfaceName + " target;" + LINE);
code.append(TAB + "public " + className + " (" + targetInterfaceName + " target){" + LINE);
code.append(TAB + TAB + "this.target =target;");
code.append(LINE + TAB + "}" + LINE);

String methodStr = getMethods(methods);
code.append(methodStr);
code.append("}");

//创建类
File java = cteateFile(code.toString(), PATH);
//编译
compiler(java);
//加载字节码
URL[] urls = new URL[]{new URL("file:D:\\proxy\\\\")};
URLClassLoader urlClassLoader = new URLClassLoader(urls);
Class clazz = urlClassLoader.loadClass("com.hu." + className);

//创建代理对象
Constructor constructor = clazz.getConstructor(targetInterface);
proxy = constructor.newInstance(target);
return proxy;
}

/**
* 获取类方法
*/
private static String getMethods(Method[] methods) {
StringBuilder methodStr = new StringBuilder();
for (Method method : methods) {
String returnTypeName = method.getReturnType().getSimpleName();
String methodName = method.getName();

Class args[] = method.getParameterTypes();
Parameter[] parameters = method.getParameters();
String argsContent = "";
String paramsContent = "";
int flag = 0;
for (Class arg : args) {
argsContent += arg.getSimpleName() + " " + parameters[flag].getName() + ",";
paramsContent += parameters[flag].getName() + ",";
flag++;
}
if (argsContent.length() > 0) {
argsContent = argsContent.substring(0, argsContent.length() - 1);
paramsContent = paramsContent.substring(0, paramsContent.length() - 1);
}
methodStr.append(TAB + "public " + returnTypeName + " " + methodName + "(" + argsContent + ") {" + LINE);
methodStr.append(TAB + TAB + "System.out.println(\"日志输出!!!!!!!!!\");" + LINE);

if ("void".equals(returnTypeName)) {
methodStr.append(TAB + TAB + "target." + methodName + "(" + paramsContent + ");" + LINE);
} else {
methodStr.append(TAB + TAB + "return target." + methodName + "(" + paramsContent + ");" + LINE);
}
methodStr.append(TAB + "}" + LINE);

}
return methodStr.toString();
}

/**
* 创建类
*/
private static File cteateFile(String code, String pathName) throws Exception {
File file = new File(pathName);
if (!file.exists()) {
file.createNewFile();
}
FileWriter fw = new FileWriter(file);
fw.write(code);
fw.flush();
fw.close();
return file;
}

/**
* 编译
*/
private static void compiler(File file) throws Exception {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();

StandardJavaFileManager fileMgr = compiler.getStandardFileManager(null, null, null);
Iterable units = fileMgr.getJavaFileObjects(file);

JavaCompiler.CompilationTask t = compiler.getTask(null, fileMgr, null, null, null, units);
t.call();
fileMgr.close();
}
}

测试:

1
2
3
4
5
6
public class Test {
public static void main(String[] args) throws Exception {
UserDao proxy = (UserDao) Proxy.newInstance(new UserDaoImpl());
System.out.println("return "+proxy.query("001", "胡"));
}
}

这种方式实现由缺点:要生成文件、动态编译文件 class、需要一个URLclassloader、软件性能的最终体现在IO操作。

升级版,简单模拟jdk动态代理

Proxy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.hu.proxy1;

import javax.tools.JavaCompiler;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
import java.io.File;
import java.io.FileWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Random;

public class Proxy {

private static String LINE = "\n";
private static String TAB = "\t";
private static String PATH = "d:\\proxy\\com\\hu\\";

public static Object newInstance(Class targetInterface, MyInvocationHandler h) throws Exception {
Object proxy = null;
StringBuilder code = new StringBuilder();

Method methods[] = targetInterface.getDeclaredMethods();

String className = targetInterface.getSimpleName() + "$Proxy" + new Random().nextInt(1);
PATH += className + ".java";

String targetInterfaceName = targetInterface.getSimpleName();
//生成类字符串
code.append("package com.hu;" + LINE);
code.append("import " + targetInterface.getName() + ";" + LINE);

code.append("import com.hu.proxy1.MyInvocationHandler;" + LINE);
code.append("import java.lang.reflect.Method;" + LINE);

code.append("public class " + className + " implements " + targetInterfaceName + "{" + LINE);

code.append(TAB + "private " + MyInvocationHandler.class.getSimpleName() + " h;" + LINE);
code.append(TAB + "public " + className + " (MyInvocationHandler h){" + LINE);
code.append(TAB + TAB + "this.h =h;");
code.append(LINE + TAB + "}" + LINE);

String methodStr = getMethods(methods, targetInterface, h);
code.append(methodStr);
code.append("}");

//创建类 这里可以直接生成字节码文件,就不需要编译。
File java = cteateFile(code.toString(), PATH);
//编译
compiler(java);
//加载字节码
URL[] urls = new URL[]{new URL("file:D:\\proxy\\\\")};
URLClassLoader urlClassLoader = new URLClassLoader(urls);
Class clazz = urlClassLoader.loadClass("com.hu." + className);

//创建代理对象
Constructor constructor = clazz.getConstructor(MyInvocationHandler.class);
proxy = constructor.newInstance(h);
return proxy;
}

/**
* 获取类方法
*/
private static String getMethods(Method[] methods, Class targetInterface, MyInvocationHandler h) {
StringBuilder methodStr = new StringBuilder();
for (Method method : methods) {
String returnTypeName = method.getReturnType().getSimpleName();
String methodName = method.getName();

Class args[] = method.getParameterTypes();
Parameter[] parameters = method.getParameters();
String argsContent = "";
String paramsContent = "";
String methodParamsTypes="";
int flag = 0;
for (Class arg : args) {
argsContent += arg.getSimpleName() + " " + parameters[flag].getName() + ",";
paramsContent += parameters[flag].getName() + ",";
methodParamsTypes += arg.getTypeName() +".class,";
flag++;
}
if (argsContent.length() > 0) {
argsContent = argsContent.substring(0, argsContent.length() - 1);
paramsContent = paramsContent.substring(0, paramsContent.length() - 1);
methodParamsTypes = methodParamsTypes.substring(0, methodParamsTypes.length() - 1);
}
methodStr.append(TAB + "public " + returnTypeName + " " + methodName + "(" + argsContent + ") throws Exception {" + LINE);

methodStr.append(TAB + TAB + "Object[] args={" + paramsContent + "};" + LINE);

methodStr.append(TAB + TAB + "Method method=Class.forName(\"" + targetInterface.getName() + "\").getDeclaredMethod(\"" + methodName + "\""+
(methodParamsTypes==""?"":(","+methodParamsTypes))
+ ");" + LINE);

if ("void".equals(returnTypeName)) {
methodStr.append(TAB + TAB + "h.invoke(method,args);" + LINE);
} else {
methodStr.append(TAB + TAB + "return (" + returnTypeName + ")h.invoke(method,args);" + LINE);
}
methodStr.append(TAB + "}" + LINE);

}
return methodStr.toString();
}

/**
* 创建类 这里可以直接生成字节码文件,就不需要编译。
*/
private static File cteateFile(String code, String pathName) throws Exception {
File file = new File(pathName);
if (!file.exists()) {
file.createNewFile();
}
FileWriter fw = new FileWriter(file);
fw.write(code);
fw.flush();
fw.close();
return file;
}

/**
* 编译
*/
private static void compiler(File file) throws Exception {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();

StandardJavaFileManager fileMgr = compiler.getStandardFileManager(null, null, null);
Iterable units = fileMgr.getJavaFileObjects(file);

JavaCompiler.CompilationTask t = compiler.getTask(null, fileMgr, null, null, null, units);
t.call();
fileMgr.close();
}
}

MyInvocationHandler

1
2
3
4
5
6
7
package com.hu.proxy1;

import java.lang.reflect.Method;

public interface MyInvocationHandler {
public Object invoke(Method method, Object[] args) throws Exception;
}

MyInvocationHandlerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.hu.proxy1;

import java.lang.reflect.Method;

public class MyInvocationHandlerImpl implements MyInvocationHandler {

private UserDao dao;

public MyInvocationHandlerImpl(UserDao userDao) {
this.dao = userDao;
}

@Override
public Object invoke(Method method, Object[] args) throws Exception {
System.out.println("proxy 自定义动态代理");
return method.invoke(dao, args);
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
package com.hu.proxy1;

public class Test {
public static void main(String[] args) throws Exception {
UserDao proxy = (UserDao) Proxy.newInstance(UserDao.class, new MyInvocationHandlerImpl(new UserDaoImpl()));
//UserDao proxy = (UserDao) java.lang.reflect.Proxy.newProxyInstance(Test.class.getClassLoader(), new Class[]{UserDao.class},new InvocationHandlerImpl(new UserDaoImpl()));
System.out.println("return " + proxy.query("001", "胡"));
System.out.println("-----------------------------------------------");
proxy.query("ddddd");
}
}
JDK动态代理

实现方法:通过Proxy的静态方法,通过接口反射得到得到字节码文件,使用ClassLoader将字节码文件加载到JVM,调用native方法生成代理对象。

RabbitMQ中的AMQP模型

发表于 2018-04-14 | 分类于 rabbitmq

AMQP全称是Advanced Message Queuing Protocol,它是一个(分布式)消息传递协议

AMQP在RabbitMQ中的基本模型

消息发布者将消息发布到交换器(Exchange)中,交换器把消息的副本分发到队列(Queue)中,分发消息的时候遵循的规则叫做绑定(Binding)。接着,消息中间件代理向订阅队列的消费者发送消息(push模式),或者消费者也可以主动从队列中拉取消息(fetch/pull模式)。

20200413155922

由于网络是不可靠的,客户端可能无法接收消息或者处理消息失败,这个时候消息中间件代理无法感知消息是否正确传递到消费者中,因此AMQP模型提供了消息确认(Message Acknowledgement)的概念:当消息传递到消费者,消费者可以自动向消息中间件代理确认消息已经接收成功或者由应用程序开发者选择手动确认消息已经接收成功并且向消息中间件代理确认消息,消息中间件代理只有在接收到该消息(或者消息组)的确认通知后才会从队列中完全删除该消息。

在某些情况下,交换器无法正确路由到队列中,那么该消息就会返回给发布者,或者丢弃,或者如果消息中间件代理实现了”死信队列(Dead Letter Queue)”扩展,消息会被放置到死信队列中。消息发布者可以选择使用对应的参数控制路由失败的处理策略。

交换器和交换器类型

交互器(Exchange)是消息发送的第一站目的地,它的作用就是就收消息并且将其路由到零个或者多个队列。路由消息的算法取决于交互器的类型和路由规则(也就是Binding)。RabbitMQ消息中间件代理支持四种类型的交互器,分别是:

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置 header attribute 参数类型的交换机
  • Fanout:转发消息到所有绑定队列

20200413155941

声明交换器的时候需要提供一些列的属性,其中比较重要的属性如下:

1
2
3
4
5
6
7
8
public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(name, durable, autoDelete, arguments);
}
Name:交互器的名称。
Type:交换器的类型。
Durability:(交换器)持久化特性,如果启动此特性,则Broker重启后交换器依然存在,否则交换器会被删除。
Auto-delete:是否自动删除,如果启用此特性,当最后一个队列解除与交换器的绑定关系,交换器会被删除。
Arguments:可选参数,一般配合插件或者Broker的特性使用。

之所以存在Durability和Auto-delete特性是因为并发所有的场景和用例都要求交互器是持久化的。

Direct交换器

Direct类型的交换器基于消息路由键(RoutingKey)把消息传递到队列中。Direct交换器是消息单播路由的理想实现(当然,用于多播路由也可以),它的工作原理如下:

  • 队列使用路由键K绑定到交换器。
  • 当具有路由键R的新消息到达交换器的时候,如果K = R,那么交换器会把消息传递到队列中。

20200413160005

默认交换器

默认交换器(Default Exchange)是一种特殊的Direct交互器,它的名称是空字符串(也就是””),它由消息中间件代理预声明,在RabbitMQ Broker中,它在Web管理界面中的名称是(AMQP default)。每个新创建的队列都会绑定到默认交换器,路由键就是该队列的队列名,也就是所有的队列都可以通过默认交换器进行消息投递,只需要指定路由键为相应的队列名即可。

20200413160018

Fanout交换器

Fanout其实是一个组合单词,fan也就是扇形,out就是向外发散的意思,Fanout交换器可以想象为”扇形”交换器。Fanout交换器会忽略路由键,它会路由消息到所有绑定到它的队列。也就是说,如果有N个队列绑定到一个Fanout交换器,当一个新的消息发布到该Fanout交换器,那么这条新消息的一个副本会分发到这N个队列中。Fanout交换器是消息广播路由的理想实现。

20200413160034

Topic交换器

Topic交换器基于路由键和绑定队列和交换器的模式进行匹配从而把消息路由到一个或者多个队列。绑定队列和交换器的Topic模式(这个模式串其实就是声明绑定时候的路由键,和消息发布的路由键并非同一个)一般使用点号(dot,也就是’.’)分隔,例如source.target.key,绑定模式支持通配符:

  • 符号#匹配一个或者多个词,例如:source.target.#可以匹配source.target.doge、source.target.doge.throwable等等。
  • 符号只能匹配一个词,例如:source.target.可以匹配source.target.doge、source.target.throwable等等。
    对每一条消息,Topic交换器会遍历所有的绑定关系,检查消息指定的路由键是否匹配绑定关系中的路由键,如果匹配,则将消息推送到相应队列。

20200413160051

Topic交换器是消息多播路由的理想实现。

Headers交换器

Headers交换器是一种不常用的交换器,它使用多个属性进行路由,这些属性一般称为消息头,它不使用路由键进行消息路由。消息头(Message Headers)是消息属性(消息元数据)部分,因此,使用Headers交换器在建立队列和交换器的绑定关系的时候需要指定一组键值对,发送消息到Headers交换器时候,需要在消息属性中携带一组键值对作为消息头。消息头属性支持匹配规则x-match如下:

  • x-match = all:表示所有的键值对都匹配才能接受到消息。
  • x-match = any:表示只要存在键值对匹配就能接受到消息。
    Headers交换器也是忽略路由键的,只依赖于消息属性中的消息头进行消息路由。

20200413160110

队列

AMQP模型中的队列与其他消息或者任务队列系统中的队列非常相似:它们存储应用程序所使用的消息。队列和交换器的基本属性有类似的地方:

1
2
3
4
5
Name:队列名称。
Durable:是否持久化,开启持久化意味着消息中间件代理重启后队列依然存在,否则队列会被删除。
Exclusive:是否独占的,开启队列独占特性意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
Auto-delete:是否自动删除,开启自动删除特性意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
Arguments:队列参数,一般和消息中间件代理或者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等。

一个队列只有被声明(Declare)了才能使用,也就是队列的第一次声明就是队列的创建操作(因为第一次声明的时候队列并不存在)。如果使用相同的参数再次声明已经存在的队列,那么此次声明会不生效(当然也不会出现异常)。但是如果使用不相同的参数再次声明已经存在的队列,那么会抛出通道级别的异常,异常代码是406(PRECONDITION_FAILED)。

队列名称

队列名指定为空字符串,那么消息中间件代理会自动生成一个队列名,并且在队列声明的返回结果中带上对应的队列名。

以”amq.”开头的队列是由消息中间件代理内部生成的,有其特殊的作用,因此不能声明此类名称的新队列,否则会导致通道级别的异常,异常代码为403(ACCESS_REFUSED)。

队列的持久化特性

持久化的队列会持久化到磁盘中,这种队列在消息中间件代理重启后不会被删除。不开启持久化特性的队列称为瞬时(transient)队列,并非所有的场景都需要开启队列的持久化特性。

队列的持久化特性并不意味着路由到它上面的消息是持久化的,也就是队列的持久化跟消息的持久化是两回事。如果息中间件代理挂了,它重启后会重新声明开启了持久化特性的队列,这些队列中只有使用了消息持久化特性的消息会被恢复。

绑定

绑定(Binding)是交换器路由消息到队列的规则。例如交换器E可以路由消息到队列Q,那么Q必须通过一定的规则绑定到E。绑定中使用的某些交换器的类型决定了它可以使用可选的路由键(RoutingKey)。路由键的作用类似于过滤器,可以筛选某些发布到交换器的消息路由到目标队列。

如果发布的消息没有路由到任意一个目标队列,例如,消息已经发布到交换器,交换器中没有任何绑定,这个时候消息会被丢弃或者返回给发布者,取决于消息发布者发布消息时候使用的参数。

消费者

消息消费的方式有两种:

  • 消息代理中间件向消费者推送消息(推模式,代表方法是basic.consume)。
  • 消费者主动向消息代理中间件拉取消息(拉模式,代表方法是basic.get)。
    使用推模式的情况下,消费者必须指定需要订阅的队列。每个队列可以存在多个消费者,或者仅仅注册一个独占的消费者。

每个消费者(订阅者)都有一个称为消费者标签(consumer tag)的标识符,消费者标签是一个字符串。通过消费者标签可以实现取消订阅的操作。

消息确认

消费者应用程序有可能在接收和处理消息的时候崩溃,也有可能因为网络原因导致消息中间件代理投递消息到消费者的时候失败了,AMQP消息中间件代理应该在什么时候从队列中删除消息?AMQP规范提供了两种选择:

  • 消息中间件代理向应用程序发送消息(使用AMQP方法basic.deliver或basic.get-ok)。
  • 应用程序收到消息后向消息中间件代理发送确认(使用AMQP方法basic.ack/basic.nack和basic.reject)
    前一种称为自动确认模型(动作触发的同时进行了消息确认);
    后一种主动向消息中间件代理进行消息主动确认,这个消息主动确认动作的执行时机完全由应用程序控制。
    消息主动确认有三种方式:积极确认(ack)、消极确认(nack)和拒绝(reject)。

预取消息

预取消息(Prefetching Messages)是一个特性。对于多个消费者共享同一个队列的情况,能够告知消息中间件代理在发送下一个确认之前指定每个消费者一次可以接收消息的消息量。这个特性可以理解为简单的负载均衡技术,在批量发布消息的场景下能够提高吞吐量。

消息属性和有效负载

AMQP模型中,消息具有属性值。AMQP规范定义了一些常见的属性,一般开发人员不需要太关注这些属性:

1
2
3
4
5
6
7
8
Content type
Content encoding
Routing key
Delivery mode (persistent or not)
Message priority
Message publishing timestamp
Expiration period
Publisher application id

这些通用的属性一般是消息中间件代理使用的,还有可以定制的可选属性header,形式是键值对,类似于HTTP中的请求头。消息属性是在发布消息的时候设置的。

AMQP消息还有一个有效载荷(payload,其实就是消息数据体),AMQP代理将其视为不透明的字节数组,也就是AMQP代理不会检查或者修改消息的有效载荷。有些消息可能只包含属性而没有有效负载。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)来序列化和结构化数据,以便将其作为消息有效负载发布。在一般约定下,消息属性中的Content type和Content encoding一般可以表明其序列化的方式。

消息发布支持消息的持久化特性,消息持久化特性开启后,消息中间件代理会把消息保存到磁盘中,如果重启代理消息也不会丢失。开启消息持久化特性将会影响性能,主要是因为涉及到刷盘操作。

AMQP 方法

AMQP 定义了一些方法,对应了客户端和消息中间件代理之间交互的一些操作方法,常用的交换器相关的操作方法有:

1
2
3
4
exchange.declare   
exchange.declare-OK
exchange.delete
exchange.delete-OK

在逻辑上,上面几个操作方法在客户端和消息中间件代理之间的交互如下:

20200413160133

对于队列,也有类似的操作方法:

1
2
3
4
queue.declare
queue.declare-OK
queue.delete
queue.delete-OK

20200413160146

并非所有的AMQP操作方法都有响应结果操作方法,像消息发布方法basic.publish的使用是最广泛的,此操作方法没有对应的响应结果操作方法。有些操作方法可能有多个响应结果(操作方法),例如basic.get。

连接(Connection)

AMQP的连接(Connection)通常是长期存在的。AMQP是一种使用TCP进行可靠传递的应用程序级协议。AMQP连接使用用户身份验证,可以使用TLS(SSL)进行保护。当应用程序不再需要连接到AMQP代理时,它应该正常关闭AMQP连接,而不是突然关闭底层TCP连接。

通道(Channel)

某些应用程序需要与AMQP代理程序建立多个连接。但是,不希望同时打开许多TCP连接,因为这样做会消耗系统资源并使配置防火墙变得十分困难。通道(Channel)可以认为是”共享一个单独的TCP连接的轻量级连接”,一个AMQP连接可以拥有多个通道。

对于使用了多线程处理的应用程序,有一种使用场景十分普遍:每个线程开启一个新的通道使用,这些通道是线程间隔离的。

另外,每个特定的通道和其他通道是相互隔离的,每个执行的AMQP操作方法(包括响应)都携带一个通道的唯一标识,这样客户端就能通过该通道的唯一标识得知操作方法是对应哪个通道发生的。

虚拟主机(Virtual Host)

为了使单个消息中间件代理可以托管多个完全隔离的”环境”(这里的隔离指的是用户组、交互器、队列等),AMQP提供了虚拟主机(Virtual Host)的概念。多个虚拟主机类似于许多主流的Web服务器的虚拟主机,提供了AMQP组件完全隔离的环境。AMQP客户端可以在连接消息中间件代理时指定需要连接的虚拟主机。

其实从开发者的角度来看,最重要的是Exchange、Queue、Binding三者的关系,消息的发布第一站总是Exchange,从模型上看,消息发布无法直接发送到队列中。Exchange本身不存储消息,它在接收到消息之后,会基于路由规则也就是Binding,把消息路由到目标Queue中。从实际操作来看,声明路由规则总是在发布消息和消费消息之前,也就是一般步骤如下:

1
2
3
4
5
1、声明Exchange。
2、声明Queue。
3、基于Exchange和Queue声明Binding,这个过程有可能自定义一个RoutingKey。
4、通过Exchange消息发布,这个过程有可能使用到上一步定义的RoutingKey。
5、通过Queue消费消息。

消息发布实际上只跟Exchange有关,而消息消费实际上只跟Queue有关。Binding实际上就是Exchange和Queue的契约关系,会直接影响消息发布阶段的消息路由。那么,路由失败一般是什么情况导致的?路由失败,其实就是消息已经发布到Exchange,而Exchange中从既有的Binding中无法找到存在的目标Queue用于传递消息副本。

关于Exchange的类型

模型中支持了四种交换器direct(单播)、fanout(广播)、topic(多播)、headers,实际上,从使用者角度来看,四种交换器的功能是可以相互取代的。例如可以使用fanout类型交换器实现广播,其实使用direct类型交换器也是可以实现广播的,只是对应的direct类型交换器需要通过多个路由键绑定到多个目标队列中。在面对生产环境的技术选型的时候,我们需要考虑性能、维护难度、合理性等角度去考虑选择什么类型的交换器,就上面的广播消息的例子,显然使用fanout类型交换器可以避免声明多个绑定关系,这样在性能、合理性上是更优的选择。

关于负载均衡

在AMQP模型中,负载均衡的实现是基于消费者而不是基于队列(准确来说应该是消息传递到队列的方式)。实际上,出现消息生产速度大大超过消费者的消费速度的时候,队列中有可能会出现消息积压。AMQP-模型中没有提供基于队列负载均衡的特性,也就是出现消息生产速度大大超过消费者的消费速度时候,并不会把消息路由到多个队列中,而是通过预取消息(Prefetching Messages)的特性,确定消息者的消费能力,从而调整消息中间件代理推送消息到对应消费者的数量,这样就能够实现消费速度快的消费者能够消费更多的消息,减少产生有消费者处于饥饿状态和有消费者长期处于忙碌状态的问题。

关于消息确认机制

AMQP中提供的消息确认机制主要包括积极确认(一般叫ack,Acknowledgement)、消极确认(一般叫nack,Negative Acknowledgement)和拒绝(reject)。消息确认机制是保证消息不丢失的重要措施,当消费者接收到消息中间件代理推送的消息时候,需要主动通知消息中间件代理消息已经确认投递成功,然后消息中间件代理才会从队列中删除对应的消息。没有主动确认的消息就会变为”nack”状态,可以想象为暂存在队列的”nack区”中,这些消息不会投递到消费者,直到消费者重启后,”nack区”中的消息会重新变为”ready”状态,可以重新投递给消费者。

zookeeper集群搭建

发表于 2018-04-14 | 分类于 zookeeper

集群搭建

zookeeper集群规模不小于3个节点,并且要求服务器之间系统时间保持一致。

首先在一台机器上192.168.1.106如下操作:

1、下载包传到Linux后解压到/usr/local/zookeeper/下

2、添加环境变量vim /etc/profile

1
2
3
4
#zookeeper
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
立即生效:source /etc/profile

3、进入cd /usr/local/zookeeper/conf

1
2
3
4
5
6
重命名配置文件:mv zoo_sample.cfg zoo.cfg
修改配置: dataDir=/usr/local/zookeeper/data
并且添加如下代码
server.0=192.168.1.106:2888:3888
server.1=192.168.1.107:2888:3888
server.2=912.101.1.108:2888:3888

4、创建data目录:/usr/local/zookeeper/data

1
2
在data下创建文件myid
vim myid 在里边填写0

5、把zookeeper目录远程拷贝到192.168.1.107、192.168.1.108

1
2
3
4
5
6
7
8
9
10
进入/usr/local/
scp -r zookeeper 192.168.1.107:/usr/local/
scp -r zookeeper 192.168.1.108:/usr/local/
分别修改这两个节点下data中的myid文件的数字。和配置文件中的0、1、2
server.0=192.168.1.106:2888:3888
server.1=192.168.1.107:2888:3888
server.2=912.101.1.108:2888:3888
一致。
```
### 6、启动
在三台机器上分别启动zookeeper
启动:zkServer.sh start
看状态:zkServer.sh status(其中一个是leader,其他为follower)
重启:zkServer.sh  restart
1
### 7、操作zookeeper (shell)
zkCli.sh 进入zookeeper客户端    
根据提示命令进行操作:
        查找:ls /   ls /zookeeper
        创建并赋值:create /server hadoop
        获取:get /server
        设值:set /server hadoop
        可以看到zookeeper集群的数据一致性
创建节点有俩种类型:短暂(ephemeral) 持久(persistent)

zkCli.sh -server 192.168.1.106
1
2
## 操作API
Zookeeper API共包含五个包,分别为:

  (1)org.apache.zookeeper
  (2)org.apache.zookeeper.data
  (3)org.apache.zookeeper.server
  (4)org.apache.zookeeper.server.quorum
  (5)org.apache.zookeeper.server.upgrade

1
2
3
4
5
其中org.apache.zookeeper,包含Zookeeper类,他是我们编程时 最常用的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应用程序首先必须创建一个Zookeeper实例, 这时就需要使用此类。一旦客户端和Zookeeper服务建立起了连接,Zookeeper系统将会给次连接会话分配一个ID值,并且客户端将会周期性的 向服务器端发送心跳来维持会话连接。只要连接有效,客户端就可以使用Zookeeper API来做相应处理了。

zookeeper-*.jar是官方提供的java api,zkclient-0.1.jar则是在原生态的api基础上进行扩展的开源Java客户端api。

Zookeeper构造方法参数:

connectString 服务器列表,用逗号分隔
sessionTimeOut 心跳检测时间周期,毫秒
wather 事件处理通知器
canBeReadOnly 标识当前会话是否支持只读
sessionId和sessionPasswd 标识连接集群的sessionid和密码。

1
2
3
4
5
由于zookeeper客户端和服务端简历连接会话是一个异步的过程,在程序中,我们我们需要在程序中设置等待连接成功在进行下面代码执行。

创建节点方法:create

提供两套创建节点方法,同步和异步方式。

同步方式:
参数1 节点路径,不许递归创建路径。
参数2 节点内容,要求类型是字节数组,不支持序列化方式,如果需要实现序列化,可以使用序列化框架如:Hessian/kryo框架
参数3 节点权限,用Ids.OPEN_ACL_UNSAFE开放权限即可
参数4 节点类型,四种:
PERSISTENT 持久节点
PERSISTENT SEQUENTIAL 持久顺序节点
ERHEMERAL 临时节点
ERHEMERAL SEQUENTIAL 临时顺序节点
```

Zookeeper介绍

发表于 2018-04-14 | 分类于 zookeeper

介绍

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题。

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

zookeeper的特性就是在分布式场景下高可用,但是原生的API实现分布式功能较难,团队去实现也很浪费时间还尾部稳定。可以采用第三方客户端的完美实现,比如Curator框架。Apache开源顶级项目。ZK使用场景广泛,协调如Hadoop、Storm、消息中间件、RPC服务框架、数据库增量订阅与消费组件(如MySQL Binlog)、分布式数据库同步系统。淘宝的Otter等。

运用场景

zookeeper是一个类似hdfs的树形文件结构,zookeeper可以用来保证数据在(zk)集群之间的数据的事务性一致、
zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher
zookeeper有三个角色:Learner,Follower,Observer

zookeeper应用场景:

1、统一命名服务(Name Service)

Zookeeper 的 Name Service 更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。

2、配置管理(Configuration Management)

配置的管理在分布式应用环境中很常见,比如我们在平时的应用系统,经常会遇到这样的需求:机器的配置列表、运行时的开关配置。数据库配置信息等。这些全局配置信息通常具备以下3个特性:

a、数据量较小
b、数据内容在运行时动态发生变化
c、集群中各个节点共享信息,配置一致。

例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统 的 PC Server,这样非常麻烦而且容易出错。这样的配置信息完全可以交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中。

3、集群管理(Group Membership)

zk不仅能帮你维护当前集群中机器的服务状态,而且能够帮你选出一个”总管”,让这个总管来管理集群,这就是zk的另一个功能leader,并实现集群容错功能。希望知道当前集群中究竟有多少机器工作;对集群中每天集群的运行时状态进行数据收集;对集群中每台集群进行上下线操作。

4、发布与订阅

zookeeper是一个典型的发布与订阅模式的分布式数控管理与协调框架,开发人员可以使用它来进行分布式数据的发布与订阅。

5、数据库切换

比如我们初始化zookeeper的时候读取节点上的数据库配置文件,当配置一旦发生变化时,zk就能够帮助我们把变更的通知发送到各个客户端,每个客户端收到这个变更通知后,就可以从新进行最新数据的获取。

6、分布式日志收集

7、分布式锁

8、队列管理

Zookeeper 可以处理两种类型的队列:

a、当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
b、队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。

FIFO 队列用 Zookeeper 实现思路如下:
就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。

9、共享锁(Locks)

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

分布式协调技术

分布式协调技术 主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成”脏数据”的后果。在分布式中,为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁。调度算法不能解决分布式锁问题,如果这些进程在一台机器上,调度算法是最优解。

ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。由于ZooKeeper的开源特性,后来我们的开发者在分布式锁的基础上,摸索了出了其他的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。

ZooKeeper数据模型

ZooKeeper数据模型Znode

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统。

20200413095310

1、结构

ZooKeeper树中的每个节点被称为-Znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1,Znode兼具文件和目录两种特点。既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。

每个Znode由3部分组成:

  • ① stat:此为状态信息, 描述该Znode的版本, 权限等信息

  • ② data:与该Znode关联的数据

  • ③ children:该Znode下的子节点

    znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
    znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
    znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
    znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍

2、数据访问:

每个节点存储的数据要被原子性的操作。也就是说读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的ACL(访问控制列表),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。

节点类型: 分别为临时节点和永久节点。节点的类型在创建时即被确定,并且不能改变。

  • ① 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(Session)结束,临时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的Znode都会绑定到一个客户端会话,但他们对所有的客户端还是可见的。另外,ZooKeeper的临时节点不允许拥有子节点。

  • ② 永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。

顺序节点: 当创建Znode的时候,用户可以请求在ZooKeeper的路径结尾添加一个递增的计数。这个计数对于此节点的父节点来说是唯一的,它的格式为”%10d”(10位数字,没有数值的数位用0补充,例如”0000000001”)。当计数值大于232-1时,计数器将溢出。

观察: 客户端可以在节点上设置watch,我们称之为监视器。当节点状态发生改变时(Znode的增、删、改)将会触发watch所对应的操作。当watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,因为watch只能被触发一次,这样可以减少网络流量。

3、时间

(1) Zxid:ZooKeeper节点状态改变的每一个操作都将使节点接收到一个Zxid格式的时间戳,并且这个时间戳全局有序唯一的。如果Zxid1的值小于Zxid2的值,那么Zxid1所对应的事件发生在Zxid2所对应的事件之前。实际 上,ZooKeeper的每个节点维护者三个Zxid值,为别为:cZxid、mZxid、pZxid。

① cZxid: 是节点的创建时间所对应的Zxid格式时间戳。
② mZxid:是节点的修改时间所对应的Zxid格式时间戳。
③ pZxid:是与该节点的子节点(或该节点)的最近一次创建/删除的时间戳对应,只与本节点/该节点的子节点有关,与孙子节点无关。

(2) 版本号:对节点的每一个操作都将致使这个节点的版本号增加。每个节点维护着三个版本号,他们分别为:

① version:节点数据版本号
② cversion:子节点版本号
③ aversion:节点所拥有的ACL版本号

4、节点属性

20200413103423

ZooKeeper服务中的操作(ZooKeeper类)

20200413103452

更新ZooKeeper操作是有限制的。delete或setData必须明确要更新的Znode的版本号,我们可以调用exists找到。如果版本号不匹配,更新将会失败。

更新ZooKeeper操作是非阻塞式的。因此客户端如果失去了一个更新(由于另一个进程在同时更新这个Znode),他可以在不阻塞其他进程执行的情况下,选择重新尝试或进行其他操作。

Watch触发器

ZooKeeper可以为所有的读操作设置watch,这些读操作包括:exists()、getChildren()及getData()。watch事件是一次性的触发器,当watch的对象状态发生改变时,将会触发此对象上watch所对应的事件。watch事件将被异步地发送给客户端,并且ZooKeeper为watch机制提供了有序的一致性保证。理论上,客户端接收watch事件的时间要快于其看到watch对象状态变化的时间。

ZooKeeper所管理的watch可以分为两类:

① 数据watch(data  watches):getData和exists负责设置数据watch
② 孩子watch(child watches):getChildren负责设置孩子watch

我们可以通过操作返回的数据来设置不同的watch:

① getData和exists:返回关于节点的数据信息
② getChildren:返回孩子列表

因此

① 一个成功的setData操作将触发Znode的数据watch
② 一个成功的create操作将触发Znode的数据watch以及孩子watch
③ 一个成功的delete操作将触发Znode的数据watch以及孩子watch

20200413103350

Watch由客户端所连接的ZooKeeper服务器在本地维护,因此watch可以非常容易地设置、管理和分派。当客户端连接到一个新的服务器 时,任何的会话事件都将可能触发watch。另外,当从服务器断开连接的时候,watch将不会被接收。但是,当一个客户端重新建立连接的时候,任何先前 注册过的watch都会被重新注册。

Zookeeper的watch实际上要处理两类事件:

① 连接状态事件(type=None, path=null)  这类事件不需要注册,也不需要我们连续触发,我们只要处理就行了。
② 节点事件  节点的建立,删除,数据的修改。它是one time trigger,我们需要不停的注册触发,还可能发生事件丢失的情况。

zoo.cfg配置文件解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
tickTime:基本事件单元,以毫秒为单位,Zookeeper 服务器心跳时间,也就是每隔 tickTime时间就会发送一个心跳。                
initLimit:这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数,
当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒
dataDir:存储内存中数据库快照的位置,就是保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
dataLogDir:设置log目录
clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
server.A = B:C:D
A表示这个是第几号服务器,
B 是这个服务器的 ip 地址;
C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader

maxClientCnxns:这个操作将限制连接到Zookeeper的客户端数量,并限制并发连接的数量,通过IP来区分不同的客户端。此配置选项可以阻止某些类别的Dos攻击。将他设置为零或忽略不进行设置将会取消对并发连接的限制。

minSessionTimeout和maxSessionTimeout
即最小的会话超时和最大的会话超时时间。在默认情况下,minSession=2*tickTime;maxSession=20*tickTime。
上一页1…141516…25下一页
初晨

初晨

永远不要说你知道本质,更别说真相了。

249 日志
46 分类
109 标签
近期文章
  • WebSocket、Socket、TCP、HTTP区别
  • Springboot项目的接口防刷
  • 深入理解Volatile关键字及其实现原理
  • 使用vscode搭建个人笔记环境
  • HBase介绍安装与操作
© 2018 — 2020 Copyright
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4