diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index c174fdd1e2f..2f31b0e84ca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -34,6 +34,7 @@ import org.apache.zookeeper.server.quorum.LearnerHandler; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.AuthUtil; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; @@ -364,6 +365,10 @@ public boolean isQuorum() { } } + public boolean isRollover() { + return isQuorum() && zxid > 0 && ZxidUtils.isLastEpochZxid(zxid); + } + public static String op2String(int op) { switch (op) { case OpCode.notification: diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 0dc1a86860e..a3e172d15b7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -29,6 +29,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer; +import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; +import org.apache.zookeeper.server.util.ZxidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +208,7 @@ public void run() { // iff this is a read or a throttled request(which doesn't need to be written to the disk), // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { - 
nextProcessor.processRequest(si); + handover(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } @@ -213,7 +216,7 @@ public void run() { continue; } toFlush.add(si); - if (shouldFlush()) { + if (si.isRollover() || shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); @@ -224,6 +227,19 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + private void handover(Request request) throws IOException, RequestProcessorException { + if (request.isRollover() && zks instanceof QuorumZooKeeperServer) { + long nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1; + // Fences upcoming epoch in leader election. So there will be no chance for other peer + // to lead next epoch if this request is considered committed. + ((QuorumZooKeeperServer) zks).fenceRolloverEpoch(nextEpoch); + if (zks instanceof ObserverZooKeeperServer) { + ((ObserverZooKeeperServer) zks).confirmRolloverEpoch(nextEpoch); + } + } + nextProcessor.processRequest(request); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; @@ -242,7 +258,7 @@ private void flush() throws IOException, RequestProcessorException { final Request i = this.toFlush.remove(); long latency = Time.currentElapsedTime() - i.syncQueueStartTime; ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); - this.nextProcessor.processRequest(i); + handover(i); } if (this.nextProcessor instanceof Flushable) { ((Flushable) this.nextProcessor).flush(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 0eff9d24837..c0029086008 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -49,7 +49,8 @@ 
public class Follower extends Learner { ObserverMaster om; - Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) { + // VisibleForTesting + public Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) { this.self = Objects.requireNonNull(self); this.fzk = Objects.requireNonNull(zk); this.zk = zk; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index b6766199988..6afb9f3e03a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -50,6 +50,10 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { private static final Logger LOG = LoggerFactory.getLogger(FollowerZooKeeperServer.class); + // This should be final as it is constructed with no external variables. It is not to allow mockito spy which + // intercepts `this`. 
+ private ParticipantRequestSyncer requestSyncer; + /* * Pending sync requests */ ConcurrentLinkedQueue pendingSyncs; @@ -57,7 +61,8 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { /** * @throws IOException */ - FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + // VisibleForTesting + public FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); this.pendingSyncs = new ConcurrentLinkedQueue<>(); } @@ -75,12 +80,19 @@ protected void setupRequestProcessors() { ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower())); syncProcessor.start(); + requestSyncer = new ParticipantRequestSyncer(this, LOG, this::logRequest); } LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { - final Request request = buildRequestToProcess(hdr, txn, digest); + Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + request.setTxnDigest(digest); + requestSyncer.syncRequest(request); + } + + private void logRequest(Request request) { + pendingTxns.add(request); syncProcessor.processRequest(request); } @@ -116,6 +128,7 @@ public void commit(long zxid) { Request request = pendingTxns.remove(); request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY); commitProcessor.commit(request); + requestSyncer.finishCommit(request.zxid); } public synchronized void sync() { @@ -188,20 +201,4 @@ protected void unregisterMetrics() { rootContext.unregisterGauge("synced_observers"); } - - /** - * Build a request for the txn - * @param hdr the txn header - * @param txn the txn - * @param digest the digest of txn - * @return a 
request moving through a chain of RequestProcessors - */ - private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { - final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); - if ((request.zxid & 0xffffffffL) != 0) { - pendingTxns.add(request); - } - return request; - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 1561a7dae18..eb39a5428ff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -1025,14 +1025,13 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // we're sending the designated leader, and if the leader is changing the followers are // responsible for closing the connection - this way we are sure that at least a majority of them // receive the commit message. - commitAndActivate(zxid, designatedLeader); + commitAndActivate(p, designatedLeader); informAndActivate(p, designatedLeader); } else { p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY); - commit(zxid); + commit(p); inform(p); } - zk.commitProcessor.commit(p.request); if (pendingSyncs.containsKey(zxid)) { for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) { sendSync(r); @@ -1065,16 +1064,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA LOG.trace("outstanding proposals all"); } - if ((zxid & 0xffffffffL) == 0) { - /* - * We no longer process NEWLEADER ack with this method. However, - * the learner sends an ack back to the leader after it gets - * UPTODATE, so we just ignore the message. 
- */ - return; - } - - if (outstandingProposals.size() == 0) { + if (outstandingProposals.isEmpty()) { LOG.debug("outstanding is 0"); return; } @@ -1212,25 +1202,30 @@ void sendObserverPacket(QuorumPacket qp) { long lastCommitted = -1; /** - * Create a commit packet and send it to all the members of the quorum - * - * @param zxid + * Commit proposal to all connected followers including itself. */ - public void commit(long zxid) { + public void commit(Proposal p) { + long zxid = p.getZxid(); synchronized (this) { lastCommitted = zxid; } + + zk.commit(p.request); + QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); sendPacket(qp); ServerMetrics.getMetrics().COMMIT_COUNT.add(1); } //commit and send some info - public void commitAndActivate(long zxid, long designatedLeader) { + public void commitAndActivate(Proposal p, long designatedLeader) { + long zxid = p.getZxid(); synchronized (this) { lastCommitted = zxid; } + zk.commit(p.request); + byte[] data = new byte[8]; ByteBuffer buffer = ByteBuffer.wrap(data); buffer.putLong(designatedLeader); @@ -1277,35 +1272,17 @@ public long getEpoch() { return ZxidUtils.getEpochFromZxid(lastProposed); } - @SuppressWarnings("serial") - public static class XidRolloverException extends Exception { - - public XidRolloverException(String message) { - super(message); - } - - } - /** * create a proposal and send it out to all the members * * @param request * @return the proposal that is queued to send to all the members */ - public Proposal propose(Request request) throws XidRolloverException { + public Proposal propose(Request request) { if (request.isThrottled()) { LOG.error("Throttled request send as proposal: {}. Exiting.", request); ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } - /** - * Address the rollover issue. All lower 32bits set indicate a new leader - * election. Force a re-election instead. 
See ZOOKEEPER-1277 - */ - if ((request.zxid & 0xffffffffL) == 0xffffffffL) { - String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start"; - shutdown(msg); - throw new XidRolloverException(msg); - } byte[] data = request.getSerializeData(); proposalStats.setLastBufferSize(data.length); @@ -1331,6 +1308,7 @@ public Proposal propose(Request request) throws XidRolloverException { sendPacket(pp); } ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1); + zk.logRequest(request); return p; } @@ -1465,6 +1443,22 @@ public void reportLookingSid(long sid) { } } + /** + * Comparing to {@link #getEpochToPropose(long, long)}, this method does not bump `acceptedEpoch` + * as the rollover txn may not be persisted yet. + */ + public void rolloverLeaderEpoch(long newEpoch) { + synchronized (connectingFollowers) { + if (waitingForNewEpoch) { + throw new IllegalStateException("ZAB is still waiting new epoch"); + } else if (newEpoch != epoch + 1) { + String msg = String.format("can not rollover leader epoch to %s, current epoch is %s", newEpoch, epoch); + throw new IllegalArgumentException(msg); + } + epoch = newEpoch; + } + } + @Override public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized (connectingFollowers) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 799b8f96148..ab93cfe1273 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -33,8 +33,11 @@ import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.SyncRequestProcessor; import 
org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -44,6 +47,7 @@ * FinalRequestProcessor */ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { + private static final Logger LOG = LoggerFactory.getLogger(LeaderZooKeeperServer.class); private ContainerManager containerManager; // guarded by sync @@ -51,6 +55,10 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { PrepRequestProcessor prepRequestProcessor; + SyncRequestProcessor syncProcessor; + private final ParticipantRequestSyncer requestSyncer = + new ParticipantRequestSyncer(this, LOG, r -> syncProcessor.processRequest(r)); + /** * @throws IOException */ @@ -68,8 +76,10 @@ protected void setupRequestProcessors() { RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); + AckRequestProcessor ackProcessor = new AckRequestProcessor(getLeader()); + syncProcessor = new SyncRequestProcessor(this, ackProcessor); + syncProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); - proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); @@ -159,6 +169,7 @@ protected synchronized void shutdownComponents() { if (containerManager != null) { containerManager.stop(); } + syncProcessor.shutdown(); super.shutdownComponents(); } @@ -169,6 +180,21 @@ public int getGlobalOutstandingLimit() { return globalOutstandingLimit; } + @Override + public void confirmRolloverEpoch(long newEpoch) { + getLeader().rolloverLeaderEpoch(newEpoch); + super.confirmRolloverEpoch(newEpoch); + } 
+ + public void logRequest(Request request) { + requestSyncer.syncRequest(request); + } + + public void commit(Request request) { + commitProcessor.commit(request); + requestSyncer.finishCommit(request.zxid); + } + @Override public void createSessionTracker() { sessionTracker = new LeaderSessionTracker( diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1ef99e50aae..ee01e7d5d0f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -810,6 +810,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startup(); + + long lastCommittedZxid = zk.getLastProcessedZxid(); + long lastCommittedEpoch = ZxidUtils.getEpochFromZxid(lastCommittedZxid); + if (ZxidUtils.isLastEpochZxid(lastCommittedZxid)) { + lastCommittedEpoch += 1; + } + LOG.debug("lastCommittedZxid {}, lastCommittedEpoch {} newEpoch {}", + Long.toHexString(lastCommittedZxid), lastCommittedEpoch, newEpoch); + if (lastCommittedEpoch > newEpoch) { + LOG.info("Switch to new leader epoch {} from {}", lastCommittedEpoch, newEpoch); + newEpoch = lastCommittedEpoch; + self.setAcceptedEpoch(newEpoch); + self.setCurrentEpoch(newEpoch); + } + /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 57947daa86c..d072dabc954 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -587,6 
+587,8 @@ public void run() { ServerMetrics.getMetrics().SNAP_COUNT.add(1); } } else { + LOG.info("Sending diffs last zxid of peer is 0x{}, zxid of leader is 0x{}", + Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid)); syncThrottler = learnerMaster.getLearnerDiffSyncThrottler(); syncThrottler.beginSync(exemptFromThrottle); ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java new file mode 100644 index 00000000000..2ed54014422 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ParticipantRequestSyncer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.slf4j.Logger; + +public class ParticipantRequestSyncer { + private QuorumZooKeeperServer server; + private final Logger log; + private final Consumer syncRequest; + private long nextEpoch = 0; + private final List nextEpochTxns = new ArrayList<>(); + + public ParticipantRequestSyncer(QuorumZooKeeperServer server, Logger log, Consumer syncRequest) { + this.server = server; + this.log = log; + this.syncRequest = syncRequest; + } + + public void syncRequest(Request request) { + if (nextEpoch != 0) { + // We can't persist requests from new leader epoch for now, as the new leader epoch + // has not been committed yet. Otherwise, we could run into inconsistent if another + // peer wins election and leads the same epoch. + // + // See also https://issues.apache.org/jira/browse/ZOOKEEPER-1277?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13149973 + log.debug("Block request(zxid: {}) on new leader epoch {}.", Long.toHexString(request.zxid), nextEpoch); + nextEpochTxns.add(request); + return; + } + if (ZxidUtils.isLastEpochZxid(request.zxid)) { + nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1; + log.info("Receive last epoch zxid {}, preparing next epoch {}", Long.toHexString(request.zxid), nextEpoch); + } + syncRequest.accept(request); + } + + public void finishCommit(long zxid) { + if (ZxidUtils.isLastEpochZxid(zxid)) { + log.info("Switch to new leader epoch {}", nextEpoch); + server.confirmRolloverEpoch(nextEpoch); + nextEpoch = 0; + nextEpochTxns.forEach(syncRequest); + nextEpochTxns.clear(); + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java index c1e2fe16e43..41ae330d99b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java @@ -21,14 +21,11 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; -import org.apache.zookeeper.server.SyncRequestProcessor; -import org.apache.zookeeper.server.quorum.Leader.XidRolloverException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This RequestProcessor simply forwards requests to an AckRequestProcessor and - * SyncRequestProcessor. + * This RequestProcessor simply forwards requests to {@link Leader#propose(Request)}. */ public class ProposalRequestProcessor implements RequestProcessor { @@ -38,8 +35,6 @@ public class ProposalRequestProcessor implements RequestProcessor { RequestProcessor nextProcessor; - SyncRequestProcessor syncProcessor; - // If this property is set, requests from Learners won't be forwarded // to the CommitProcessor in order to save resources public static final String FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED = @@ -49,8 +44,6 @@ public class ProposalRequestProcessor implements RequestProcessor { public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { this.zks = zks; this.nextProcessor = nextProcessor; - AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); - syncProcessor = new SyncRequestProcessor(zks, ackProcessor); forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean( FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED); @@ -58,13 +51,6 @@ public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor next forwardLearnerRequestsToCommitProcessorDisabled); } - /** - * initialize 
this processor - */ - public void initialize() { - syncProcessor.start(); - } - public void processRequest(Request request) throws RequestProcessorException { /* In the following IF-THEN-ELSE block, we process syncs on the leader. * If the sync is coming from a follower, then the follower @@ -81,12 +67,7 @@ public void processRequest(Request request) throws RequestProcessorException { } if (request.getHdr() != null) { // We need to sync and get consensus on any transactions - try { - zks.getLeader().propose(request); - } catch (XidRolloverException e) { - throw new RequestProcessorException(e.getMessage(), e); - } - syncProcessor.processRequest(request); + zks.getLeader().propose(request); } } } @@ -94,7 +75,6 @@ public void processRequest(Request request) throws RequestProcessorException { public void shutdown() { LOG.info("Shutting down"); nextProcessor.shutdown(); - syncProcessor.shutdown(); } private boolean shouldForwardToNextProcessor(Request request) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 876a297f9aa..12dbd0840cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1235,7 +1235,7 @@ private void loadDataBase() { } else { throw new IOException( "The current epoch, " + ZxidUtils.zxidToString(currentEpoch) - + ", is older than the last zxid, " + lastProcessedZxid); + + ", is older than the last zxid, " + Long.toHexString(lastProcessedZxid)); } } try { @@ -1578,6 +1578,7 @@ public void run() { } else { try { reconfigFlagClear(); + checkSuspended(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); @@ -2262,7 +2263,7 @@ public void setZKDatabase(ZKDatabase database) { this.zkDb = database; } - protected ZKDatabase getZkDb() { + public ZKDatabase getZkDb() { return 
zkDb; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 240936956fc..a70e5317859 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -122,6 +122,43 @@ private Request makeUpgradeRequest(long sessionId) { return null; } + /** + * This must only be called after the rollover txn has been persisted as it bumps `currentEpoch`. + * + *

<p>Note that it is possible for leader/follower to commit a proposal while it has not yet persisted + * that proposal. + * + *

<p>See {@link Learner#registerWithLeader(int)} and {@link Leader#getEpochToPropose(long, long)} for details of + * {@link org.apache.zookeeper.server.quorum.QuorumPeer.ZabState#DISCOVERY} phase. + * + *

<p>See {@link Learner#syncWithLeader(long)} for details of + * {@link org.apache.zookeeper.server.quorum.QuorumPeer.ZabState#SYNCHRONIZATION} phase. + */ + public void fenceRolloverEpoch(long newEpoch) throws IOException { + // Once a majority of nodes increase their `acceptedEpoch`, the `newEpoch` is fenced in future elections. + self.setAcceptedEpoch(newEpoch); + + // There is no DIFF to synchronize. + // + // It is erroneous to bump `currentEpoch` before persisting the rollover proposal. + // + // 1. Leader is able to commit a proposal even if it has not yet persisted the proposal. + // 2. `peerEpoch`(a.k.a. `currentEpoch`) takes higher priority than `lastLoggedZxid`. + // + // So, if `currentEpoch` is bumped before persisting the rollover proposal, restarted + // leader could win election and truncate committed proposals in other nodes. + // + // The above applies to followers also. + self.setCurrentEpoch(newEpoch); + } + + public void confirmRolloverEpoch(long newEpoch) { + // Quorum confirms this, there is no chance for others to rule + // this new epoch. So it is safe to broadcast this fact, even + // though we may not persist rollover txn yet. 
+ self.updateElectionVote(newEpoch); + } + /** * Implements the SessionUpgrader interface, * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java index b3b6935d5c7..5f80b8e0f39 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.util; public class ZxidUtils { + public static final long MAX_COUNTER = 0x00000000ffffffffL; public static long getEpochFromZxid(long zxid) { return zxid >> 32L; @@ -33,4 +34,7 @@ public static String zxidToString(long zxid) { return Long.toHexString(zxid); } + public static boolean isLastEpochZxid(long zxid) { + return getCounterFromZxid(zxid) == MAX_COUNTER; + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java index 031ccc2f7da..7da5b04733e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,8 +27,9 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.test.ClientBase; import 
org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.apache.zookeeper.test.ClientTest; @@ -228,18 +230,14 @@ public void tearDown() throws Exception { */ private int createNodes(ZooKeeper zk, int start, int count) throws Exception { LOG.info("Creating nodes {} thru {}", start, (start + count)); - int j = 0; - try { - for (int i = start; i < start + count; i++) { - zk.create("/foo" + i, new byte[0], Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL); - j++; - } - } catch (ConnectionLossException e) { - // this is ok - the leader has dropped leadership - waitForClientsConnected(); + for (int i = start; i < start + count; i++) { + Stat stat = new Stat(); + zk.create("/foo" + i, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL, stat); + LOG.info("STAT: {}", Long.toHexString(stat.getCzxid())); } - return j; + return count; } + /** * Verify the expected znodes were created and that the last znode, which * caused the roll-over, did not. @@ -247,15 +245,15 @@ private int createNodes(ZooKeeper zk, int start, int count) throws Exception { private void checkNodes(ZooKeeper zk, int start, int count) throws Exception { LOG.info("Validating nodes {} thru {}", start, (start + count)); for (int i = start; i < start + count; i++) { - assertNotNull(zk.exists("/foo" + i, false)); - LOG.error("Exists zxid:{}", Long.toHexString(zk.exists("/foo" + i, false).getCzxid())); + Stat stat = zk.exists("/foo" + i, false); + assertNotNull(stat); + LOG.info("Exists zxid:{}", Long.toHexString(stat.getCzxid())); } assertNull(zk.exists("/foo" + (start + count), false)); } /** - * Prior to the fix this test would hang for a while, then fail with - * connection loss. + * Verify that operations during zxid rollover should be fine. 
*/ @Test public void testSimpleRolloverFollower() throws Exception { @@ -265,6 +263,8 @@ public void testSimpleRolloverFollower() throws Exception { int countCreated = createNodes(zk, 0, 10); checkNodes(zk, 0, countCreated); + + assertEquals(10, countCreated); } /** @@ -308,8 +308,7 @@ public void testRolloverThenRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 60); + assertEquals(60, countCreated); } /** @@ -350,8 +349,7 @@ public void testRolloverThenFollowerRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 60); + assertEquals(60, countCreated); } /** @@ -395,8 +393,7 @@ public void testRolloverThenLeaderRestart() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 50); + assertEquals(50, countCreated); } /** @@ -442,8 +439,7 @@ public void testMultipleRollover() throws Exception { countCreated += createNodes(zk, countCreated, 10); // sanity check - assertTrue(countCreated > 0); - assertTrue(countCreated < 70); + assertEquals(70, countCreated); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java index 9c4e5214d4c..225e4110c52 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java @@ -44,6 +44,7 @@ import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import 
org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; @@ -93,6 +94,7 @@ public void setUp() throws IOException, X509Exception { ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog); zks = new LeaderZooKeeperServer(fileTxnSnapLog, qp, zkDb); + zks.syncProcessor = mock(SyncRequestProcessor.class); leader = new Leader(qp, zks); leaderBean = new LeaderBean(leader, zks); } @@ -135,7 +137,7 @@ public void testGetElectionTimeTaken() { } @Test - public void testGetProposalSize() throws IOException, Leader.XidRolloverException { + public void testGetProposalSize() throws IOException { // Arrange Request req = createMockRequest(); @@ -150,7 +152,7 @@ public void testGetProposalSize() throws IOException, Leader.XidRolloverExceptio } @Test - public void testResetProposalStats() throws IOException, Leader.XidRolloverException { + public void testResetProposalStats() throws IOException { // Arrange int initialProposalSize = leaderBean.getLastProposalSize(); Request req = createMockRequest(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index 5216eb70324..1341ca87310 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -196,8 +196,10 @@ protected void setupRequestProcessors() { RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); - ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor); - proposalProcessor.initialize(); + AckRequestProcessor ackProcessor = new 
AckRequestProcessor(this.getLeader()); + syncProcessor = new MockSyncRequestProcessor(this, ackProcessor); + syncProcessor.start(); + ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); @@ -229,21 +231,6 @@ public void shutdown() { } - private static class MockProposalRequestProcessor extends ProposalRequestProcessor { - - public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { - super(zks, nextProcessor); - - /** - * The only purpose here is to inject the mocked - * SyncRequestProcessor - */ - AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); - syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor); - } - - } - private static class MockTestQPMain extends TestQPMain { @Override diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index d374062e293..1b788b2097a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -987,7 +987,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro @Test public void testTxnTimeout(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { - public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { + public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { assertEquals(0, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java new file mode 100644 index 00000000000..459128d76b0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ZxidRolloverCrashTest.java @@ -0,0 +1,969 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.jute.Record; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.apache.zookeeper.txn.TxnDigest; +import org.apache.zookeeper.txn.TxnHeader; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZxidRolloverCrashTest extends ZKTestCase { + private static final Logger LOG = 
LoggerFactory.getLogger(ZxidRolloverCrashTest.class); + private static final long MAX_ZXID_COUNTER = ZxidUtils.MAX_COUNTER; + private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + QuorumUtil qu; + + @BeforeEach + public void setUp() { + // write and sync + System.setProperty("zookeeper.maxBatchSize", "1"); + } + + @AfterEach + public void tearDown() { + if (qu != null) { + qu.shutdownAll(); + } + } + + private long setZxidCounter(long zxid, long counter) { + return ZxidUtils.makeZxid(ZxidUtils.getEpochFromZxid(zxid), counter); + } + + private long maximizeZxid(long zxid) { + return setZxidCounter(zxid, MAX_ZXID_COUNTER); + } + + static class LeaderContext { + private final AtomicReference<Leader> leader = new AtomicReference<>(); + private final AtomicLong zxid = new AtomicLong(Long.MAX_VALUE); + private final AtomicInteger acks = new AtomicInteger(); + private final CompletableFuture<Void> completed = new CompletableFuture<>(); + + Leader setLeader(Leader leader) { + this.leader.compareAndSet(null, leader); + return leader; + } + + boolean isLeader(Leader leader) { + return this.leader.get() == leader; + } + + int incAcks() { + return acks.incrementAndGet(); + } + } + + @Test + public void testLeaderCrashAfterRolloverMajorityReplicated() throws Exception { + // Intercepts leader to replicate rollover proposal only to n+1 of 2n+1 nodes(including leader).
+ LeaderContext context = new LeaderContext(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + if (context.isLeader(this) && zxid == context.zxid.get()) { + LOG.info("Ack to 0x{} from peer {}", ZxidUtils.zxidToString(zxid), sid); + if (context.incAcks() >= N + 1) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + return; + } + super.processAck(sid, zxid, followerAddr); + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + getForwardingFollowers().stream().limit(N).forEach(follower -> { + follower.queuePacket(qp); + }); + } else { + super.sendPacket(qp); + } + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + int leaderId = qu.getLeaderServer(); + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(qu.getLeaderQuorumPeer()))) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(maximizeZxid(zkLeader.getZxid())); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: leader crash after rollover proposal replicated to majority, + // but before claiming it "committed" + context.completed.join(); + qu.shutdown(leaderId); + } + + qu.restart(leaderId); + + // then: after all servers up, every node should meet following conditions + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals up to and including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i <= MAX_ZXID_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // then: new epoch proposals after rollover don't persist as leader declares no leadership yet + assertNull(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER + 1), false)); + + // then: new epoch must be greater than `epoch + 1` as at least n+1 nodes fenced it + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + long newEpoch = ZxidUtils.getEpochFromZxid(stat.getCzxid()); + assertEquals(epoch + 2, newEpoch); + } + } + } + + @Test + public void testLeaderCrashAfterRolloverMinorityReplicated() throws Exception { + // Intercepts leader to replicate rollover proposal only to n of 2n+1 nodes(including leader).
+ LeaderContext context = new LeaderContext(); + final int N = 2; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + if (context.isLeader(this) && zxid == context.zxid.get()) { + LOG.info("Ack to 0x{} from peer {}", ZxidUtils.zxidToString(zxid), sid); + if (context.incAcks() >= N) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + return; + } + super.processAck(sid, zxid, followerAddr); + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + getForwardingFollowers().stream().limit(N - 1).forEach(follower -> { + follower.queuePacket(qp); + }); + } else { + super.sendPacket(qp); + } + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + int leaderId = qu.getLeaderServer(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + final long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(qu.getLeaderQuorumPeer()))) { + // given: leader with about to rollover zxid + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(maximizeZxid(zkLeader.getZxid())); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: leader crash after rollover proposal replicated to minority + context.completed.join(); + qu.shutdown(leaderId); + } + + qu.restart(leaderId); + + // then: after all servers up, every node should meet following conditions + long rolloverProposalZxid = 0; + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnString())) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < MAX_ZXID_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // It is indeterminate which part will win. The minority could win as they have higher `currentEpoch`. + // + // We can't make aggressive assertion like `equalTo(epoch + 1)` even when majority wins. As the new + // leader epoch is negotiated to greater than all `acceptedEpoch` in the quorum. So, it is possible + // for the leader epoch to be `greaterThan(epoch + 1)`. + // + // The situation is similar to leader crashed after minority has persisted `currentEpoch` in a + // normal "DISCOVERY" phase. 
+ // + // See "Phase 1: Establish an epoch" of https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0 + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + long newEpoch = ZxidUtils.getEpochFromZxid(stat.getCzxid()); + assertThat(newEpoch, greaterThanOrEqualTo(epoch + 1)); + + // then: it is indeterminate whether the rollover proposal has been committed, + // but it must be consistent among all nodes + if (rolloverProposalZxid == 0) { + rolloverProposalZxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid)); + } else { + long zxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid)); + } + } + } + } + + @Test + public void testLeaderCrashBeforeRolloverReplication() throws Exception { + // Intercepts leader not to replicate rollover proposal. 
+ LeaderContext context = new LeaderContext(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return context.setLeader(new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) { + super.processAck(sid, zxid, followerAddr); + if (!context.isLeader(this)) { + return; + } + if (zxid == context.zxid.get()) { + context.acks.incrementAndGet(); + } + if (context.acks.get() != 0 && outstandingProposals.get(context.zxid.get() - 1) == null) { + self.setSuspended(true); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + context.completed.complete(null); + } + } + + @Override + protected void sendPacket(QuorumPacket qp) { + if (context.isLeader(this) && qp.getType() == Leader.PROPOSAL && qp.getZxid() >= context.zxid.get()) { + return; + } + super.sendPacket(qp); + } + }); + } + }; + qu.disableJMXTest = true; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + + int leaderId = qu.getLeaderServer(); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + context.zxid.set(setZxidCounter(zkLeader.getZxid(), MAX_ZXID_COUNTER)); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name, stat) -> { + }, + null); + } + + // when: leader crash before broadcasting rollover proposal + context.completed.join(); + qu.shutdown(leaderId); + } + + String connString = qu.getConnString() + .replace(leaderConnectString + ",", "") + .replace(leaderConnectString, ""); + boolean restarted = false; + for (int j = 0; true; j++) { + try (ZooKeeper zk = ClientBase.createZKClient(connString, ClientBase.CONNECTION_TIMEOUT)) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + assertEquals(ZxidUtils.makeZxid(epoch, i), stat.getCzxid()); + } + + // then: the rollover proposal is lost even after old leader with higher `currentEpoch` re-join. + assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER), false)); + + // then: new epoch will be `epoch + 1`. 
+ Stat stat = new Stat(); + zk.create("/bar" + j, null, ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEquals(epoch + 1, ZxidUtils.getEpochFromZxid(stat.getCzxid())); + } + + if (restarted) { + break; + } + + // when: rejoin old leader which fences `epoch + 1` + // then: all above holds as majority has formed + qu.start(leaderId); + restarted = true; + connString = qu.getConnectionStringForServer(leaderId); + } + } + + @Test + public void testMinorityFollowersCrashBeforeWriteRolloverToDisk() throws Exception { + final int N = 1; + final int minorityN = N; + class Context { + final BlockingQueue<Follower> followers = new ArrayBlockingQueue<>(minorityN); + final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE); + final Set<Long> crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue<Long> crashing_servers = new LinkedBlockingQueue<>(); + + boolean bypass(Follower follower, long zxid) { + boolean minority = followers.offer(follower) || followers.contains(follower); + if (minority && zxid >= rolloverZxid.get()) { + if (crashed_servers.add(follower.self.getMyId())) { + crashing_servers.add(follower.self.getMyId()); + } + return true; + } + return false; + } + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { + if (!context.bypass(getFollower(), hdr.getZxid())) { + super.logRequest(hdr, txn, digest); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts.
+ String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER)); + + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: minority followers crashed before persisting rollover proposal + for (int i = 0; i < minorityN; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + } + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + // then: after all servers up, every node should meet following conditions + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals must be replicated + for (int i = 1; i <= 10; i++) { + String path = "/foo" + Long.toHexString(exclusiveStartCounter + i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: we have rollover to nextEpoch + long nextEpoch = epoch + 1; + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(qu.getPeer(serverId).peer.getZkDb().getDataTreeLastProcessedZxid())); + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(stat.getMzxid())); + } + } + } + + @Test + public void testMajorityFollowersCrashBeforeWriteRolloverToDisk() throws Exception { + final int 
N = 1; + final int majorityN = N + 1; + class Context { + final BlockingQueue<Follower> followers = new ArrayBlockingQueue<>(majorityN); + final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE); + final Set<Long> crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue<Long> crashing_servers = new LinkedBlockingQueue<>(); + + boolean bypass(Follower follower, long zxid) { + boolean majority = followers.offer(follower) || followers.contains(follower); + if (majority && zxid >= rolloverZxid.get()) { + if (crashed_servers.add(follower.self.getMyId())) { + crashing_servers.add(follower.self.getMyId()); + } + return true; + } + return false; + } + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { + if (!context.bypass(getFollower(), hdr.getZxid())) { + super.logRequest(hdr, txn, digest); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString, watcher)) { + context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER)); + + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations.
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: majority followers crashed before persisting rollover proposal + for (int i = 0; i < majorityN; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + + watcher.waitForDisconnected(CONNECTION_TIMEOUT); + watcher.reset(); + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + watcher.waitForConnected(CONNECTION_TIMEOUT); + } + + // then: after quorum reformed, every node should meet following conditions + long rolloverProposalZxid = 0; + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals up to but not including the rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: it is indeterminate whether the rollover proposal has been committed, + // but it must be consistent among all nodes + if (rolloverProposalZxid == 0) { + rolloverProposalZxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid)); + } else { + long zxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid)); + } + + // then: new epoch proposal must not be committed + assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER + 1), false)); + } + } + 
} + + @Test + public void testMinorityFollowersCrashAfterWriteRolloverToDisk() throws Exception { + final int N = 1; + class Context { + final Set<Long> crashed_servers = Collections.synchronizedSet(new HashSet<>()); + final BlockingQueue<Long> crashing_servers = new LinkedBlockingQueue<>(); + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void fenceRolloverEpoch(long newEpoch) throws IOException { + super.fenceRolloverEpoch(newEpoch); + long myId = self.getMyId(); + if (context.crashed_servers.size() < N && !context.crashed_servers.contains(myId)) { + context.crashed_servers.add(myId); + context.crashing_servers.add(myId); + throw new IOException("crash peer " + myId + "after persist max epoch zxid"); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations.
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: minority followers crashed after replication + for (int i = 0; i < N; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + } + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + // then: after all servers up, every node should meet following conditions + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals must be replicated + for (int i = 1; i <= 10; i++) { + String path = "/foo" + Long.toHexString(exclusiveStartCounter + i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: we have rollover to nextEpoch + long nextEpoch = epoch + 1; + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(qu.getPeer(serverId).peer.getZkDb().getDataTreeLastProcessedZxid())); + Stat stat = new Stat(); + zk.create("/server" + serverId, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEquals(nextEpoch, ZxidUtils.getEpochFromZxid(stat.getMzxid())); + } + } + } + + @Test + public void testMajorityFollowersCrashAfterWriteRolloverToDisk() throws Exception { + final int N = 1; + class Context { + final Set<Long> crashed_servers = Collections.synchronizedSet(new LinkedHashSet<>()); + final BlockingQueue<Long> crashing_servers = new LinkedBlockingQueue<>(); + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public void fenceRolloverEpoch(long newEpoch) throws IOException {
+ super.fenceRolloverEpoch(newEpoch); + long myId = self.getMyId(); + if (context.crashed_servers.size() < N + 1 && !context.crashed_servers.contains(myId)) { + context.crashed_servers.add(myId); + context.crashing_servers.add(myId); + throw new IOException("crash peer " + myId + "after persist max epoch zxid"); + } + } + }); + } + }; + qu.startAll(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString, watcher)) { + // given: leader about to rollover + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + + // when: multiple proposals during rollover to next epoch + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + + // when: majority followers crashed after replicating rollover proposal + for (int i = 0; i < N + 1; i++) { + long serverId = context.crashing_servers.take(); + qu.shutdown((int) serverId); + } + + watcher.waitForDisconnected(CONNECTION_TIMEOUT); + watcher.reset(); + + for (long serverId : context.crashed_servers) { + qu.restart((int) serverId); + } + + watcher.waitForConnected(CONNECTION_TIMEOUT); + } + + // then: after quorum reformed, every node should meet following conditions + long rolloverProposalZxid = 0; + for (int serverId = 1; serverId <= 2 * N + 1; serverId++) { + String connectString = qu.getConnectionStringForServer(serverId); + try (ZooKeeper zk = ClientBase.createZKClient(connectString)) { + // then: all proposals up to rollover proposal must be replicated + for (long i = exclusiveStartCounter + 1; i < ZxidUtils.MAX_COUNTER; i++) { + String path = "/foo" + Long.toHexString(i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + + // then: it is indeterminate whether the rollover proposal has been committed, + // but it must be consistent among all nodes + if (rolloverProposalZxid == 0) { + rolloverProposalZxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + LOG.info("Get rollover proposal zxid {}", ZxidUtils.zxidToString(rolloverProposalZxid)); + } else { + long zxid = + Optional.ofNullable(zk.exists("/foo" + Long.toHexString(MAX_ZXID_COUNTER), false)) + .map(Stat::getCzxid).orElse(-1L); + assertEquals(ZxidUtils.zxidToString(rolloverProposalZxid), ZxidUtils.zxidToString(zxid)); + } + + // then: new epoch proposal must not be committed + assertNull(zk.exists("/foo" + Long.toHexString(ZxidUtils.MAX_COUNTER + 1), false)); + } + } + } + + @Test + public void 
testLearnerRejoinDuringLeaderRolloverEpoch() throws Exception { + final int N = 1; + class Context { + private final AtomicLong rolloverZxid = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong followerId = new AtomicLong(-1); + private final CompletableFuture rolloverCommitting = new CompletableFuture<>(); + private final CompletableFuture rolloverCommitted = new CompletableFuture<>(); + } + Context context = new Context(); + qu = new QuorumUtil(N) { + @Override + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb()) { + @Override + public DataTree.ProcessTxnResult processTxn(Request request) { + DataTree.ProcessTxnResult result = super.processTxn(request); + // Leader is about to rollover, combining with below randomness, + // we can test all cases: + // 1. Sync before rollover proposal committed. + // 2. Sync after rollover proposal committed. + // 3. Sync after proposals from new epoch committed. + if (request.zxid + 2 >= context.rolloverZxid.get()) { + context.rolloverCommitting.join(); + context.rolloverCommitted.complete(null); + } + return result; + } + }) { + @Override + public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { + super.waitForEpochAck(id, ss); + if (id == context.followerId.get()) { + context.rolloverCommitted.join(); + // Sleep a bit before sync to allow more proposals to come. 
+ Thread.sleep(new Random().nextInt(10)); + } + } + }; + } + + @Override + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb())) { + @Override + protected void syncWithLeader(long newLeaderZxid) throws Exception { + super.syncWithLeader(newLeaderZxid); + } + + @Override + protected long registerWithLeader(int pktType) throws IOException { + long leaderZxid = super.registerWithLeader(pktType); + if (self.getMyId() == context.followerId.get()) { + context.rolloverCommitting.complete(null); + } + return leaderZxid; + } + }; + } + }; + qu.startAll(); + + int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId(); + CompletableFuture restarted = new CompletableFuture<>(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: a re-joining follower + qu.shutdown(followerId); + + ForkJoinPool.commonPool().submit(() -> { + context.followerId.set(followerId); + qu.restart(followerId); + restarted.complete(null); + return null; + }); + + // given: leader rollover to next epoch + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + context.rolloverZxid.set(ZxidUtils.makeZxid(epoch, MAX_ZXID_COUNTER)); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: follower rejoin + restarted.join(); + + String followerAddr = qu.getConnectionStringForServer(followerId); + try (ZooKeeper zk = ClientBase.createZKClient(followerAddr)) { + zk.sync("/"); + // then: all proposals must be replicated + for (int i = 1; i <= 10; i++) { + String path = "/foo" + Long.toHexString(exclusiveStartCounter + i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + QuorumPeer follower = qu.getPeer(followerId).peer; + assertEquals(epoch + 1, follower.getAcceptedEpoch()); + assertEquals(epoch + 1, follower.getCurrentEpoch()); + assertEquals(epoch + 1, follower.getCurrentVote().getPeerEpoch()); + } + } + + @Test + public void testLearnerRejoinAfterLeaderRolloverEpoch() throws Exception { + final int N = 1; + qu = new QuorumUtil(N); + qu.startAll(); + + int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId(); + + ZooKeeperServer zkLeader = qu.getLeaderQuorumPeer().getActiveServer(); + long epoch = ZxidUtils.getEpochFromZxid(zkLeader.getZxid()); + long exclusiveStartCounter = MAX_ZXID_COUNTER - 5; + // Connect only to leader to avoid re-connect attempts. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: a shutdown follower + qu.shutdown(followerId); + + // given: leader rollover to next epoch + zkLeader.setZxid(setZxidCounter(zkLeader.getZxid(), exclusiveStartCounter)); + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. 
+ zk.create( + "/foo" + Long.toHexString(exclusiveStartCounter + i), + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: follower rejoin + qu.restart(followerId); + + String followerAddr = qu.getConnectionStringForServer(followerId); + try (ZooKeeper zk = ClientBase.createZKClient(followerAddr)) { + zk.sync("/"); + // then: all proposals must be replicated + for (int i = 1; i <= 10; i++) { + String path = "/foo" + Long.toHexString(exclusiveStartCounter + i); + Stat stat = zk.exists(path, false); + assertNotNull(stat, path + " not found"); + } + QuorumPeer follower = qu.getPeer(followerId).peer; + assertEquals(epoch + 1, follower.getAcceptedEpoch()); + assertEquals(epoch + 1, follower.getCurrentEpoch()); + assertEquals(epoch + 1, follower.getCurrentVote().getPeerEpoch()); + } + } +} \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java index 3021421b5e4..66f5e61bd61 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java @@ -735,6 +735,10 @@ public static ZooKeeper createZKClient(String cxnString) throws Exception { return createZKClient(cxnString, CONNECTION_TIMEOUT); } + public static ZooKeeper createZKClient(String cxnString, CountdownWatcher watcher) throws Exception { + return createZKClient(cxnString, CONNECTION_TIMEOUT, CONNECTION_TIMEOUT, new ZKClientConfig(), watcher); + } + /** * Returns ZooKeeper client after connecting to ZooKeeper Server. 
Session * timeout is {@link #CONNECTION_TIMEOUT} @@ -757,6 +761,11 @@ public static ZooKeeper createZKClient(String cxnString, int sessionTimeout, public static ZooKeeper createZKClient(String cxnString, int sessionTimeout, long connectionTimeout, ZKClientConfig config) throws IOException { CountdownWatcher watcher = new CountdownWatcher(); + return createZKClient(cxnString, sessionTimeout, connectionTimeout, config, watcher); + } + + public static ZooKeeper createZKClient(String cxnString, int sessionTimeout, + long connectionTimeout, ZKClientConfig config, CountdownWatcher watcher) throws IOException { ZooKeeper zk = new ZooKeeper(cxnString, sessionTimeout, watcher, config); try { watcher.waitForConnected(connectionTimeout); @@ -765,5 +774,4 @@ public static ZooKeeper createZKClient(String cxnString, int sessionTimeout, } return zk; } - } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java index 865d82b0c7c..e60e43a93c9 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java @@ -34,7 +34,13 @@ import java.util.Set; import java.util.TreeSet; import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.Follower; +import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; @@ -65,7 +71,7 @@ public static class PeerStruct { } - private final Map peersView = 
new HashMap<>(); + protected final Map peersView = new HashMap<>(); private final Map peers = new HashMap<>(); @@ -75,15 +81,15 @@ public static class PeerStruct { private String hostPort; - private int tickTime; + protected int tickTime; - private int initLimit; + protected int initLimit; - private int syncLimit; + protected int syncLimit; - private int connectToLearnerMasterLimit; + protected int connectToLearnerMasterLimit; - private int electionAlg; + protected int electionAlg; private boolean localSessionEnabled; @@ -120,7 +126,7 @@ public QuorumUtil(int n, int syncLimit) throws RuntimeException { for (int i = 1; i <= ALL; ++i) { PeerStruct ps = peers.get(i); LOG.info("Creating QuorumPeer {}; public port {}", i, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); assertEquals(ps.clientPort, ps.peer.getClientPort()); } } catch (Exception e) { @@ -143,6 +149,28 @@ public void enableLocalSession(boolean localSessionEnabled) { this.localSessionEnabled = localSessionEnabled; } + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + return new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit) { + @Override + protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { + return QuorumUtil.this.makeLeader(this, logFactory); + } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { + return QuorumUtil.this.makeFollower(this, logFactory); + } + }; + } + + protected Leader makeLeader(QuorumPeer self, FileTxnSnapLog logFactory) throws IOException, X509Exception { + return new Leader(self, new LeaderZooKeeperServer(logFactory, self, self.getZkDb())); + } + + protected Follower makeFollower(QuorumPeer self, FileTxnSnapLog logFactory) throws 
IOException { + return new Follower(self, new FollowerZooKeeperServer(logFactory, self, self.getZkDb())); + } + public void startAll() throws IOException { shutdownAll(); for (int i = 1; i <= ALL; ++i) { @@ -206,7 +234,7 @@ public void startQuorum() throws IOException { public void start(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); } @@ -225,7 +253,7 @@ public void restart(int id) throws IOException { public void startThenShutdown(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); }