一、前言
前面阐述了服务器的总体框架,下面来分析服务器的所有父类ZooKeeperServer。
二、ZooKeeperServer源码分析
2.1 类的继承关系
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}
说明:ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定义了expire方法(表示会话过期)和getServerId方法(表示获取服务器ID),而Provider则主要定义了获取服务器某些数据的方法。
2.2 类的内部类
1. DataTreeBuilder类
public interface DataTreeBuilder { // 构建DataTree public DataTree build(); }
说明:其定义了构建树DataTree的接口。
2. BasicDataTreeBuilder类
static public class BasicDataTreeBuilder implements DataTreeBuilder { public DataTree build() { return new DataTree(); } }
说明:实现DataTreeBuilder接口,返回新创建的树DataTree。
3. MissingSessionException类
public static class MissingSessionException extends IOException { private static final long serialVersionUID = 7467414635467261007L; public MissingSessionException(String msg) { super(msg); } }
说明:表示会话缺失异常。
4. ChangeRecord类
static class ChangeRecord { ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) { // 属性赋值 this.zxid = zxid; this.path = path; this.stat = stat; this.childCount = childCount; this.acl = acl; } // zxid long zxid; // 路径 String path; // 统计数据 StatPersisted stat; /* Make sure to create a new object when changing */ // 子节点个数 int childCount; // ACL列表 List<ACL> acl; /* Make sure to create a new object when changing */ @SuppressWarnings("unchecked") // 拷贝 ChangeRecord duplicate(long zxid) { StatPersisted stat = new StatPersisted(); if (this.stat != null) { DataTree.copyStatPersisted(this.stat, stat); } return new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList<ACL>() : new ArrayList(acl)); } }
说明:ChangeRecord数据结构是用于方便PrepRequestProcessor和FinalRequestProcessor之间进行信息共享,其包含了一个拷贝方法duplicate,用于返回属性相同的ChangeRecord实例。
2.3 类的属性
说明:类中包含了心跳频率,会话(处理会话)、事务日志快照、内存数据库、请求处理器、未处理的ChangeRecord、服务器统计信息等。
2.4 类的构造函数
1. ZooKeeperServer()型构造函数
public ZooKeeperServer() { serverStats = new ServerStats(this); }
说明:其只初始化了服务器的统计信息。
2. ZooKeeperServer(FileTxnSnapLog, int, int, int, DataTreeBuilder, ZKDatabase)型构造函数
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) { // 给属性赋值 serverStats = new ServerStats(this); this.txnLogFactory = txnLogFactory; this.zkDb = zkDb; this.tickTime = tickTime; this.minSessionTimeout = minSessionTimeout; this.maxSessionTimeout = maxSessionTimeout; LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() + " datadir " + txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir()); }
说明:该构造函数会初始化服务器统计数据、事务日志工厂、心跳时间、会话时间(最短超时时间和最长超时时间)。
3. ZooKeeperServer(FileTxnSnapLog, int, DataTreeBuilder)型构造函数
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException { this(txnLogFactory, tickTime, -1, -1, treeBuilder, new ZKDatabase(txnLogFactory)); }
说明:其首先会生成ZooKeeper内存数据库后,然后调用第二个构造函数进行初始化操作。
4. ZooKeeperServer(File, File, int)型构造函数
public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { this( new FileTxnSnapLog(snapDir, logDir), tickTime, new BasicDataTreeBuilder()); }
说明:其会调用同名构造函数进行初始化操作。
5. ZooKeeperServer(FileTxnSnapLog, DataTreeBuilder)型构造函数
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException { this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, treeBuilder, new ZKDatabase(txnLogFactory)); }
说明:其生成内存数据库之后再调用同名构造函数进行初始化操作。
2.5 核心函数分析
1. loadData函数
public void loadData() throws IOException, InterruptedException { /* * When a new leader starts executing Leader#lead, it * invokes this method. The database, however, has been * initialized before running leader election so that * the server could pick its zxid for its initial vote. * It does it by invoking QuorumPeer#getLastLoggedZxid. * Consequently, we don't need to initialize it once more * and avoid the penalty of loading it a second time. Not * reloading it is particularly important for applications * that host a large database. * * The following if block checks whether the database has * been initialized or not. Note that this method is * invoked by at least one other method: * ZooKeeperServer#startdata. * * See ZOOKEEPER-12 for more detail. */ if(zkDb.isInitialized()){ // 内存数据库已被初始化 // 设置为最后处理的Zxid setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { // 未被初始化,则加载数据库 setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { // 遍历所有的会话 if (zkDb.getSessionWithTimeOuts().get(session) == null) { // 删除过期的会话 deadSessions.add(session); } } // 完成DataTree的初始化 zkDb.setDataTreeInit(true); for (long session : deadSessions) { // 遍历过期会话 // XXX: Is lastProcessedZxid really the best thing to use? // 删除会话 killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
说明:该函数用于加载数据,其首先会判断内存库是否已经加载设置zxid,之后会调用killSession函数删除过期的会话,killSession会从sessionTracker中删除session,并且killSession最后会调用DataTree的killSession函数,其源码如下
void killSession(long session, long zxid) { // the list is already removed from the ephemerals // so we do not have to worry about synchronizing on // the list. This is only called from FinalRequestProcessor // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in sequence. // 移除session,并获取该session对应的所有临时节点 HashSet<String> list = ephemerals.remove(session); if (list != null) { for (String path : list) { // 遍历所有临时节点 try { // 删除路径对应的节点 deleteNode(path, zxid); if (LOG.isDebugEnabled()) { LOG .debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session)); } } catch (NoNodeException e) { LOG.warn("Ignoring NoNodeException for path " + path + " while removing ephemeral for dead session 0x" + Long.toHexString(session)); } } } }
说明:DataTree的killSession函数的逻辑首先移除session,然后取得该session下的所有临时节点,然后逐一删除临时节点。
2. submit函数
public void submitRequest(Request si) { if (firstProcessor == null) { // 第一个处理器为空 synchronized (this) { try { while (!running) { // 直到running为true,否则继续等待 wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // 是否为合法的packet boolean validpacket = Request.isValid(si.type); if (validpacket) { // 处理请求 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }
说明:当firstProcessor为空时,并且running标志为false时,其会一直等待,直到running标志为true,之后调用touch函数判断session是否存在或者已经超时,之后判断请求的类型是否合法,合法则使用请求处理器进行处理。
3. processConnectRequest函数
说明:其首先将传递的ByteBuffer进行反序列化,转化为相应的ConnectRequest,之后进行一系列判断(可能抛出异常),然后获取并判断该ConnectRequest中会话id是否为0,若为0,则表示可以创建会话,否则,重新打开会话。
4. processPacket函数
说明:该函数首先将传递的ByteBuffer进行反序列,转化为相应的RequestHeader,然后根据该RequestHeader判断是否需要认证,若认证失败,则构造认证失败的响应并发送给客户端,然后关闭连接,并且再补接收任何packet。若认证成功,则构造认证成功的响应并发送给客户端。若不需要认证,则再判断其是否为SASL类型,若是,则进行处理,然后构造响应并发送给客户端,否则,构造请求并且提交请求。
三、总结
本篇分析了ZooKeeperServer的源码,了解了其对于请求和会话的处理,也谢谢各位园友的观看~