curator leaderLatch 源码阅读笔记

编程入门 行业动态 更新时间:2024-10-10 17:28:02

curator leaderLatch <a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码阅读笔记"/>

curator leaderLatch 源码阅读笔记

zookeeper 结构

curator api 介绍

CuratorFramework //可以直接看做zkclient 的上层封装,作为client用

基本用法:

//build client
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().retryPolicy(new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),zookeeperProperties.getMaxRetries(), zookeeperProperties.getMaxSleepTimeMs())).connectString(zookeeperProperties.getServerList());
client = builder.build();client.start();
​
//基本操作
client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

curator leaderLatch 源码分析

  • 先说构建、开始leader选举和选上或者没选上等待继续选的过程;

//基本用法
LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatchTest", UUID.randomUUID().toString());
leaderLatch.addListener(new LeaderLatchListener() {@Overridepublic void isLeader() {System.out.println("isLeader");}
​@Overridepublic void notLeader() {System.out.println("notLeader");}
});
leaderLatch.start();

看下构造方法,只是在传入操作zk节点的client,涉及到leader选举的主题路径,以及竞争过程中创建出字节点的值

public LeaderLatch(CuratorFramework client, String latchPath, String id){this(client, latchPath, id, CloseMode.SILENT);}
​/*** @param client    the client* @param latchPath the path for this leadership group* @param id        participant ID* @param closeMode behaviour of listener on explicit close.*/public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode){this.client = Preconditions.checkNotNull(client, "client cannot be null");this.latchPath = PathUtils.validatePath(latchPath);this.id = Preconditions.checkNotNull(id, "id cannot be null");this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");}

zk 下节点结构:

public void start() throws Exception{Preconditions.checkState(statepareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
​startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{//这里回监听选举主题下节点的变化,和首次参与选举判断逻辑internalStart();}finally{startTask.set(null);}}}));} 
private synchronized void internalStart(){if ( state.get() == State.STARTED ){client.getConnectionStateListenable().addListener(listener);//这里增加listenertry{reset();//选举逻辑}catch ( Exception e ){ThreadUtils.checkInterrupted(e);log.error("An error occurred checking resetting leadership.", e);}}}

先看reset逻辑

void reset() throws Exception{setLeadership(false);//初始化:设置为非leadersetNode(null);//初始化
​//创建子节点的回调操作【只是执行异步化,理解时候可以把下面client操作看做同步操作,这里就是同步操作的后续逻辑】BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( debugResetWaitLatch != null ){debugResetWaitLatch.await();debugResetWaitLatch = null;}
​if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){//记录子节点路径,。。。setNode(event.getName());if ( state.get() == State.CLOSED ){setNode(null);}else{//首次会走这里,拿到子节点,排序、如果咱排在第一,那就是leader,否则就观察前一个节点(node)的动静getChildren();}}else{log.error("getChildren() failed. rc = " + event.getResultCode());}}};//创建临时有序子节点【CreateMode.EPHEMERAL_SEQUENTIAL】//路径构造逻辑ZKPaths.makePath(latchPath, LOCK_NAME),这里不细说了,待会看图//子节点的值就是构造时传入的 id【LeaderSelector.getIdBytes(id)】client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));}

Ps:回调事件的code值,0==ok代表操作完成

private void getChildren() throws Exception{BackgroundCallback callback = new BackgroundCallback(){...//子节点排序,判断leadercheckLeadership(event.getChildren());...};//拿到子节点client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));}
private void checkLeadership(List<String> children) throws Exception{final String localOurPath = ourPath.get();//1 排序List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;...if ( ourIndex == 0 ){//处理作为leader的后续逻辑setLeadership(true);}else{String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher(){@Overridepublic void process(WatchedEvent event){if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ){...//前一个节点一有变化【删除】,重新拿到所有子节点排序,判断leader【重复前面操作】getChildren();... }}};
​BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ){// previous node is gone - resetreset();}}};//观察前一个节点动静// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}
  • 下面说下,选上之后,以及选上因为断链失去leader如何通知业务代码的逻辑;还有就是自动续期的概念;

更多推荐

curator leaderLatch 源码阅读笔记

本文发布于:2024-03-13 07:26:38,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1733461.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   笔记   curator   leaderLatch

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!