Review Board 1.7.22


patch for ZOOKEEPER-1560: Zookeeper client hangs on creation of large nodes

Review Request #7730 - Created Oct. 25, 2012 and submitted

Skye Wanderman-Milne
3.4.4
ZOOKEEPER-1560
Reviewers
zookeeper
phunt, tedyu
zookeeper-git
see ZOOKEEPER-1560 JIRA
unit tests (including testLargeNodeData from ZOOKEEPER-1560 JIRA)

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...
src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
Revision 70d8538 New Change
[20] 23 lines
[+20]
24
import java.net.SocketAddress;
24
import java.net.SocketAddress;
25
import java.nio.ByteBuffer;
25
import java.nio.ByteBuffer;
26
import java.nio.channels.SelectionKey;
26
import java.nio.channels.SelectionKey;
27
import java.nio.channels.Selector;
27
import java.nio.channels.Selector;
28
import java.nio.channels.SocketChannel;
28
import java.nio.channels.SocketChannel;
29
import java.util.Iterator;

   
30
import java.util.LinkedList;
29
import java.util.LinkedList;
31
import java.util.List;
30
import java.util.List;

    
   
31
import java.util.ListIterator;
32
import java.util.Set;
32
import java.util.Set;
33

    
   
33

   
34
import org.slf4j.Logger;
Moved to 37

   
35
import org.slf4j.LoggerFactory;
Moved to 38

   
36
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
34
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
37
import org.apache.zookeeper.ClientCnxn.Packet;
35
import org.apache.zookeeper.ClientCnxn.Packet;
38
import org.apache.zookeeper.ZooDefs.OpCode;
36
import org.apache.zookeeper.ZooDefs.OpCode;
Moved from 34

    
   
37
import org.slf4j.Logger;
Moved from 35

    
   
38
import org.slf4j.LoggerFactory;
39

    
   
39

   
40
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
40
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
41
    private static final Logger LOG = LoggerFactory
41
    private static final Logger LOG = LoggerFactory
42
            .getLogger(ClientCnxnSocketNIO.class);
42
            .getLogger(ClientCnxnSocketNIO.class);
43

    
   
43

   
[+20] [20] 58 lines
[+20] public class ClientCnxnSocketNIO extends ClientCnxnSocket {
102
                    updateLastHeard();
102
                    updateLastHeard();
103
                }
103
                }
104
            }
104
            }
105
        }
105
        }
106
        if (sockKey.isWritable()) {
106
        if (sockKey.isWritable()) {
107
            LinkedList<Packet> pending = new LinkedList<Packet>();

   
108
            Packet p = null;

   
109
            synchronized(outgoingQueue) {
107
            synchronized(outgoingQueue) {
110
                p = findSendablePacket(outgoingQueue,
108
                Packet p = findSendablePacket(outgoingQueue,
111
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
109
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
112

    
   
110

   
113
                if (p != null) {
111
                if (p != null) {
114
                    outgoingQueue.removeFirstOccurrence(p);

   
115
                    updateLastSend();
112
                    updateLastSend();

    
   
113
                    // If we already started writing p, p.bb will already exist

    
   
114
                    if (p.bb == null) {
116
                    if ((p.requestHeader != null) &&
115
                        if ((p.requestHeader != null) &&
117
                            (p.requestHeader.getType() != OpCode.ping) &&
116
                                (p.requestHeader.getType() != OpCode.ping) &&
118
                            (p.requestHeader.getType() != OpCode.auth)) {
117
                                (p.requestHeader.getType() != OpCode.auth)) {
119
                        p.requestHeader.setXid(cnxn.getXid());
118
                            p.requestHeader.setXid(cnxn.getXid());
120
                    }
119
                        }
121
                    p.createBB();
120
                        p.createBB();
122
                    ByteBuffer pbb = p.bb;
121
                    }
123
                    sock.write(pbb);
122
                    sock.write(p.bb);
124
                    if (!pbb.hasRemaining()) {
123
                    if (!p.bb.hasRemaining()) {
125
                        sentCount++;
124
                        sentCount++;

    
   
125
                        outgoingQueue.removeFirstOccurrence(p);
126
                        if (p.requestHeader != null
126
                        if (p.requestHeader != null
127
                                && p.requestHeader.getType() != OpCode.ping
127
                                && p.requestHeader.getType() != OpCode.ping
128
                                && p.requestHeader.getType() != OpCode.auth) {
128
                                && p.requestHeader.getType() != OpCode.auth) {
129
                            pending.add(p);
129
                            synchronized (pendingQueue) {

    
   
130
                                pendingQueue.add(p);
130
                        }
131
                            }
131
                    }
132
                        }
132
                } else {
133
                    }
133
                    // No suitable packet to send: turn off write interest flag.
134
                }

    
   
135
                if (outgoingQueue.isEmpty()) {

    
   
136
                    // No more packets to send: turn off write interest flag.
134
                    // Will be turned on later by a later call to enableWrite(),
137
                    // Will be turned on later by a later call to enableWrite(),
135
                    // from within ZooKeeperSaslClient (if client is configured
138
                    // from within ZooKeeperSaslClient (if client is configured
136
                    // to attempt SASL authentication), or in either doIO() or
139
                    // to attempt SASL authentication), or in either doIO() or
137
                    // in doTransport() if not.
140
                    // in doTransport() if not.
138
                    disableWrite();
141
                    disableWrite();

    
   
142
                } else {

    
   
143
                    // Just in case

    
   
144
                    enableWrite();
139
                }
145
                }
140
            }
146
            }
141
            synchronized(pendingQueue) {

   
142
                pendingQueue.addAll(pending);

   
143
            }

   
144

    
   

   
145
        }
147
        }
146
    }
148
    }
147

    
   
149

   
148
    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
150
    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
149
                                      boolean clientTunneledAuthenticationInProgress) {
151
                                      boolean clientTunneledAuthenticationInProgress) {
150
        synchronized (outgoingQueue) {
152
        synchronized (outgoingQueue) {
151
            if (!outgoingQueue.isEmpty()) {
153
            if (outgoingQueue.isEmpty()) {
152
                if (clientTunneledAuthenticationInProgress) {
154
                return null;
153
                    Packet p = null;
155
            }

    
   
156
            if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish

    
   
157
                || !clientTunneledAuthenticationInProgress) {

    
   
158
                return outgoingQueue.getFirst();

    
   
159
            }

    
   
160

   
154
                    // Since client's authentication with server is in progress,
161
            // Since client's authentication with server is in progress,
155
                    // send only the null-header packet queued by primeConnection().
162
            // send only the null-header packet queued by primeConnection().
156
                    // This packet must be sent so that the SASL authentication process
163
            // This packet must be sent so that the SASL authentication process
157
                    // can proceed, but all other packets should wait until
164
            // can proceed, but all other packets should wait until
158
                    // SASL authentication completes.
165
            // SASL authentication completes.
159
                    Iterator<Packet> iter = outgoingQueue.listIterator();
166
            ListIterator<Packet> iter = outgoingQueue.listIterator();
160
                    while(iter.hasNext()) {
167
            while (iter.hasNext()) {
161
                        p = iter.next();
168
                Packet p = iter.next();
162
                        if (p.requestHeader == null) {
169
                if (p.requestHeader == null) {
163
                            // We've found the priming-packet.
170
                    // We've found the priming-packet. Move it to the beginning of the queue.

    
   
171
                    iter.remove();

    
   
172
                    outgoingQueue.add(0, p);
164
                            return p;
173
                    return p;
165
                        } else {
174
                } else {
166
                            // Non-priming packet: defer it until later, leaving it in the queue
175
                    // Non-priming packet: defer it until later, leaving it in the queue
167
                            // until authentication completes.
176
                    // until authentication completes.
168
                            if (LOG.isDebugEnabled()) {
177
                    if (LOG.isDebugEnabled()) {
169
                                LOG.debug("deferring non-priming packet: " + p +
178
                        LOG.debug("deferring non-priming packet: " + p +
170
                                        "until SASL authentication completes.");
179
                                "until SASL authentication completes.");
171
                            }
180
                    }
172
                        }
181
                }
173
                    }
182
            }
174
                    // no sendable packet found.
183
            // no sendable packet found.
175
                    return null;
184
            return null;
176
                } else {

   
177
                    // Tunnelled authentication is not in progress: just

   
178
                    // send the first packet in the queue.

   
179
                    return outgoingQueue.getFirst();

   
180
                }
185
        }
181
            }
186
    }
182
        }

   
183
        return null;

   
184
    }

   
185

    
   
187

   
186
    @Override
188
    @Override
187
    void cleanup() {
189
    void cleanup() {
188
        if (sockKey != null) {
190
        if (sockKey != null) {
189
            SocketChannel sock = (SocketChannel) sockKey.channel();
191
            SocketChannel sock = (SocketChannel) sockKey.channel();
[+20] [20] 225 lines
  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...