Review Board 1.7.22


patch for ZOOKEEPER-1560: Zookeeper client hangs on creation of large nodes

Review Request #7730 - Created Oct. 25, 2012 and submitted

Skye Wanderman-Milne
3.4.4
ZOOKEEPER-1560
Reviewers
zookeeper
phunt, tedyu
zookeeper-git
see ZOOKEEPER-1560 JIRA
unit tests (including testLargeNodeData from ZOOKEEPER-1560 JIRA)

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 3. See what's changed.

1 2 3
1 2 3

  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...
src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
Revision 70d8538 New Change
[20] 101 lines
[+20] [+] public class ClientCnxnSocketNIO extends ClientCnxnSocket {
102
                    updateLastHeard();
102
                    updateLastHeard();
103
                }
103
                }
104
            }
104
            }
105
        }
105
        }
106
        if (sockKey.isWritable()) {
106
        if (sockKey.isWritable()) {
107
            LinkedList<Packet> pending = new LinkedList<Packet>();

   
108
            Packet p = null;

   
109
            synchronized(outgoingQueue) {
107
            synchronized(outgoingQueue) {
110
                p = findSendablePacket(outgoingQueue,
108
                Packet p = findSendablePacket(outgoingQueue,
111
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
109
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
112

    
   
110

   
113
                if (p != null) {
111
                if (p != null) {
114
                    outgoingQueue.removeFirstOccurrence(p);

   
115
                    updateLastSend();
112
                    updateLastSend();
116
                    if ((p.requestHeader != null) &&
113
                    if ((p.requestHeader != null) &&
117
                            (p.requestHeader.getType() != OpCode.ping) &&
114
                            (p.requestHeader.getType() != OpCode.ping) &&
118
                            (p.requestHeader.getType() != OpCode.auth)) {
115
                            (p.requestHeader.getType() != OpCode.auth) &&

    
   
116
                            p.requestHeader.getXid() == 0 /* make sure xid isn't already set */) {
119
                        p.requestHeader.setXid(cnxn.getXid());
117
                        p.requestHeader.setXid(cnxn.getXid());
120
                    }
118
                    }

    
   
119
                    // If we already started writing p, p.bb will already exist

    
   
120
                    if (p.bb == null) {
121
                    p.createBB();
121
                        p.createBB();
122
                    ByteBuffer pbb = p.bb;
122
                    }
123
                    sock.write(pbb);
123
                    sock.write(p.bb);
124
                    if (!pbb.hasRemaining()) {
124
                    if (!p.bb.hasRemaining()) {
125
                        sentCount++;
125
                        sentCount++;

    
   
126
                        outgoingQueue.removeFirstOccurrence(p);
126
                        if (p.requestHeader != null
127
                        if (p.requestHeader != null
127
                                && p.requestHeader.getType() != OpCode.ping
128
                                && p.requestHeader.getType() != OpCode.ping
128
                                && p.requestHeader.getType() != OpCode.auth) {
129
                                && p.requestHeader.getType() != OpCode.auth) {
129
                            pending.add(p);
130
                            synchronized (pendingQueue) {

    
   
131
                                pendingQueue.add(p);

    
   
132
                            }
130
                        }
133
                        }
131
                    }
134
                    }
132
                } else {
135
                } else {
133
                    // No suitable packet to send: turn off write interest flag.
136
                    // No suitable packet to send: turn off write interest flag.
134
                    // Will be turned on later by a later call to enableWrite(),
137
                    // Will be turned on later by a later call to enableWrite(),
135
                    // from within ZooKeeperSaslClient (if client is configured
138
                    // from within ZooKeeperSaslClient (if client is configured
136
                    // to attempt SASL authentication), or in either doIO() or
139
                    // to attempt SASL authentication), or in either doIO() or
137
                    // in doTransport() if not.
140
                    // in doTransport() if not.
138
                    disableWrite();
141
                    disableWrite();
139
                }
142
                }
140
            }
143
            }
141
            synchronized(pendingQueue) {

   
142
                pendingQueue.addAll(pending);

   
143
            }

   
144

    
   

   
145
        }
144
        }
146
    }
145
    }
147

    
   
146

   
148
    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
147
    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
149
                                      boolean clientTunneledAuthenticationInProgress) {
148
                                      boolean clientTunneledAuthenticationInProgress) {
150
        synchronized (outgoingQueue) {
149
        synchronized (outgoingQueue) {
151
            if (!outgoingQueue.isEmpty()) {
150
            if (outgoingQueue.isEmpty()) {
152
                if (clientTunneledAuthenticationInProgress) {
151
                return null;
153
                    Packet p = null;
152
            }

    
   
153
            if (outgoingQueue.getFirst().bb != null // If we already starting sending the first packet, we better finish

    
   
154
                || !clientTunneledAuthenticationInProgress) {
Moved from 179

    
   
155
                return outgoingQueue.getFirst();
Moved from 180

    
   
156
            }

    
   
157
            
154
                    // Since client's authentication with server is in progress,
158
            // Since client's authentication with server is in progress,
155
                    // send only the null-header packet queued by primeConnection().
159
            // send only the null-header packet queued by primeConnection().
156
                    // This packet must be sent so that the SASL authentication process
160
            // This packet must be sent so that the SASL authentication process
157
                    // can proceed, but all other packets should wait until
161
            // can proceed, but all other packets should wait until
158
                    // SASL authentication completes.
162
            // SASL authentication completes.
159
                    Iterator<Packet> iter = outgoingQueue.listIterator();
163
            for (Packet p : outgoingQueue) {
160
                    while(iter.hasNext()) {

   
161
                        p = iter.next();

   
162
                        if (p.requestHeader == null) {
164
                if (p.requestHeader == null) {
163
                            // We've found the priming-packet.
165
                    // We've found the priming-packet.
164
                            return p;
166
                    return p;
165
                        } else {
167
                } else {
166
                            // Non-priming packet: defer it until later, leaving it in the queue
168
                    // Non-priming packet: defer it until later, leaving it in the queue
[+20] [20] 4 lines
[+20] private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
171
                            }
173
                    }
172
                        }
174
                }
173
                    }
175
            }
174
                    // no sendable packet found.
176
            // no sendable packet found.
175
                    return null;
177
            return null;
176
                } else {

   
177
                    // Tunnelled authentication is not in progress: just

   
178
                    // send the first packet in the queue.

   
179
                    return outgoingQueue.getFirst();

   
180
                }

   
181
            }
178
        }
182
        }
179
    }
183
        return null;

   
184
    }

   
185

    
   
180

   
186
    @Override
181
    @Override
187
    void cleanup() {
182
    void cleanup() {
188
        if (sockKey != null) {
183
        if (sockKey != null) {
189
            SocketChannel sock = (SocketChannel) sockKey.channel();
184
            SocketChannel sock = (SocketChannel) sockKey.channel();
[+20] [20] 225 lines
  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...