From 432578d3f2501c95c79970f1624a8acc8f4bad18 Mon Sep 17 00:00:00 2001
From: Dimas Shidqi Parikesit
Date: Wed, 5 Jun 2024 23:51:37 +0700
Subject: [PATCH 1/2] ZOOKEEPER-4837: Network issue causes ephemeral node unremoved after the session expiration

---
 .../apache/zookeeper/server/ZKDatabase.java   |  21 +++-
 .../server/persistence/FileSnap.java          |  66 ++++++++++++
 .../server/persistence/FileTxnSnapLog.java    |  81 ++++++++++++++
 .../server/persistence/SnapShot.java          |  10 ++
 .../quorum/EphemeralNodeDeletionTest.java     | 102 ++++++++++++++++++
 5 files changed, 279 insertions(+), 1 deletion(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index d98c97f2c07..2286ee03f79 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -293,6 +293,25 @@ public long loadDataBase() throws IOException {
         return zxid;
     }
 
+    /**
+     * load the database from the disk into memory and also add
+     * the transactions to the committed log in memory
+     * until the checkpoint zxid
+     *
+     * @return the last valid zxid on disk
+     * @throws IOException
+     */
+    public long loadDataBase(long zxid) throws IOException {
+        long startTime = Time.currentElapsedTime();
+        long newZxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener, zxid);
+        initialized = true;
+        long loadTime = Time.currentElapsedTime() - startTime;
+        ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
+        LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
+                loadTime, Long.toHexString(newZxid), dataTree.getTreeDigest());
+        return newZxid;
+    }
+
     /**
      * Fast forward the database adding transactions from the committed log into memory.
      * @return the last valid zxid.
@@ -602,7 +621,7 @@ public boolean truncateLog(long zxid) throws IOException {
             return false;
         }
 
-        loadDataBase();
+        loadDataBase(zxid);
         return true;
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index cb7efc403e0..c7182611882 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -125,6 +125,72 @@ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOExcep
         return dt.lastProcessedZxid;
     }
 
+    /**
+     * deserialize a data tree from the most recent valid snapshot at or before the checkpoint zxid
+     *
+     * @return the zxid of the snapshot
+     */
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions, long zxid) throws IOException {
+        // we run through 100 snapshots (not all of them)
+        // if we cannot get it running within 100 snapshots
+        // we should give up
+        List<File> snapList = findNValidSnapshots(100);
+        if (snapList.size() == 0) {
+            return -1L;
+        }
+        File snap = null;
+        long snapZxid = -1;
+        boolean foundValid = false;
+        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
+            snap = snapList.get(i);
+            LOG.info("Reading snapshot {}", snap);
+            snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
+
+            // if this snapshot has a higher zxid than the checkpoint zxid, skip it and fall back to an older snapshot
+            if (snapZxid > zxid) {
+                continue;
+            }
+
+            try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
+                InputArchive ia = BinaryInputArchive.getArchive(snapIS);
+                deserialize(dt, sessions, ia);
+                SnapStream.checkSealIntegrity(snapIS, ia);
+
+                // Digest feature was added after the CRC to make it backward
+                // compatible, the older code can still read snapshots which
+                // includes digest.
+                //
+                // To check the intact, after adding digest we added another
+                // CRC check.
+                if (dt.deserializeZxidDigest(ia, snapZxid)) {
+                    SnapStream.checkSealIntegrity(snapIS, ia);
+                }
+
+                // deserialize lastProcessedZxid and check inconsistency
+                if (dt.deserializeLastProcessedZxid(ia)) {
+                    SnapStream.checkSealIntegrity(snapIS, ia);
+                }
+
+                foundValid = true;
+                break;
+            } catch (IOException e) {
+                LOG.warn("problem reading snap file {}", snap, e);
+            }
+        }
+        if (!foundValid) {
+            throw new IOException("Not able to find valid snapshots in " + snapDir);
+        }
+        dt.lastProcessedZxid = snapZxid;
+        lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
+
+        // compare the digest if this is not a fuzzy snapshot, we want to compare
+        // and find inconsistent asap.
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(dt.lastProcessedZxid);
+        }
+        return dt.lastProcessedZxid;
+    }
+
     /**
      * deserialize the datatree from an inputarchive
      * @param dt the datatree to be serialized into
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 29b4c0a616d..d7761261075 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -312,6 +312,87 @@ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener l
         return finalizer.run();
     }
 
+    /**
+     * this function restores the server
+     * database after reading from the
+     * snapshots and transaction logs
+     * until the checkpoint zxid.
+     *
+     * @param dt the datatree to be restored
+     * @param sessions the sessions to be restored
+     * @param listener the playback listener to run on the database restoration
+     * @param zxid the checkpoint zxid up to which to restore
+     * @return the highest zxid restored
+     * @throws IOException
+     */
+    public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener, long zxid)
+        throws IOException {
+        long snapLoadingStartTime = Time.currentElapsedTime();
+        long deserializeResult = snapLog.deserialize(dt, sessions, zxid);
+        ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
+        FileTxnLog txnLog = new FileTxnLog(dataDir);
+        boolean trustEmptyDB;
+        File initFile = new File(dataDir.getParent(), "initialize");
+        if (Files.deleteIfExists(initFile.toPath())) {
+            LOG.info("Initialize file found, an empty database will not block voting participation");
+            trustEmptyDB = true;
+        } else {
+            trustEmptyDB = autoCreateDB;
+        }
+
+        RestoreFinalizer finalizer = () -> {
+            long highestZxid = fastForwardFromEdits(dt, sessions, listener);
+            // The snapshotZxidDigest will reset after replaying the txn of the
+            // zxid in the snapshotZxidDigest, if it's not reset to null after
+            // restoring, it means either there are not enough txns to cover that
+            // zxid or that txn is missing
+            DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
+            if (snapshotZxidDigest != null) {
+                LOG.warn(
+                        "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
+                                + "which might lead to inconsistent state",
+                        Long.toHexString(highestZxid),
+                        Long.toHexString(snapshotZxidDigest.getZxid()));
+            }
+            return highestZxid;
+        };
+
+        if (-1L == deserializeResult) {
+            /*
+             * this means that we couldn't find any snapshot, so we need to
+             * initialize an empty database (reported in ZOOKEEPER-2325)
+             */
+            if (txnLog.getLastLoggedZxid() != -1) {
+                // ZOOKEEPER-3056: provides an escape hatch for users upgrading
+                // from old versions of zookeeper (3.4.x, pre 3.5.3).
+                if (!trustEmptySnapshot) {
+                    throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
+                } else {
+                    LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
+                    return finalizer.run();
+                }
+            }
+
+            if (trustEmptyDB) {
+                /*
+                 * TODO: (br33d) we should either put a ConcurrentHashMap on restore()
+                 * or use Map on save()
+                 */
+                save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
+
+                /* return a zxid of 0, since we know the database is empty */
+                return 0L;
+            } else {
+                /* return a zxid of -1, since we are possibly missing data */
+                LOG.warn("Unexpected empty data tree, setting zxid to -1");
+                dt.lastProcessedZxid = -1L;
+                return -1L;
+            }
+        }
+
+        return finalizer.run();
+    }
+
     /**
      * This function will fast forward the server database to have the latest
      * transactions in it. This is the same as restore, but only reads from
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
index f5660c7df44..c8426ed0349 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
@@ -40,6 +40,16 @@ public interface SnapShot {
      */
     long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
 
+    /**
+     * deserialize a data tree from the last valid snapshot at or before the checkpoint zxid and return the last zxid that was deserialized
+     *
+     * @param dt the datatree to be deserialized into
+     * @param sessions the sessions to be deserialized into
+     * @return the last zxid that was deserialized from the snapshot
+     * @throws IOException
+     */
+    long deserialize(DataTree dt, Map<Long, Integer> sessions, long zxid) throws IOException;
+
     /**
      * persist the datatree and the sessions into a persistence storage
      * @param dt the datatree to be serialized
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
index 15e6b768468..60c54d3f62d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
@@ -47,6 +47,96 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     private static int SERVER_COUNT = 3;
     private MainThread[] mt = new MainThread[SERVER_COUNT];
 
+    @Test
+    @Timeout(value = 300)
+    public void testEphemeralNodeDeletionNewBug() throws Exception {
+        final int[] clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                     + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        // start all the servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            final int customId = i;
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new MockTestQPMain();
+                }
+            };
+            Thread.sleep(1000);
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
+                    "waiting for server " + i + " being up");
+        }
+
+        // Check that node 1 is the initial leader
+        assertEquals(1, mt[1].getQuorumPeer().getLeaderId());
+
+        CountdownWatcher watch = new CountdownWatcher();
+        // QuorumPeer l = getByServerState(mt, ServerState.LEADING);
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[0], ClientBase.CONNECTION_TIMEOUT, watch);
+        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+        Stat firstEphemeralNode = new Stat();
+
+        // 1: create ephemeral node
+        String nodePath = "/e1";
+        zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, firstEphemeralNode);
+
+        // 2: Inject network error
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            CustomQuorumPeer cqp = (CustomQuorumPeer) mt[i].getQuorumPeer();
+            cqp.setInjectError(true);
+        }
+
+        // 3: Quit
+        zk.close();
+
+        // 4: Wait until node 1 and node 2 have removed the ephemeral node
+        while (true) {
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+
+            if (mt[1].getQuorumPeer().getZkDb().getSessionCount() == 0
+                    && mt[2].getQuorumPeer().getZkDb().getSessionCount() == 0) {
+
+                // Double check
+                Thread.sleep(1000);
+                if (mt[1].getQuorumPeer().getZkDb().getSessionCount() == 0
+                        && mt[2].getQuorumPeer().getZkDb().getSessionCount() == 0) {
+                    break;
+                }
+            }
+        }
+
+        // 5: Remove network error
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            CustomQuorumPeer cqp = (CustomQuorumPeer) mt[i].getQuorumPeer();
+            cqp.setInjectError(false);
+        }
+
+        // Node must have been deleted from 1 and 2
+        assertNodeNotExist(nodePath, 2);
+        assertNodeNotExist(nodePath, 1);
+
+        // Node is not deleted from 0
+        assertNodeNotExist(nodePath, 0);
+    }
+
     /**
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355.
     * ZooKeeper ephemeral node is never deleted if follower fail while reading
@@ -156,6 +246,18 @@ public TestQPMain getTestQPMain() {
         followerZK.close();
     }
 
+    void assertNodeNotExist(String nodePath, int idx) throws Exception {
+        QuorumPeer qp = mt[idx].getQuorumPeer();
+
+        CountdownWatcher watch = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + qp.getClientPort(), ClientBase.CONNECTION_TIMEOUT, watch);
+        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+        Stat exists = zk.exists(nodePath, false);
+        zk.close();
+        assertNull(exists, "Node must have been deleted from the node " + idx);
+    }
+
     @AfterEach
     public void tearDown() {
         // stop all severs

From 4caaea14a18bf3620e233ef3355b5d8fa9c357a2 Mon Sep 17 00:00:00 2001
From: Dimas Shidqi Parikesit
Date: Thu, 6 Jun 2024 01:06:36 +0700
Subject: [PATCH 2/2] test description

---
 .../server/quorum/EphemeralNodeDeletionTest.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
index 60c54d3f62d..af3981d8c3a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
@@ -47,9 +47,12 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     private static int SERVER_COUNT = 3;
     private MainThread[] mt = new MainThread[SERVER_COUNT];
 
+    /**
+     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-4837.
+     */
     @Test
     @Timeout(value = 300)
-    public void testEphemeralNodeDeletionNewBug() throws Exception {
+    public void testEphemeralNodeDeletionZK4837() throws Exception {
         final int[] clientPorts = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
         String server;
@@ -63,7 +66,6 @@ public void testEphemeralNodeDeletionNewBug() throws Exception {
         String currentQuorumCfgSection = sb.toString();
         // start all the servers
         for (int i = 0; i < SERVER_COUNT; i++) {
-            final int customId = i;
             mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
                 @Override
                 public TestQPMain getTestQPMain() {
@@ -133,7 +135,7 @@ public TestQPMain getTestQPMain() {
         assertNodeNotExist(nodePath, 2);
         assertNodeNotExist(nodePath, 1);
 
-        // Node is not deleted from 0
+        // If buggy, node is not deleted from 0
         assertNodeNotExist(nodePath, 0);
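
Note on the approach, for readers skimming the diff: the heart of the fix is the snapshot-selection rule in the new FileSnap.deserialize(dt, sessions, zxid) overload. It walks the snapshot list newest-first (the order findNValidSnapshots returns) and restores from the first snapshot whose zxid is at or below the checkpoint that truncateLog passes down, so a server that truncates its log no longer reloads state from a snapshot taken after the truncation point. The standalone sketch below illustrates only that selection rule; the class and method names are invented for illustration and it uses no ZooKeeper APIs.

    import java.util.List;

    public class SnapshotSelectionSketch {

        /**
         * Pick the zxid of the newest snapshot that is at or below the checkpoint,
         * mirroring the skip-and-continue loop in the patched FileSnap.deserialize.
         * Returns -1 if no snapshot qualifies (the patched restore() then falls back
         * to its empty-database handling).
         */
        static long selectSnapshotZxid(List<Long> snapshotZxidsNewestFirst, long checkpointZxid) {
            for (long snapZxid : snapshotZxidsNewestFirst) {
                if (snapZxid > checkpointZxid) {
                    continue; // snapshot is newer than the truncation point, skip it
                }
                return snapZxid; // first (newest) snapshot at or below the checkpoint
            }
            return -1L;
        }

        public static void main(String[] args) {
            // Snapshot zxids on disk, newest first; the checkpoint is the zxid the log was truncated to.
            List<Long> snapshots = List.of(0x130L, 0x120L, 0x100L);
            System.out.println(Long.toHexString(selectSnapshotZxid(snapshots, 0x125L)));
        }
    }

Running main prints 120: the snapshot at 0x120 is chosen because it is the newest one not beyond the checkpoint 0x125, while the newer 0x130 snapshot is skipped.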