Gossip是什么
Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等。使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。
原理
Gossip协议基本思想就是:一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fanout(这个单词的本意是扇出)。
例子
我们这里假设用N个节点,然后其中某一个节点的数据改变后,所有节点都需要改成相同的(最终相同),举个例子,有10个节点
1,2,3,4,5,6,7,8,9,10
假设扇出是4,然后节点1的数据变化
第一次扩散、节点1随机从剩下的节点中取四个通知
假设取到的是2,3,4,5,那么第一次扩散就已经有1,2,3,4,5个节点一致了
第二次扩散、节点2,3,4,5分别从剩下的节点中随机取四个
假设2取到的是6,7,8,9;3取到的是7,8,9,10,也有可能2,3,4,5都取到6,7,8,9
那么剩下的节点10就得第三次扩散才能扩散过去。
这里因为每个节点都是异步的,可以不考虑是否扩散成功,所以有些节点可能会多次扩散到,所以我下面的例子获取的扩散次数其实假设只扩散一次,实际应该会比我下面的例子次数多一点,举个例子,redis有多个节点,节点2和节点3准备扩散的时候,同时选了节点4,此时节点4状态还是为扩散的,所以会被扩散2次,如果节点数多,重复扩散数目会更多。
代码
1、主程序GossipTest
public class GossipTest {public static void main(String[] args) {//节点数目int num = 20;//扇出也就是每次扩散的节点数int fanout = 4;List<Node> nodes = new ArrayList<Node>();for (int i = 1; i <=num; i++) {Node node = new Node();node.setFanout(fanout);node.setIndex(i);node.setNodes(nodes);nodes.add(node);}//表明这个是第一次传播ConsistentUtil.map.put(nodes.get(0).getIndex(),0);nodes.get(0).setData("大家都设置为1");try {Thread.sleep(10000);System.out.println("到结束,总共扩散了"+ConsistentUtil.max+"次"+nodes);} catch (InterruptedException e) {}}}
作用就是根据节点的数目和扇出的数目初始化相关设置,然后假设第一个节点数据变化,触发了扩散,当然这里的扩散只能用线程模拟,真实情况可能是RPC调用别的服务器的资源。
2、节点Node
/*** 每一个节点,最终目的是每个节点的数据最终一致*/class Node{//节点顺序号private int index;//设置扇出private int fanout;//节点数据,这里简单的用字符串代替private String data;//持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)private List<Node> nodes;public int getIndex() {return index;}public void setIndex(int index) {this.index = index;}public String getData() {return data;}public void setData(String data) {this.data = data;//这里设置完消息后需要进行传播消息,这里启动一个线程来模拟//但是这里可能会导致二次调用,所以这里要锁一下try {//这里捕获异常,允许传播失败new ConsistentUtil(nodes, this).start();} catch (Exception e) {e.printStackTrace();}}public List<Node> getNodes() {return nodes;}public void setNodes(List<Node> nodes) {this.nodes = nodes;}public int getFanout() {return fanout;}public void setFanout(int fanout) {this.fanout = fanout;}@Overridepublic String toString() {return "Node [index=" + index + ", data=" + data + "]";}}
上面的属性有节点编号,扇出,节点数据,以及所有节点信息,这里就单纯用list来保存所有节点信息了,并且所有节点信息的修改不需要加任何的锁,因为扩散可能是多次的,失败后会有别的节点继续扩散,这样的话就完全解耦合了。,扩散的触发点是如下代码
public void setData(String data) {this.data = data;//这里设置完消息后需要进行传播消息,这里启动一个线程来模拟//但是这里可能会导致二次调用,所以这里要锁一下try {//这里捕获异常,允许传播失败new ConsistentUtil(nodes, this).start();} catch (Exception e) {e.printStackTrace();}}
但是世界情况,可能是数据改变后,会有个固定的时间去进行扩散,而不会立马,并且不通的应用场景有不通的需求,这里的话暂时接收到改变就立马处理。扩散的方式也是启动一个线程去处理,并且该线程运行失败也不用处理,因为就算这个节点没有扩散成功,会有别的节点扩散成功的,除非扩散的节点真的一直挂掉了,那就是另一个使用场景了。
3、扩散线程
class ConsistentUtil extends Thread{public static Map<Integer,Integer> map = new ConcurrentHashMap<Integer,Integer>();public static int max=0;public static synchronized void compare(int num) {if(num>max) {max=num;//System.out.println("最大循环次数:"+max);}}//持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)private List<Node> nodes;//执行的nodeprivate Node node;public ConsistentUtil(List<Node> nodes, Node node) {super();this.nodes = nodes;this.node = node;}@Overridepublic void run() {//1、先检查是否全部都已经一致了boolean flag = checkConsistent();if(!flag) {//随机获取fanout个NodeList<Node> choice = this.getChoiceNodes();if(choice.size()>0) {//传播的次数加1for (Node node : choice) {//System.out.println(this.node.getIndex()+"向"+node.getIndex()+"进行传播");//获取上一个节点扩散的次数Integer num = map.get(this.node.getIndex());//本次扩散次数加1map.put(node.getIndex(),num+1);node.setData(this.node.getData());}}}else {//这里表示结束了System.out.println(this.node.getIndex()+"已经结束了");count();}}private boolean checkConsistent() {Node pre = null;for (Node node : nodes) {if(pre==null) {pre=node;}else {if(pre.getData().equals(node.getData())) {pre = node;}else {//有不一致的return false;}}}return true;}//选择没有被传播的节点private List<Node> getChoiceNodes() {List<Node> noSets = new ArrayList<Node>();for (Node n : nodes) {//这个没有传播过,且不是自己,继续传播if(!this.node.getData().equals(n.getData())&&(this.node.getIndex()!=n.getIndex())) {noSets.add(n);}}List<Node> choice = new ArrayList<Node>();//1、获取当前节点数目int num = noSets.size();//2、随机获取fanout个下标if(num<=node.getFanout()) {//这里就获取全部return noSets;}else {int[] indexs = this.getRandomNums(node.getFanout(), num);for (int i : indexs) {choice.add(noSets.get(i));}}return choice;}/*** 从all里面随机产生size个不重复的下标* @param size 需要产生的下标数* @param all 范围* @return*/private int[] getRandomNums(int size,int all) {if(size>all) {System.err.println("size大于all"+size+">"+all);return null;}SecureRandom random = new SecureRandom();//把all当成一个listList<Integer> list = new ArrayList<Integer>();for(int i=0;i<all;i++) {list.add(i);}int[] result = new int[size];for(int j=0;j<size;j++) {//随机生成一个下标int index = random.nextInt(list.size());//根据下标去取list中的值result[j]=list.get(index);//从list移除该值list.remove(index);}return result;}//通过map统计次数private static void count() {int max = 0;for (Map.Entry<Integer, Integer> entry : map.entrySet()) {//Integer mapKey = entry.getKey();Integer mapValue = entry.getValue();if(mapValue>max) {max = mapValue;}}compare(max);}}
原理就是,从所有节点中随机获取扇出(fanout)个数据还没有一致的节点,然后修改它们的值。
那么下面就是运行测试数据扇出是4.
| 节点数目 | 20 | 60 | 180 | 500 | 1000 | 3000 | 8000 | 20000 |
|---|---|---|---|---|---|---|---|---|
| 扩散次数 | 4 | 6 | 8 | 9 | 10 | 13 | 18 | 18 |
可以看到,随着节点数的增加,扩散次数其实变化的不大,毕竟扩散是指数级别递增的。
当然我上面的例子,肯定还有很多没考虑的地方,不过大体思路应是这样,只是随手用java模拟一下下而已,有错误麻烦指正下下。
图解Gossip:可能是最有趣的一致性协议(转)
