Review Board 1.7.22


Consolidate membership management

Review Request #4729 - Created April 15, 2012 and updated

Alexander Shraer
Reviewers
zookeeper
zookeeper
https://issues.apache.org/jira/browse/ZOOKEEPER-1411

Currently every server has a different configuration file. With this patch, we will have all cluster membership definitions in a single file, and every server can have a copy of this file.
Many tests were updated to work with the new configuration format. MembershipBCTest.java tests backward compatibility.

Diff revision 2 (Latest)

1 2
1 2

  1. /src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java: Loading...
  2. /src/java/main/org/apache/zookeeper/server/quorum/Leader.java: Loading...
  3. /src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java: Loading...
  4. /src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java: Loading...
  5. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java: Loading...
  6. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java: Loading...
  7. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java: Loading...
  8. /src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java: Loading...
  9. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java: Loading...
  10. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java: Loading...
  11. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java: Loading...
  12. /src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java: Loading...
  13. /src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java: Loading...
  14. /src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java: Loading...
  15. /src/java/test/org/apache/zookeeper/server/util/DynamicConfigBCTest.java: Loading...
  16. /src/java/test/org/apache/zookeeper/test/CnxManagerTest.java: Loading...
  17. /src/java/test/org/apache/zookeeper/test/FLETest.java: Loading...
  18. /src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java: Loading...
  19. /src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java: Loading...
  20. /src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java: Loading...
  21. /src/java/test/org/apache/zookeeper/test/ObserverTest.java: Loading...
  22. /src/java/test/org/apache/zookeeper/test/QuorumBase.java: Loading...
  23. /src/java/test/org/apache/zookeeper/test/QuorumUtil.java: Loading...
  24. /src/java/test/org/apache/zookeeper/test/StandaloneTest.java: Loading...
/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
Revision 1327937 New Change
1
/**
1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
8
 * with the License.  You may obtain a copy of the License at
9
 *
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19

    
   
19

   
20
package org.apache.zookeeper.server.quorum;
20
package org.apache.zookeeper.server.quorum;
21

    
   
21

   
22
import java.io.IOException;
22
import java.io.IOException;
23
import java.nio.ByteBuffer;
23
import java.nio.ByteBuffer;
24
import java.util.HashMap;
24
import java.util.HashMap;
25
import java.util.HashSet;
25
import java.util.HashSet;
26
import java.util.Map;
26
import java.util.Map;
27
import java.util.concurrent.LinkedBlockingQueue;
27
import java.util.concurrent.LinkedBlockingQueue;
28
import java.util.concurrent.TimeUnit;
28
import java.util.concurrent.TimeUnit;
29

    
   
29

   
30
import org.apache.zookeeper.jmx.MBeanRegistry;
30
import org.apache.zookeeper.jmx.MBeanRegistry;
31
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
31
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
32
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
32
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
33
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
33
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
34
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
34
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
35
import org.apache.zookeeper.server.util.ZxidUtils;
35
import org.apache.zookeeper.server.util.ZxidUtils;
36
import org.slf4j.Logger;
36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
37
import org.slf4j.LoggerFactory;
38

    
   
38

   
39

    
   
39

   
40
/**
40
/**
41
 * Implementation of leader election using TCP. It uses an object of the class
41
 * Implementation of leader election using TCP. It uses an object of the class
42
 * QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based
42
 * QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based
43
 * as with the other UDP implementations.
43
 * as with the other UDP implementations.
44
 *
44
 *
45
 * There are a few parameters that can be tuned to change its behavior. First,
45
 * There are a few parameters that can be tuned to change its behavior. First,
46
 * finalizeWait determines the amount of time to wait until deciding upon a leader.
46
 * finalizeWait determines the amount of time to wait until deciding upon a leader.
47
 * This is part of the leader election algorithm.
47
 * This is part of the leader election algorithm.
48
 */
48
 */
49

    
   
49

   
50

    
   
50

   
51
public class FastLeaderElection implements Election {
51
public class FastLeaderElection implements Election {
52
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
52
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
53

    
   
53

   
54
    /**
54
    /**
55
     * Determine how much time a process has to wait
55
     * Determine how much time a process has to wait
56
     * once it believes that it has reached the end of
56
     * once it believes that it has reached the end of
57
     * leader election.
57
     * leader election.
58
     */
58
     */
59
    final static int finalizeWait = 200;
59
    final static int finalizeWait = 200;
60

    
   
60

   
61

    
   
61

   
62
    /**
62
    /**
63
     * Upper bound on the amount of time between two consecutive
63
     * Upper bound on the amount of time between two consecutive
64
     * notification checks. This impacts the amount of time to get
64
     * notification checks. This impacts the amount of time to get
65
     * the system up again after long partitions. Currently 60 seconds.
65
     * the system up again after long partitions. Currently 60 seconds.
66
     */
66
     */
67

    
   
67

   
68
    final static int maxNotificationInterval = 60000;
68
    final static int maxNotificationInterval = 60000;
69

    
   
69

   
70
    /**
70
    /**
71
     * Connection manager. Fast leader election uses TCP for
71
     * Connection manager. Fast leader election uses TCP for
72
     * communication between peers, and QuorumCnxManager manages
72
     * communication between peers, and QuorumCnxManager manages
73
     * such connections.
73
     * such connections.
74
     */
74
     */
75

    
   
75

   
76
    QuorumCnxManager manager;
76
    QuorumCnxManager manager;
77

    
   
77

   
78

    
   
78

   
79
    /**
79
    /**
80
     * Notifications are messages that let other peers know that
80
     * Notifications are messages that let other peers know that
81
     * a given peer has changed its vote, either because it has
81
     * a given peer has changed its vote, either because it has
82
     * joined leader election or because it learned of another
82
     * joined leader election or because it learned of another
83
     * peer with higher zxid or same zxid and higher server id
83
     * peer with higher zxid or same zxid and higher server id
84
     */
84
     */
85

    
   
85

   
86
    static public class Notification {
86
    static public class Notification {
87
        /*
87
        /*
88
         * Proposed leader
88
         * Proposed leader
89
         */
89
         */
90
        long leader;
90
        long leader;
91

    
   
91

   
92
        /*
92
        /*
93
         * zxid of the proposed leader
93
         * zxid of the proposed leader
94
         */
94
         */
95
        long zxid;
95
        long zxid;
96

    
   
96

   
97
        /*
97
        /*
98
         * Epoch
98
         * Epoch
99
         */
99
         */
100
        long electionEpoch;
100
        long electionEpoch;
101

    
   
101

   
102
        /*
102
        /*
103
         * current state of sender
103
         * current state of sender
104
         */
104
         */
105
        QuorumPeer.ServerState state;
105
        QuorumPeer.ServerState state;
106

    
   
106

   
107
        /*
107
        /*
108
         * Address of sender
108
         * Address of sender
109
         */
109
         */
110
        long sid;
110
        long sid;
111

    
   
111

   
112
        /*
112
        /*
113
         * epoch of the proposed leader
113
         * epoch of the proposed leader
114
         */
114
         */
115
        long peerEpoch;
115
        long peerEpoch;
116
    }
116
    }
117

    
   
117

   
118
    /**
118
    /**
119
     * Messages that a peer wants to send to other peers.
119
     * Messages that a peer wants to send to other peers.
120
     * These messages can be both Notifications and Acks
120
     * These messages can be both Notifications and Acks
121
     * of reception of notification.
121
     * of reception of notification.
122
     */
122
     */
123
    static public class ToSend {
123
    static public class ToSend {
124
        static enum mType {crequest, challenge, notification, ack}
124
        static enum mType {crequest, challenge, notification, ack}
125

    
   
125

   
126
        ToSend(mType type,
126
        ToSend(mType type,
127
                long leader,
127
                long leader,
128
                long zxid,
128
                long zxid,
129
                long electionEpoch,
129
                long electionEpoch,
130
                ServerState state,
130
                ServerState state,
131
                long sid,
131
                long sid,
132
                long peerEpoch) {
132
                long peerEpoch) {
133

    
   
133

   
134
            this.leader = leader;
134
            this.leader = leader;
135
            this.zxid = zxid;
135
            this.zxid = zxid;
136
            this.electionEpoch = electionEpoch;
136
            this.electionEpoch = electionEpoch;
137
            this.state = state;
137
            this.state = state;
138
            this.sid = sid;
138
            this.sid = sid;
139
            this.peerEpoch = peerEpoch;
139
            this.peerEpoch = peerEpoch;
140
        }
140
        }
141

    
   
141

   
142
        /*
142
        /*
143
         * Proposed leader in the case of notification
143
         * Proposed leader in the case of notification
144
         */
144
         */
145
        long leader;
145
        long leader;
146

    
   
146

   
147
        /*
147
        /*
148
         * id contains the tag for acks, and zxid for notifications
148
         * id contains the tag for acks, and zxid for notifications
149
         */
149
         */
150
        long zxid;
150
        long zxid;
151

    
   
151

   
152
        /*
152
        /*
153
         * Epoch
153
         * Epoch
154
         */
154
         */
155
        long electionEpoch;
155
        long electionEpoch;
156

    
   
156

   
157
        /*
157
        /*
158
         * Current state;
158
         * Current state;
159
         */
159
         */
160
        QuorumPeer.ServerState state;
160
        QuorumPeer.ServerState state;
161

    
   
161

   
162
        /*
162
        /*
163
         * Address of recipient
163
         * Address of recipient
164
         */
164
         */
165
        long sid;
165
        long sid;
166
        
166
        
167
        /*
167
        /*
168
         * Leader epoch
168
         * Leader epoch
169
         */
169
         */
170
        long peerEpoch;
170
        long peerEpoch;
171
    }
171
    }
172

    
   
172

   
173
    LinkedBlockingQueue<ToSend> sendqueue;
173
    LinkedBlockingQueue<ToSend> sendqueue;
174
    LinkedBlockingQueue<Notification> recvqueue;
174
    LinkedBlockingQueue<Notification> recvqueue;
175

    
   
175

   
176
    /**
176
    /**
177
     * Multi-threaded implementation of message handler. Messenger
177
     * Multi-threaded implementation of message handler. Messenger
178
     * implements two sub-classes: WorkReceiver and  WorkSender. The
178
     * implements two sub-classes: WorkReceiver and  WorkSender. The
179
     * functionality of each is obvious from the name. Each of these
179
     * functionality of each is obvious from the name. Each of these
180
     * spawns a new thread.
180
     * spawns a new thread.
181
     */
181
     */
182

    
   
182

   
183
    private class Messenger {
183
    private class Messenger {
184

    
   
184

   
185
        /**
185
        /**
186
         * Receives messages from instance of QuorumCnxManager on
186
         * Receives messages from instance of QuorumCnxManager on
187
         * method run(), and processes such messages.
187
         * method run(), and processes such messages.
188
         */
188
         */
189

    
   
189

   
190
        class WorkerReceiver implements Runnable {
190
        class WorkerReceiver implements Runnable {
191
            volatile boolean stop;
191
            volatile boolean stop;
192
            QuorumCnxManager manager;
192
            QuorumCnxManager manager;
193

    
   
193

   
194
            WorkerReceiver(QuorumCnxManager manager) {
194
            WorkerReceiver(QuorumCnxManager manager) {
195
                this.stop = false;
195
                this.stop = false;
196
                this.manager = manager;
196
                this.manager = manager;
197
            }
197
            }
198

    
   
198

   
199
            public void run() {
199
            public void run() {
200

    
   
200

   
201
                Message response;
201
                Message response;
202
                while (!stop) {
202
                while (!stop) {
203
                    // Sleeps on receive
203
                    // Sleeps on receive
204
                    try{
204
                    try{
205
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
205
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
206
                        if(response == null) continue;
206
                        if(response == null) continue;
207

    
   
207

   
208
                        /*
208
                        /*
209
                         * If it is from an observer, respond right away.
209
                         * If it is from an observer, respond right away.
210
                         * Note that the following predicate assumes that
210
                         * Note that the following predicate assumes that
211
                         * if a server is not a follower, then it must be
211
                         * if a server is not a follower, then it must be
212
                         * an observer. If we ever have any other type of
212
                         * an observer. If we ever have any other type of
213
                         * learner in the future, we'll have to change the
213
                         * learner in the future, we'll have to change the
214
                         * way we check for observers.
214
                         * way we check for observers.
215
                         */
215
                         */
216
                        if(!self.getVotingView().containsKey(response.sid)){
216
                        if(!self.getVotingView().containsKey(response.sid)){
217
                            Vote current = self.getCurrentVote();
217
                            Vote current = self.getCurrentVote();
218
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
218
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
219
                                    current.getId(),
219
                                    current.getId(),
220
                                    current.getZxid(),
220
                                    current.getZxid(),
221
                                    logicalclock,
221
                                    logicalclock,
222
                                    self.getPeerState(),
222
                                    self.getPeerState(),
223
                                    response.sid,
223
                                    response.sid,
224
                                    current.getPeerEpoch());
224
                                    current.getPeerEpoch());
225

    
   
225

   
226
                            sendqueue.offer(notmsg);
226
                            sendqueue.offer(notmsg);
227
                        } else {
227
                        } else {
228
                            // Receive new message
228
                            // Receive new message
229
                            if (LOG.isDebugEnabled()) {
229
                            if (LOG.isDebugEnabled()) {
230
                                LOG.debug("Receive new notification message. My id = "
230
                                LOG.debug("Receive new notification message. My id = "
231
                                        + self.getId());
231
                                        + self.getId());
232
                            }
232
                            }
233

    
   
233

   
234
                            /*
234
                            /*
235
                             * We check for 28 bytes for backward compatibility
235
                             * We check for 28 bytes for backward compatibility
236
                             */
236
                             */
237
                            if (response.buffer.capacity() < 28) {
237
                            if (response.buffer.capacity() < 28) {
238
                                LOG.error("Got a short response: "
238
                                LOG.error("Got a short response: "
239
                                        + response.buffer.capacity());
239
                                        + response.buffer.capacity());
240
                                continue;
240
                                continue;
241
                            }
241
                            }
242
                            boolean backCompatibility = (response.buffer.capacity() == 28);
242
                            boolean backCompatibility = (response.buffer.capacity() == 28);
243
                            response.buffer.clear();
243
                            response.buffer.clear();
244

    
   
244

   
245
                            // State of peer that sent this message
245
                            // State of peer that sent this message
246
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
246
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
247
                            switch (response.buffer.getInt()) {
247
                            switch (response.buffer.getInt()) {
248
                            case 0:
248
                            case 0:
249
                                ackstate = QuorumPeer.ServerState.LOOKING;
249
                                ackstate = QuorumPeer.ServerState.LOOKING;
250
                                break;
250
                                break;
251
                            case 1:
251
                            case 1:
252
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
252
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
253
                                break;
253
                                break;
254
                            case 2:
254
                            case 2:
255
                                ackstate = QuorumPeer.ServerState.LEADING;
255
                                ackstate = QuorumPeer.ServerState.LEADING;
256
                                break;
256
                                break;
257
                            case 3:
257
                            case 3:
258
                                ackstate = QuorumPeer.ServerState.OBSERVING;
258
                                ackstate = QuorumPeer.ServerState.OBSERVING;
259
                                break;
259
                                break;
260
                            }
260
                            }
261

    
   
261

   
262
                            // Instantiate Notification and set its attributes
262
                            // Instantiate Notification and set its attributes
263
                            Notification n = new Notification();
263
                            Notification n = new Notification();
264
                            n.leader = response.buffer.getLong();
264
                            n.leader = response.buffer.getLong();
265
                            n.zxid = response.buffer.getLong();
265
                            n.zxid = response.buffer.getLong();
266
                            n.electionEpoch = response.buffer.getLong();
266
                            n.electionEpoch = response.buffer.getLong();
267
                            n.state = ackstate;
267
                            n.state = ackstate;
268
                            n.sid = response.sid;
268
                            n.sid = response.sid;
269
                            if(!backCompatibility){
269
                            if(!backCompatibility){
270
                                n.peerEpoch = response.buffer.getLong();
270
                                n.peerEpoch = response.buffer.getLong();
271
                            } else {
271
                            } else {
272
                                if(LOG.isInfoEnabled()){
272
                                if(LOG.isInfoEnabled()){
273
                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
273
                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
274
                                }
274
                                }
275
                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
275
                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
276
                            }
276
                            }
277

    
   
277

   
278
                            /*
278
                            /*
279
                             * Print notification info
279
                             * Print notification info
280
                             */
280
                             */
281
                            if(LOG.isInfoEnabled()){
281
                            if(LOG.isInfoEnabled()){
282
                                printNotification(n);
282
                                printNotification(n);
283
                            }
283
                            }
284

    
   
284

   
285
                            /*
285
                            /*
286
                             * If this server is looking, then send proposed leader
286
                             * If this server is looking, then send proposed leader
287
                             */
287
                             */
288

    
   
288

   
289
                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
289
                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
290
                                recvqueue.offer(n);
290
                                recvqueue.offer(n);
291

    
   
291

   
292
                                /*
292
                                /*
293
                                 * Send a notification back if the peer that sent this
293
                                 * Send a notification back if the peer that sent this
294
                                 * message is also looking and its logical clock is
294
                                 * message is also looking and its logical clock is
295
                                 * lagging behind.
295
                                 * lagging behind.
296
                                 */
296
                                 */
297
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
297
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
298
                                        && (n.electionEpoch < logicalclock)){
298
                                        && (n.electionEpoch < logicalclock)){
299
                                    Vote v = getVote();
299
                                    Vote v = getVote();
300
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
300
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
301
                                            v.getId(),
301
                                            v.getId(),
302
                                            v.getZxid(),
302
                                            v.getZxid(),
303
                                            logicalclock,
303
                                            logicalclock,
304
                                            self.getPeerState(),
304
                                            self.getPeerState(),
305
                                            response.sid,
305
                                            response.sid,
306
                                            v.getPeerEpoch());
306
                                            v.getPeerEpoch());
307
                                    sendqueue.offer(notmsg);
307
                                    sendqueue.offer(notmsg);
308
                                }
308
                                }
309
                            } else {
309
                            } else {
310
                                /*
310
                                /*
311
                                 * If this server is not looking, but the one that sent the ack
311
                                 * If this server is not looking, but the one that sent the ack
312
                                 * is looking, then send back what it believes to be the leader.
312
                                 * is looking, then send back what it believes to be the leader.
313
                                 */
313
                                 */
314
                                Vote current = self.getCurrentVote();
314
                                Vote current = self.getCurrentVote();
315
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
315
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
316
                                    if(LOG.isDebugEnabled()){
316
                                    if(LOG.isDebugEnabled()){
317
                                        LOG.debug("Sending new notification. My id =  " +
317
                                        LOG.debug("Sending new notification. My id =  " +
318
                                                self.getId() + " recipient=" +
318
                                                self.getId() + " recipient=" +
319
                                                response.sid + " zxid=0x" +
319
                                                response.sid + " zxid=0x" +
320
                                                Long.toHexString(current.getZxid()) +
320
                                                Long.toHexString(current.getZxid()) +
321
                                                " leader=" + current.getId());
321
                                                " leader=" + current.getId());
322
                                    }
322
                                    }
323
                                    ToSend notmsg = new ToSend(
323
                                    ToSend notmsg = new ToSend(
324
                                            ToSend.mType.notification,
324
                                            ToSend.mType.notification,
325
                                            current.getId(),
325
                                            current.getId(),
326
                                            current.getZxid(),
326
                                            current.getZxid(),
327
                                            logicalclock,
327
                                            logicalclock,
328
                                            self.getPeerState(),
328
                                            self.getPeerState(),
329
                                            response.sid,
329
                                            response.sid,
330
                                            current.getPeerEpoch());
330
                                            current.getPeerEpoch());
331
                                    sendqueue.offer(notmsg);
331
                                    sendqueue.offer(notmsg);
332
                                }
332
                                }
333
                            }
333
                            }
334
                        }
334
                        }
335
                    } catch (InterruptedException e) {
335
                    } catch (InterruptedException e) {
336
                        System.out.println("Interrupted Exception while waiting for new message" +
336
                        System.out.println("Interrupted Exception while waiting for new message" +
337
                                e.toString());
337
                                e.toString());
338
                    }
338
                    }
339
                }
339
                }
340
                LOG.info("WorkerReceiver is down");
340
                LOG.info("WorkerReceiver is down");
341
            }
341
            }
342
        }
342
        }
343

    
   
343

   
344

    
   
344

   
345
        /**
345
        /**
346
         * This worker simply dequeues a message to send and
346
         * This worker simply dequeues a message to send and
347
         * and queues it on the manager's queue.
347
         * and queues it on the manager's queue.
348
         */
348
         */
349

    
   
349

   
350
        class WorkerSender implements Runnable {
350
        class WorkerSender implements Runnable {
351
            volatile boolean stop;
351
            volatile boolean stop;
352
            QuorumCnxManager manager;
352
            QuorumCnxManager manager;
353

    
   
353

   
354
            WorkerSender(QuorumCnxManager manager){
354
            WorkerSender(QuorumCnxManager manager){
355
                this.stop = false;
355
                this.stop = false;
356
                this.manager = manager;
356
                this.manager = manager;
357
            }
357
            }
358

    
   
358

   
359
            public void run() {
359
            public void run() {
360
                while (!stop) {
360
                while (!stop) {
361
                    try {
361
                    try {
362
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
362
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
363
                        if(m == null) continue;
363
                        if(m == null) continue;
364

    
   
364

   
365
                        process(m);
365
                        process(m);
366
                    } catch (InterruptedException e) {
366
                    } catch (InterruptedException e) {
367
                        break;
367
                        break;
368
                    }
368
                    }
369
                }
369
                }
370
                LOG.info("WorkerSender is down");
370
                LOG.info("WorkerSender is down");
371
            }
371
            }
372

    
   
372

   
373
            /**
373
            /**
374
             * Called by run() once there is a new message to send.
374
             * Called by run() once there is a new message to send.
375
             *
375
             *
376
             * @param m     message to send
376
             * @param m     message to send
377
             */
377
             */
378
            private void process(ToSend m) {
378
            private void process(ToSend m) {
379
                byte requestBytes[] = new byte[36];
379
                byte requestBytes[] = new byte[36];
380
                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
380
                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
381

    
   
381

   
382
                /*
382
                /*
383
                 * Building notification packet to send
383
                 * Building notification packet to send
384
                 */
384
                 */
385

    
   
385

   
386
                requestBuffer.clear();
386
                requestBuffer.clear();
387
                requestBuffer.putInt(m.state.ordinal());
387
                requestBuffer.putInt(m.state.ordinal());
388
                requestBuffer.putLong(m.leader);
388
                requestBuffer.putLong(m.leader);
389
                requestBuffer.putLong(m.zxid);
389
                requestBuffer.putLong(m.zxid);
390
                requestBuffer.putLong(m.electionEpoch);
390
                requestBuffer.putLong(m.electionEpoch);
391
                requestBuffer.putLong(m.peerEpoch);
391
                requestBuffer.putLong(m.peerEpoch);
392

    
   
392

   
393
                manager.toSend(m.sid, requestBuffer);
393
                manager.toSend(m.sid, requestBuffer);
394

    
   
394

   
395
            }
395
            }
396
        }
396
        }
397

    
   
397

   
398
        WorkerSender ws;
398
        WorkerSender ws;
399
        WorkerReceiver wr;
399
        WorkerReceiver wr;
400

    
   
400

   
401
        /**
401
        /**
402
         * Constructor of class Messenger.
402
         * Constructor of class Messenger.
403
         *
403
         *
404
         * @param manager   Connection manager
404
         * @param manager   Connection manager
405
         */
405
         */
406
        Messenger(QuorumCnxManager manager) {
406
        Messenger(QuorumCnxManager manager) {
407

    
   
407

   
408
            this.ws = new WorkerSender(manager);
408
            this.ws = new WorkerSender(manager);
409

    
   
409

   
410
            Thread t = new Thread(this.ws,
410
            Thread t = new Thread(this.ws,
411
                    "WorkerSender[myid=" + self.getId() + "]");
411
                    "WorkerSender[myid=" + self.getId() + "]");
412
            t.setDaemon(true);
412
            t.setDaemon(true);
413
            t.start();
413
            t.start();
414

    
   
414

   
415
            this.wr = new WorkerReceiver(manager);
415
            this.wr = new WorkerReceiver(manager);
416

    
   
416

   
417
            t = new Thread(this.wr,
417
            t = new Thread(this.wr,
418
                    "WorkerReceiver[myid=" + self.getId() + "]");
418
                    "WorkerReceiver[myid=" + self.getId() + "]");
419
            t.setDaemon(true);
419
            t.setDaemon(true);
420
            t.start();
420
            t.start();
421
        }
421
        }
422

    
   
422

   
423
        /**
423
        /**
424
         * Stops instances of WorkerSender and WorkerReceiver
424
         * Stops instances of WorkerSender and WorkerReceiver
425
         */
425
         */
426
        void halt(){
426
        void halt(){
427
            this.ws.stop = true;
427
            this.ws.stop = true;
428
            this.wr.stop = true;
428
            this.wr.stop = true;
429
        }
429
        }
430

    
   
430

   
431
    }
431
    }
432

    
   
432

   
433
    QuorumPeer self;
433
    QuorumPeer self;
434
    Messenger messenger;
434
    Messenger messenger;
435
    volatile long logicalclock; /* Election instance */
435
    volatile long logicalclock; /* Election instance */
436
    long proposedLeader;
436
    long proposedLeader;
437
    long proposedZxid;
437
    long proposedZxid;
438
    long proposedEpoch;
438
    long proposedEpoch;
439

    
   
439

   
440

    
   
440

   
441
    /**
441
    /**
442
     * Returns the current vlue of the logical clock counter
442
     * Returns the current vlue of the logical clock counter
443
     */
443
     */
444
    public long getLogicalClock(){
444
    public long getLogicalClock(){
445
    return logicalclock;
445
    return logicalclock;
446
    }
446
    }
447

    
   
447

   
448
    /**
     * Constructor of FastLeaderElection. It takes the QuorumPeer object
     * that instantiated this object and the connection manager. Such an
     * object should be created only once by each peer during an instance
     * of the ZooKeeper service.
     *
     * @param self  QuorumPeer that created this object
     * @param manager   Connection manager
     */
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.manager = manager;
        this.stop = false;
        // Remaining initialization (queues, messenger) lives in starter().
        starter(self, manager);
    }
462

    
   
462

   
463
    /**
463
    /**
464
     * This method is invoked by the constructor. Because it is a
464
     * This method is invoked by the constructor. Because it is a
465
     * part of the starting procedure of the object that must be on
465
     * part of the starting procedure of the object that must be on
466
     * any constructor of this class, it is probably best to keep as
466
     * any constructor of this class, it is probably best to keep as
467
     * a separate method. As we have a single constructor currently,
467
     * a separate method. As we have a single constructor currently,
468
     * it is not strictly necessary to have it separate.
468
     * it is not strictly necessary to have it separate.
469
     *
469
     *
470
     * @param self      QuorumPeer that created this object
470
     * @param self      QuorumPeer that created this object
471
     * @param manager   Connection manager
471
     * @param manager   Connection manager
472
     */
472
     */
473
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
473
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
474
        this.self = self;
474
        this.self = self;
475
        proposedLeader = -1;
475
        proposedLeader = -1;
476
        proposedZxid = -1;
476
        proposedZxid = -1;
477

    
   
477

   
478
        sendqueue = new LinkedBlockingQueue<ToSend>();
478
        sendqueue = new LinkedBlockingQueue<ToSend>();
479
        recvqueue = new LinkedBlockingQueue<Notification>();
479
        recvqueue = new LinkedBlockingQueue<Notification>();
480
        this.messenger = new Messenger(manager);
480
        this.messenger = new Messenger(manager);
481
    }
481
    }
482

    
   
482

   
483
    private void leaveInstance(Vote v) {
483
    private void leaveInstance(Vote v) {
484
        if(LOG.isDebugEnabled()){
484
        if(LOG.isDebugEnabled()){
485
            LOG.debug("About to leave FLE instance: leader="
485
            LOG.debug("About to leave FLE instance: leader="
486
                + v.getId() + ", zxid=0x" +
486
                + v.getId() + ", zxid=0x" +
487
                Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
487
                Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
488
                + ", my state=" + self.getPeerState());
488
                + ", my state=" + self.getPeerState());
489
        }
489
        }
490
        recvqueue.clear();
490
        recvqueue.clear();
491
    }
491
    }
492

    
   
492

   
493
    public QuorumCnxManager getCnxManager(){
493
    public QuorumCnxManager getCnxManager(){
494
        return manager;
494
        return manager;
495
    }
495
    }
496

    
   
496

   
497
    volatile boolean stop;
497
    volatile boolean stop;
498
    public void shutdown(){
498
    public void shutdown(){
499
        stop = true;
499
        stop = true;
500
        LOG.debug("Shutting down connection manager");
500
        LOG.debug("Shutting down connection manager");
501
        manager.halt();
501
        manager.halt();
502
        LOG.debug("Shutting down messenger");
502
        LOG.debug("Shutting down messenger");
503
        messenger.halt();
503
        messenger.halt();
504
        LOG.debug("FLE is down");
504
        LOG.debug("FLE is down");
505
    }
505
    }
506

    
   
506

   
507

    
   
507

   
508
    /**
508
    /**
509
     * Send notifications to all peers upon a change in our vote
509
     * Send notifications to all peers upon a change in our vote
510
     */
510
     */
511
    private void sendNotifications() {
511
    private void sendNotifications() {
512
        for (QuorumServer server : self.getVotingView().values()) {
512
        for (QuorumServer server : self.getVotingView().values()) {
513
            long sid = server.id;
513
            long sid = server.id;
514

    
   
514

   
515
            ToSend notmsg = new ToSend(ToSend.mType.notification,
515
            ToSend notmsg = new ToSend(ToSend.mType.notification,
516
                    proposedLeader,
516
                    proposedLeader,
517
                    proposedZxid,
517
                    proposedZxid,
518
                    logicalclock,
518
                    logicalclock,
519
                    QuorumPeer.ServerState.LOOKING,
519
                    QuorumPeer.ServerState.LOOKING,
520
                    sid,
520
                    sid,
521
                    proposedEpoch);
521
                    proposedEpoch);
522
            if(LOG.isDebugEnabled()){
522
            if(LOG.isDebugEnabled()){
523
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
523
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
524
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
524
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
525
                      " (n.round), " + sid + " (recipient), " + self.getId() +
525
                      " (n.round), " + sid + " (recipient), " + self.getId() +
526
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
526
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
527
            }
527
            }
528
            sendqueue.offer(notmsg);
528
            sendqueue.offer(notmsg);
529
        }
529
        }
530
    }
530
    }
531

    
   
531

   
532

    
   
532

   
533
    private void printNotification(Notification n){
533
    private void printNotification(Notification n){
534
        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
534
        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
535
                + Long.toHexString(n.zxid) + " (n.zxid), 0x"
535
                + Long.toHexString(n.zxid) + " (n.zxid), 0x"
536
                + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
536
                + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
537
                + " (n.state), " + n.sid + " (n.sid), 0x"
537
                + " (n.state), " + n.sid + " (n.sid), 0x"
538
                + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
538
                + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
539
                + self.getPeerState() + " (my state)");
539
                + self.getPeerState() + " (my state)");
540
    }
540
    }
541

    
   
541

   
542
    /**
542
    /**
543
     * Check if a pair (server id, zxid) succeeds our
543
     * Check if a pair (server id, zxid) succeeds our
544
     * current vote.
544
     * current vote.
545
     *
545
     *
546
     * @param id    Server identifier
546
     * @param id    Server identifier
547
     * @param zxid  Last zxid observed by the issuer of this vote
547
     * @param zxid  Last zxid observed by the issuer of this vote
548
     */
548
     */
549
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
549
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
550
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
550
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
551
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
551
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
552
        if(self.getQuorumVerifier().getWeight(newId) == 0){
552
        if(self.getQuorumVerifier().getWeight(newId) == 0){
553
            return false;
553
            return false;
554
        }
554
        }
555
        
555
        
556
        /*
556
        /*
557
         * We return true if one of the following three cases hold:
557
         * We return true if one of the following three cases hold:
558
         * 1- New epoch is higher
558
         * 1- New epoch is higher
559
         * 2- New epoch is the same as current epoch, but new zxid is higher
559
         * 2- New epoch is the same as current epoch, but new zxid is higher
560
         * 3- New epoch is the same as current epoch, new zxid is the same
560
         * 3- New epoch is the same as current epoch, new zxid is the same
561
         *  as current zxid, but server id is higher.
561
         *  as current zxid, but server id is higher.
562
         */
562
         */
563
        
563
        
564
        return ((newEpoch > curEpoch) || 
564
        return ((newEpoch > curEpoch) || 
565
                ((newEpoch == curEpoch) &&
565
                ((newEpoch == curEpoch) &&
566
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
566
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
567
    }
567
    }
568

    
   
568

   
569
    /**
569
    /**
570
     * Termination predicate. Given a set of votes, determines if
570
     * Termination predicate. Given a set of votes, determines if
571
     * have sufficient to declare the end of the election round.
571
     * have sufficient to declare the end of the election round.
572
     *
572
     *
573
     *  @param votes    Set of votes
573
     *  @param votes    Set of votes
574
     *  @param l        Identifier of the vote received last
574
     *  @param l        Identifier of the vote received last
575
     *  @param zxid     zxid of the the vote received last
575
     *  @param zxid     zxid of the the vote received last
576
     */
576
     */
577
    private boolean termPredicate(
577
    private boolean termPredicate(
578
            HashMap<Long, Vote> votes,
578
            HashMap<Long, Vote> votes,
579
            Vote vote) {
579
            Vote vote) {
580

    
   
580

   
581
        HashSet<Long> set = new HashSet<Long>();
581
        HashSet<Long> set = new HashSet<Long>();
582

    
   
582

   
583
        /*
583
        /*
584
         * First make the views consistent. Sometimes peers will have
584
         * First make the views consistent. Sometimes peers will have
585
         * different zxids for a server depending on timing.
585
         * different zxids for a server depending on timing.
586
         */
586
         */
587
        for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
587
        for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
588
            if (vote.equals(entry.getValue())){
588
            if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())

    
   
589
                    && vote.equals(entry.getValue())){
589
                set.add(entry.getKey());
590
                set.add(entry.getKey());
590
            }
591
            }
591
        }
592
        }
592

    
   
593

   
593
        return self.getQuorumVerifier().containsQuorum(set);
594
        return self.getQuorumVerifier().containsQuorum(set);
594
    }
595
    }
595

    
   

   
596
    /**
596
    /**
597
     * In the case there is a leader elected, and a quorum supporting
597
     * In the case there is a leader elected, and a quorum supporting
598
     * this leader, we have to check if the leader has voted and acked
598
     * this leader, we have to check if the leader has voted and acked
599
     * that it is leading. We need this check to avoid that peers keep
599
     * that it is leading. We need this check to avoid that peers keep
600
     * electing over and over a peer that has crashed and it is no
600
     * electing over and over a peer that has crashed and it is no
601
     * longer leading.
601
     * longer leading.
602
     *
602
     *
603
     * @param votes set of votes
603
     * @param votes set of votes
604
     * @param   leader  leader id
604
     * @param   leader  leader id
605
     * @param   electionEpoch   epoch id
605
     * @param   electionEpoch   epoch id
606
     */
606
     */
607
    private boolean checkLeader(
607
    private boolean checkLeader(
608
            HashMap<Long, Vote> votes,
608
            HashMap<Long, Vote> votes,
609
            long leader,
609
            long leader,
610
            long electionEpoch){
610
            long electionEpoch){
611

    
   
611

   
612
        boolean predicate = true;
612
        boolean predicate = true;
613

    
   
613

   
614
        /*
614
        /*
615
         * If everyone else thinks I'm the leader, I must be the leader.
615
         * If everyone else thinks I'm the leader, I must be the leader.
616
         * The other two checks are just for the case in which I'm not the
616
         * The other two checks are just for the case in which I'm not the
617
         * leader. If I'm not the leader and I haven't received a message
617
         * leader. If I'm not the leader and I haven't received a message
618
         * from leader stating that it is leading, then predicate is false.
618
         * from leader stating that it is leading, then predicate is false.
619
         */
619
         */
620

    
   
620

   
621
        if(leader != self.getId()){
621
        if(leader != self.getId()){
622
            if(votes.get(leader) == null) predicate = false;
622
            if(votes.get(leader) == null) predicate = false;
623
            else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
623
            else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
624
        }
624
        }
625

    
   
625

   
626
        return predicate;
626
        return predicate;
627
    }
627
    }
628

    
   
628

   
629
    synchronized void updateProposal(long leader, long zxid, long epoch){
629
    synchronized void updateProposal(long leader, long zxid, long epoch){
630
        if(LOG.isDebugEnabled()){
630
        if(LOG.isDebugEnabled()){
631
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
631
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
632
                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
632
                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
633
                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
633
                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
634
        }
634
        }
635
        proposedLeader = leader;
635
        proposedLeader = leader;
636
        proposedZxid = zxid;
636
        proposedZxid = zxid;
637
        proposedEpoch = epoch;
637
        proposedEpoch = epoch;
638
    }
638
    }
639

    
   
639

   
640
    synchronized Vote getVote(){
640
    synchronized Vote getVote(){
641
        return new Vote(proposedLeader, proposedZxid, proposedEpoch);
641
        return new Vote(proposedLeader, proposedZxid, proposedEpoch);
642
    }
642
    }
643

    
   
643

   
644
    /**
644
    /**
645
     * A learning state can be either FOLLOWING or OBSERVING.
645
     * A learning state can be either FOLLOWING or OBSERVING.
646
     * This method simply decides which one depending on the
646
     * This method simply decides which one depending on the
647
     * role of the server.
647
     * role of the server.
648
     *
648
     *
649
     * @return ServerState
649
     * @return ServerState
650
     */
650
     */
651
    private ServerState learningState(){
651
    private ServerState learningState(){
652
        if(self.getLearnerType() == LearnerType.PARTICIPANT){
652
        if(self.getLearnerType() == LearnerType.PARTICIPANT){
653
            LOG.debug("I'm a participant: " + self.getId());
653
            LOG.debug("I'm a participant: " + self.getId());
654
            return ServerState.FOLLOWING;
654
            return ServerState.FOLLOWING;
655
        }
655
        }
656
        else{
656
        else{
657
            LOG.debug("I'm an observer: " + self.getId());
657
            LOG.debug("I'm an observer: " + self.getId());
658
            return ServerState.OBSERVING;
658
            return ServerState.OBSERVING;
659
        }
659
        }
660
    }
660
    }
661

    
   
661

   
662
    /**
662
    /**
663
     * Returns the initial vote value of server identifier.
663
     * Returns the initial vote value of server identifier.
664
     *
664
     *
665
     * @return long
665
     * @return long
666
     */
666
     */
667
    private long getInitId(){
667
    private long getInitId(){
668
        if(self.getLearnerType() == LearnerType.PARTICIPANT)
668
        if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId()))       
669
            return self.getId();
669
            return self.getId();
670
        else return Long.MIN_VALUE;
670
        else return Long.MIN_VALUE;
671
    }
671
    }
672

    
   
672

   
673
    /**
673
    /**
674
     * Returns initial last logged zxid.
674
     * Returns initial last logged zxid.
675
     *
675
     *
676
     * @return long
676
     * @return long
677
     */
677
     */
678
    private long getInitLastLoggedZxid(){
678
    private long getInitLastLoggedZxid(){
679
        if(self.getLearnerType() == LearnerType.PARTICIPANT)
679
        if(self.getLearnerType() == LearnerType.PARTICIPANT)
680
            return self.getLastLoggedZxid();
680
            return self.getLastLoggedZxid();
681
        else return Long.MIN_VALUE;
681
        else return Long.MIN_VALUE;
682
    }
682
    }
683

    
   
683

   
684
    /**
684
    /**
685
     * Returns the initial vote value of the peer epoch.
685
     * Returns the initial vote value of the peer epoch.
686
     *
686
     *
687
     * @return long
687
     * @return long
688
     */
688
     */
689
    private long getPeerEpoch(){
689
    private long getPeerEpoch(){
690
        if(self.getLearnerType() == LearnerType.PARTICIPANT)
690
        if(self.getLearnerType() == LearnerType.PARTICIPANT)
691
        	try {
691
        	try {
692
        		return self.getCurrentEpoch();
692
        		return self.getCurrentEpoch();
693
        	} catch(IOException e) {
693
        	} catch(IOException e) {
694
        		RuntimeException re = new RuntimeException(e.getMessage());
694
        		RuntimeException re = new RuntimeException(e.getMessage());
695
        		re.setStackTrace(e.getStackTrace());
695
        		re.setStackTrace(e.getStackTrace());
696
        		throw re;
696
        		throw re;
697
        	}
697
        	}
698
        else return Long.MIN_VALUE;
698
        else return Long.MIN_VALUE;
699
    }
699
    }
700
    
700
    
701
    /**
701
    /**
702
     * Starts a new round of leader election. Whenever our QuorumPeer
702
     * Starts a new round of leader election. Whenever our QuorumPeer
703
     * changes its state to LOOKING, this method is invoked, and it
703
     * changes its state to LOOKING, this method is invoked, and it
704
     * sends notifications to all other peers.
704
     * sends notifications to all other peers.
705
     */
705
     */
706
    public Vote lookForLeader() throws InterruptedException {
706
    public Vote lookForLeader() throws InterruptedException {
707
        try {
707
        try {
708
            self.jmxLeaderElectionBean = new LeaderElectionBean();
708
            self.jmxLeaderElectionBean = new LeaderElectionBean();
709
            MBeanRegistry.getInstance().register(
709
            MBeanRegistry.getInstance().register(
710
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
710
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
711
        } catch (Exception e) {
711
        } catch (Exception e) {
712
            LOG.warn("Failed to register with JMX", e);
712
            LOG.warn("Failed to register with JMX", e);
713
            self.jmxLeaderElectionBean = null;
713
            self.jmxLeaderElectionBean = null;
714
        }
714
        }
715
        if (self.start_fle == 0) {
715
        if (self.start_fle == 0) {
716
           self.start_fle = System.currentTimeMillis();
716
           self.start_fle = System.currentTimeMillis();
717
        }
717
        }
718
        try {
718
        try {
719
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
719
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
720

    
   
720

   
721
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
721
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
722

    
   
722

   
723
            int notTimeout = finalizeWait;
723
            int notTimeout = finalizeWait;
724

    
   
724

   
725
            synchronized(this){
725
            synchronized(this){
726
                logicalclock++;
726
                logicalclock++;
727
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
727
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
728
            }
728
            }
729

    
   
729

   
730
            LOG.info("New election. My id =  " + self.getId() +
730
            LOG.info("New election. My id =  " + self.getId() +
731
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
731
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
732
            sendNotifications();
732
            sendNotifications();
733

    
   
733

   
734
            /*
734
            /*
735
             * Loop in which we exchange notifications until we find a leader
735
             * Loop in which we exchange notifications until we find a leader
736
             */
736
             */
737

    
   
737

   
738
            while ((self.getPeerState() == ServerState.LOOKING) &&
738
            while ((self.getPeerState() == ServerState.LOOKING) &&
739
                    (!stop)){
739
                    (!stop)){
740
                /*
740
                /*
741
                 * Remove next notification from queue, times out after 2 times
741
                 * Remove next notification from queue, times out after 2 times
742
                 * the termination time
742
                 * the termination time
743
                 */
743
                 */
744
                Notification n = recvqueue.poll(notTimeout,
744
                Notification n = recvqueue.poll(notTimeout,
745
                        TimeUnit.MILLISECONDS);
745
                        TimeUnit.MILLISECONDS);
746

    
   
746

   
747
                /*
747
                /*
748
                 * Sends more notifications if haven't received enough.
748
                 * Sends more notifications if haven't received enough.
749
                 * Otherwise processes new notification.
749
                 * Otherwise processes new notification.
750
                 */
750
                 */
751
                if(n == null){
751
                if(n == null){
752
                    if(manager.haveDelivered()){
752
                    if(manager.haveDelivered()){
753
                        sendNotifications();
753
                        sendNotifications();
754
                    } else {
754
                    } else {
755
                        manager.connectAll();
755
                        manager.connectAll();
756
                    }
756
                    }
757

    
   
757

   
758
                    /*
758
                    /*
759
                     * Exponential backoff
759
                     * Exponential backoff
760
                     */
760
                     */
761
                    int tmpTimeOut = notTimeout*2;
761
                    int tmpTimeOut = notTimeout*2;
762
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
762
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
763
                            tmpTimeOut : maxNotificationInterval);
763
                            tmpTimeOut : maxNotificationInterval);
764
                    LOG.info("Notification time out: " + notTimeout);
764
                    LOG.info("Notification time out: " + notTimeout);
765
                }
765
                }
766
                else if(self.getVotingView().containsKey(n.sid)) {
766
                else if(self.getVotingView().containsKey(n.sid)) {
767
                    /*
767
                    /*
768
                     * Only proceed if the vote comes from a replica in the
768
                     * Only proceed if the vote comes from a replica in the
769
                     * voting view.
769
                     * voting view.
770
                     */
770
                     */
771
                    switch (n.state) {
771
                    switch (n.state) {
772
                    case LOOKING:
772
                    case LOOKING:
773
                        // If notification > current, replace and send messages out
773
                        // If notification > current, replace and send messages out
774
                        if (n.electionEpoch > logicalclock) {
774
                        if (n.electionEpoch > logicalclock) {
775
                            logicalclock = n.electionEpoch;
775
                            logicalclock = n.electionEpoch;
776
                            recvset.clear();
776
                            recvset.clear();
777
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
777
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
778
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
778
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
779
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
779
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
780
                            } else {
780
                            } else {
781
                                updateProposal(getInitId(),
781
                                updateProposal(getInitId(),
782
                                        getInitLastLoggedZxid(),
782
                                        getInitLastLoggedZxid(),
783
                                        getPeerEpoch());
783
                                        getPeerEpoch());
784
                            }
784
                            }
785
                            sendNotifications();
785
                            sendNotifications();
786
                        } else if (n.electionEpoch < logicalclock) {
786
                        } else if (n.electionEpoch < logicalclock) {
787
                            if(LOG.isDebugEnabled()){
787
                            if(LOG.isDebugEnabled()){
788
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
788
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
789
                                        + Long.toHexString(n.electionEpoch)
789
                                        + Long.toHexString(n.electionEpoch)
790
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
790
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
791
                            }
791
                            }
792
                            break;
792
                            break;
793
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
793
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
794
                                proposedLeader, proposedZxid, proposedEpoch)) {
794
                                proposedLeader, proposedZxid, proposedEpoch)) {
795
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
795
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
796
                            sendNotifications();
796
                            sendNotifications();
797
                        }
797
                        }
798

    
   
798

   
799
                        if(LOG.isDebugEnabled()){
799
                        if(LOG.isDebugEnabled()){
800
                            LOG.debug("Adding vote: from=" + n.sid +
800
                            LOG.debug("Adding vote: from=" + n.sid +
801
                                    ", proposed leader=" + n.leader +
801
                                    ", proposed leader=" + n.leader +
802
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
802
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
803
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
803
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
804
                        }
804
                        }
805

    
   
805

   
806
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
806
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
807

    
   
807

   
808
                        if (termPredicate(recvset,
808
                        if (termPredicate(recvset,
809
                                new Vote(proposedLeader, proposedZxid,
809
                                new Vote(proposedLeader, proposedZxid,
810
                                        logicalclock, proposedEpoch))) {
810
                                        logicalclock, proposedEpoch))) {
811

    
   
811

   
812
                            // Verify if there is any change in the proposed leader
812
                            // Verify if there is any change in the proposed leader
813
                            while((n = recvqueue.poll(finalizeWait,
813
                            while((n = recvqueue.poll(finalizeWait,
814
                                    TimeUnit.MILLISECONDS)) != null){
814
                                    TimeUnit.MILLISECONDS)) != null){
815
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
815
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
816
                                        proposedLeader, proposedZxid, proposedEpoch)){
816
                                        proposedLeader, proposedZxid, proposedEpoch)){
817
                                    recvqueue.put(n);
817
                                    recvqueue.put(n);
818
                                    break;
818
                                    break;
819
                                }
819
                                }
820
                            }
820
                            }
821

    
   
821

   
822
                            /*
822
                            /*
823
                             * This predicate is true once we don't read any new
823
                             * This predicate is true once we don't read any new
824
                             * relevant message from the reception queue
824
                             * relevant message from the reception queue
825
                             */
825
                             */
826
                            if (n == null) {
826
                            if (n == null) {
827
                                self.setPeerState((proposedLeader == self.getId()) ?
827
                                self.setPeerState((proposedLeader == self.getId()) ?
828
                                        ServerState.LEADING: learningState());
828
                                        ServerState.LEADING: learningState());
829

    
   
829

   
830
                                Vote endVote = new Vote(proposedLeader,
830
                                Vote endVote = new Vote(proposedLeader,
831
                                        proposedZxid, proposedEpoch);
831
                                        proposedZxid, proposedEpoch);
832
                                leaveInstance(endVote);
832
                                leaveInstance(endVote);
833
                                return endVote;
833
                                return endVote;
834
                            }
834
                            }
835
                        }
835
                        }
836
                        break;
836
                        break;
837
                    case OBSERVING:
837
                    case OBSERVING:
838
                        LOG.debug("Notification from observer: " + n.sid);
838
                        LOG.debug("Notification from observer: " + n.sid);
839
                        break;
839
                        break;
840
                    case FOLLOWING:
840
                    case FOLLOWING:
841
                    case LEADING:
841
                    case LEADING:
842
                        /*
842
                        /*
843
                         * Consider all notifications from the same epoch
843
                         * Consider all notifications from the same epoch
844
                         * together.
844
                         * together.
845
                         */
845
                         */
846
                        if(n.electionEpoch == logicalclock){
846
                        if(n.electionEpoch == logicalclock){
847
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
847
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
848
                            if(termPredicate(recvset, new Vote(n.leader,
848
                            if(termPredicate(recvset, new Vote(n.leader,
849
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
849
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
850
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
850
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
851
                                self.setPeerState((n.leader == self.getId()) ?
851
                                self.setPeerState((n.leader == self.getId()) ?
852
                                        ServerState.LEADING: learningState());
852
                                        ServerState.LEADING: learningState());
853

    
   
853

   
854
                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
854
                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
855
                                leaveInstance(endVote);
855
                                leaveInstance(endVote);
856
                                return endVote;
856
                                return endVote;
857
                            }
857
                            }
858
                        }
858
                        }
859

    
   
859

   
860
                        /**
860
                        /**
861
                         * Before joining an established ensemble, verify that
861
                         * Before joining an established ensemble, verify that
862
                         * a majority are following the same leader.
862
                         * a majority are following the same leader.
863
                         */
863
                         */
864
                        outofelection.put(n.sid, new Vote(n.leader, n.zxid,
864
                        outofelection.put(n.sid, new Vote(n.leader, n.zxid,
865
                                n.electionEpoch, n.peerEpoch, n.state));
865
                                n.electionEpoch, n.peerEpoch, n.state));
866
                        if (termPredicate(outofelection, new Vote(n.leader,
866
                        if (termPredicate(outofelection, new Vote(n.leader,
867
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
867
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
868
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
868
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
869
                            synchronized(this){
869
                            synchronized(this){
870
                                logicalclock = n.electionEpoch;
870
                                logicalclock = n.electionEpoch;
871
                                self.setPeerState((n.leader == self.getId()) ?
871
                                self.setPeerState((n.leader == self.getId()) ?
872
                                        ServerState.LEADING: learningState());
872
                                        ServerState.LEADING: learningState());
873
                            }
873
                            }
874
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
874
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
875
                            leaveInstance(endVote);
875
                            leaveInstance(endVote);
876
                            return endVote;
876
                            return endVote;
877
                        }
877
                        }
878
                        break;
878
                        break;
879
                    default:
879
                    default:
880
                        LOG.warn("Notification state unrecoginized: " + n.state
880
                        LOG.warn("Notification state unrecoginized: " + n.state
881
                              + " (n.state), " + n.sid + " (n.sid)");
881
                              + " (n.state), " + n.sid + " (n.sid)");
882
                        break;
882
                        break;
883
                    }
883
                    }
884
                } else {
884
                } else {
885
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
885
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
886
                }
886
                }
887
            }
887
            }
888
            return null;
888
            return null;
889
        } finally {
889
        } finally {
890
            try {
890
            try {
891
                if(self.jmxLeaderElectionBean != null){
891
                if(self.jmxLeaderElectionBean != null){
892
                    MBeanRegistry.getInstance().unregister(
892
                    MBeanRegistry.getInstance().unregister(
893
                            self.jmxLeaderElectionBean);
893
                            self.jmxLeaderElectionBean);
894
                }
894
                }
895
            } catch (Exception e) {
895
            } catch (Exception e) {
896
                LOG.warn("Failed to unregister with JMX", e);
896
                LOG.warn("Failed to unregister with JMX", e);
897
            }
897
            }
898
            self.jmxLeaderElectionBean = null;
898
            self.jmxLeaderElectionBean = null;
899
        }
899
        }
900
    }
900
    }
901
}
901
}
/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
Revision 1327937 New Change
 
/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/server/util/DynamicConfigBCTest.java
New File
 
/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/FLETest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/ObserverTest.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
Revision 1327937 New Change
 
/src/java/test/org/apache/zookeeper/test/StandaloneTest.java
Revision 1327937 New Change
 
  1. /src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java: Loading...
  2. /src/java/main/org/apache/zookeeper/server/quorum/Leader.java: Loading...
  3. /src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java: Loading...
  4. /src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java: Loading...
  5. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java: Loading...
  6. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java: Loading...
  7. /src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java: Loading...
  8. /src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java: Loading...
  9. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java: Loading...
  10. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java: Loading...
  11. /src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java: Loading...
  12. /src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java: Loading...
  13. /src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java: Loading...
  14. /src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java: Loading...
  15. /src/java/test/org/apache/zookeeper/server/util/DynamicConfigBCTest.java: Loading...
  16. /src/java/test/org/apache/zookeeper/test/CnxManagerTest.java: Loading...
  17. /src/java/test/org/apache/zookeeper/test/FLETest.java: Loading...
  18. /src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java: Loading...
  19. /src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java: Loading...
  20. /src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java: Loading...
  21. /src/java/test/org/apache/zookeeper/test/ObserverTest.java: Loading...
  22. /src/java/test/org/apache/zookeeper/test/QuorumBase.java: Loading...
  23. /src/java/test/org/apache/zookeeper/test/QuorumUtil.java: Loading...
  24. /src/java/test/org/apache/zookeeper/test/StandaloneTest.java: Loading...