源码阅读笔记"/>
curator leaderLatch 源码阅读笔记
zookeeper 结构
curator api 介绍
CuratorFramework //可以直接看做zkclient 的上层封装,作为client用
基本用法:
//build client CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().retryPolicy(new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),zookeeperProperties.getMaxRetries(), zookeeperProperties.getMaxSleepTimeMs())).connectString(zookeeperProperties.getServerList()); client = builder.build();client.start(); //基本操作 client.create().forPath("/head", new byte[0]); client.delete().inBackground().forPath("/head"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]); client.getData().watched().inBackground().forPath("/test");
curator leaderLatch 源码分析
-
先说构建、开始leader选举和选上或者没选上等待继续选的过程;
//基本用法 LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatchTest", UUID.randomUUID().toString()); leaderLatch.addListener(new LeaderLatchListener() {@Overridepublic void isLeader() {System.out.println("isLeader");} @Overridepublic void notLeader() {System.out.println("notLeader");} }); leaderLatch.start();
看下构造方法,只是在传入操作zk节点的client,涉及到leader选举的主题路径,以及竞争过程中创建出字节点的值
public LeaderLatch(CuratorFramework client, String latchPath, String id){this(client, latchPath, id, CloseMode.SILENT);} /*** @param client the client* @param latchPath the path for this leadership group* @param id participant ID* @param closeMode behaviour of listener on explicit close.*/public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode){this.client = Preconditions.checkNotNull(client, "client cannot be null");this.latchPath = PathUtils.validatePath(latchPath);this.id = Preconditions.checkNotNull(id, "id cannot be null");this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");}
zk 下节点结构:
public void start() throws Exception{Preconditions.checkState(statepareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{//这里回监听选举主题下节点的变化,和首次参与选举判断逻辑internalStart();}finally{startTask.set(null);}}}));}
private synchronized void internalStart(){if ( state.get() == State.STARTED ){client.getConnectionStateListenable().addListener(listener);//这里增加listenertry{reset();//选举逻辑}catch ( Exception e ){ThreadUtils.checkInterrupted(e);log.error("An error occurred checking resetting leadership.", e);}}}
先看reset逻辑
void reset() throws Exception{setLeadership(false);//初始化:设置为非leadersetNode(null);//初始化 //创建子节点的回调操作【只是执行异步化,理解时候可以把下面client操作看做同步操作,这里就是同步操作的后续逻辑】BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( debugResetWaitLatch != null ){debugResetWaitLatch.await();debugResetWaitLatch = null;} if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){//记录子节点路径,。。。setNode(event.getName());if ( state.get() == State.CLOSED ){setNode(null);}else{//首次会走这里,拿到子节点,排序、如果咱排在第一,那就是leader,否则就观察前一个节点(node)的动静getChildren();}}else{log.error("getChildren() failed. rc = " + event.getResultCode());}}};//创建临时有序子节点【CreateMode.EPHEMERAL_SEQUENTIAL】//路径构造逻辑ZKPaths.makePath(latchPath, LOCK_NAME),这里不细说了,待会看图//子节点的值就是构造时传入的 id【LeaderSelector.getIdBytes(id)】client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));}
Ps:回调事件的
code值
,0==ok代表操作完成
private void getChildren() throws Exception{BackgroundCallback callback = new BackgroundCallback(){...//子节点排序,判断leadercheckLeadership(event.getChildren());...};//拿到子节点client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));}
private void checkLeadership(List<String> children) throws Exception{final String localOurPath = ourPath.get();//1 排序List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;...if ( ourIndex == 0 ){//处理作为leader的后续逻辑setLeadership(true);}else{String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher(){@Overridepublic void process(WatchedEvent event){if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ){...//前一个节点一有变化【删除】,重新拿到所有子节点排序,判断leader【重复前面操作】getChildren();... }}}; BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ){// previous node is gone - resetreset();}}};//观察前一个节点动静// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}
-
下面说下,选上之后,以及选上因为断链失去leader如何通知业务代码的逻辑;还有就是自动续期的概念;
更多推荐
curator leaderLatch 源码阅读笔记
发布评论