Review Board 1.7.22


ZOOKEEPER-786 Exception in ZooKeeper.toString

Review Request #1714 - Created Sept. 5, 2011 and submitted

Thomas Koch
ZOOKEEPER-786
Reviewers
zookeeper
zookeeper-git
.

 

Diff revision 1 (Latest)

  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...
src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
Revision 626da04 New Change
1
/**
1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
8
 * with the License.  You may obtain a copy of the License at
9
 *
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19
package org.apache.zookeeper;
19
package org.apache.zookeeper;
20

    
   
20

   
21
import java.io.IOException;
21
import java.io.IOException;
22
import java.net.InetSocketAddress;
22
import java.net.InetSocketAddress;

    
   
23
import java.net.Socket;
23
import java.net.SocketAddress;
24
import java.net.SocketAddress;
24
import java.nio.ByteBuffer;
25
import java.nio.ByteBuffer;
25
import java.nio.channels.SelectionKey;
26
import java.nio.channels.SelectionKey;
26
import java.nio.channels.Selector;
27
import java.nio.channels.Selector;
27
import java.nio.channels.SocketChannel;
28
import java.nio.channels.SocketChannel;
28
import java.util.LinkedList;
29
import java.util.LinkedList;
29
import java.util.List;
30
import java.util.List;
30
import java.util.Set;
31
import java.util.Set;
31

    
   
32

   
32
import org.slf4j.Logger;
33
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
34
import org.slf4j.LoggerFactory;
34
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
35
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
35
import org.apache.zookeeper.ClientCnxn.Packet;
36
import org.apache.zookeeper.ClientCnxn.Packet;
36
import org.apache.zookeeper.ZooDefs.OpCode;
37
import org.apache.zookeeper.ZooDefs.OpCode;
37
import org.apache.zookeeper.ZooKeeper.States;
38
import org.apache.zookeeper.ZooKeeper.States;
38

    
   
39

   
39
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
40
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
40
    private static final Logger LOG = LoggerFactory
41
    private static final Logger LOG = LoggerFactory
41
            .getLogger(ClientCnxnSocketNIO.class);
42
            .getLogger(ClientCnxnSocketNIO.class);
42

    
   
43

   
43
    private final Selector selector = Selector.open();
44
    private final Selector selector = Selector.open();
44

    
   
45

   
45
    private SelectionKey sockKey;
46
    private SelectionKey sockKey;
46

    
   
47

   

    
   
48
    private SocketAddress localSocketAddress;

    
   
49

   

    
   
50
    private SocketAddress remoteSocketAddress;

    
   
51

   
47
    ClientCnxnSocketNIO() throws IOException {
52
    ClientCnxnSocketNIO() throws IOException {
48
        super();
53
        super();
49
    }
54
    }
50

    
   
55

   
51
    @Override
56
    @Override
52
    boolean isConnected() {
57
    boolean isConnected() {
53
        return sockKey != null;
58
        return sockKey != null;
54
    }
59
    }
55
    
60
    
56
    /**
61
    /**
57
     * @return true if a packet was received
62
     * @return true if a packet was received
58
     * @throws InterruptedException
63
     * @throws InterruptedException
59
     * @throws IOException
64
     * @throws IOException
60
     */
65
     */
61
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
66
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
62
        SocketChannel sock = (SocketChannel) sockKey.channel();
67
        SocketChannel sock = (SocketChannel) sockKey.channel();
63
        if (sock == null) {
68
        if (sock == null) {
64
            throw new IOException("Socket is null!");
69
            throw new IOException("Socket is null!");
65
        }
70
        }
66
        if (sockKey.isReadable()) {
71
        if (sockKey.isReadable()) {
67
            int rc = sock.read(incomingBuffer);
72
            int rc = sock.read(incomingBuffer);
68
            if (rc < 0) {
73
            if (rc < 0) {
69
                throw new EndOfStreamException(
74
                throw new EndOfStreamException(
70
                        "Unable to read additional data from server sessionid 0x"
75
                        "Unable to read additional data from server sessionid 0x"
71
                                + Long.toHexString(sessionId)
76
                                + Long.toHexString(sessionId)
72
                                + ", likely server has closed socket");
77
                                + ", likely server has closed socket");
73
            }
78
            }
74
            if (!incomingBuffer.hasRemaining()) {
79
            if (!incomingBuffer.hasRemaining()) {
75
                incomingBuffer.flip();
80
                incomingBuffer.flip();
76
                if (incomingBuffer == lenBuffer) {
81
                if (incomingBuffer == lenBuffer) {
77
                    recvCount++;
82
                    recvCount++;
78
                    readLength();
83
                    readLength();
79
                } else if (!initialized) {
84
                } else if (!initialized) {
80
                    readConnectResult();
85
                    readConnectResult();
81
                    enableRead();
86
                    enableRead();
82
                    if (!outgoingQueue.isEmpty()) {
87
                    if (!outgoingQueue.isEmpty()) {
83
                        enableWrite();
88
                        enableWrite();
84
                    }
89
                    }
85
                    lenBuffer.clear();
90
                    lenBuffer.clear();
86
                    incomingBuffer = lenBuffer;
91
                    incomingBuffer = lenBuffer;
87
                    updateLastHeard();
92
                    updateLastHeard();
88
                    initialized = true;
93
                    initialized = true;
89
                } else {
94
                } else {
90
                    sendThread.readResponse(incomingBuffer);
95
                    sendThread.readResponse(incomingBuffer);
91
                    lenBuffer.clear();
96
                    lenBuffer.clear();
92
                    incomingBuffer = lenBuffer;
97
                    incomingBuffer = lenBuffer;
93
                    updateLastHeard();
98
                    updateLastHeard();
94
                }
99
                }
95
            }
100
            }
96
        }
101
        }
97
        if (sockKey.isWritable()) {
102
        if (sockKey.isWritable()) {
98
            LinkedList<Packet> pending = new LinkedList<Packet>();
103
            LinkedList<Packet> pending = new LinkedList<Packet>();
99
            synchronized (outgoingQueue) {
104
            synchronized (outgoingQueue) {
100
                if (!outgoingQueue.isEmpty()) {
105
                if (!outgoingQueue.isEmpty()) {
101
                    updateLastSend();
106
                    updateLastSend();
102
                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
107
                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
103
                    sock.write(pbb);
108
                    sock.write(pbb);
104
                    if (!pbb.hasRemaining()) {
109
                    if (!pbb.hasRemaining()) {
105
                        sentCount++;
110
                        sentCount++;
106
                        Packet p = outgoingQueue.removeFirst();
111
                        Packet p = outgoingQueue.removeFirst();
107
                        if (p.requestHeader != null
112
                        if (p.requestHeader != null
108
                                && p.requestHeader.getType() != OpCode.ping
113
                                && p.requestHeader.getType() != OpCode.ping
109
                                && p.requestHeader.getType() != OpCode.auth) {
114
                                && p.requestHeader.getType() != OpCode.auth) {
110
                            pending.add(p);
115
                            pending.add(p);
111
                        }
116
                        }
112
                    }
117
                    }
113
                }
118
                }
114
            }
119
            }
115
            synchronized(pendingQueue) {
120
            synchronized(pendingQueue) {
116
                pendingQueue.addAll(pending);
121
                pendingQueue.addAll(pending);
117
            }
122
            }
118
        }
123
        }
119
    }
124
    }
120

    
   
125

   
121
    @Override
126
    @Override
122
    void cleanup() {
127
    void cleanup() {
123
        if (sockKey != null) {
128
        if (sockKey != null) {
124
            SocketChannel sock = (SocketChannel) sockKey.channel();
129
            SocketChannel sock = (SocketChannel) sockKey.channel();
125
            sockKey.cancel();
130
            sockKey.cancel();
126
            try {
131
            try {
127
                sock.socket().shutdownInput();
132
                sock.socket().shutdownInput();
128
            } catch (IOException e) {
133
            } catch (IOException e) {
129
                if (LOG.isDebugEnabled()) {
134
                if (LOG.isDebugEnabled()) {
130
                    LOG.debug("Ignoring exception during shutdown input", e);
135
                    LOG.debug("Ignoring exception during shutdown input", e);
131
                }
136
                }
132
            }
137
            }
133
            try {
138
            try {
134
                sock.socket().shutdownOutput();
139
                sock.socket().shutdownOutput();
135
            } catch (IOException e) {
140
            } catch (IOException e) {
136
                if (LOG.isDebugEnabled()) {
141
                if (LOG.isDebugEnabled()) {
137
                    LOG.debug("Ignoring exception during shutdown output",
142
                    LOG.debug("Ignoring exception during shutdown output",
138
                            e);
143
                            e);
139
                }
144
                }
140
            }
145
            }
141
            try {
146
            try {
142
                sock.socket().close();
147
                sock.socket().close();
143
            } catch (IOException e) {
148
            } catch (IOException e) {
144
                if (LOG.isDebugEnabled()) {
149
                if (LOG.isDebugEnabled()) {
145
                    LOG.debug("Ignoring exception during socket close", e);
150
                    LOG.debug("Ignoring exception during socket close", e);
146
                }
151
                }
147
            }
152
            }
148
            try {
153
            try {
149
                sock.close();
154
                sock.close();
150
            } catch (IOException e) {
155
            } catch (IOException e) {
151
                if (LOG.isDebugEnabled()) {
156
                if (LOG.isDebugEnabled()) {
152
                    LOG.debug("Ignoring exception during channel close", e);
157
                    LOG.debug("Ignoring exception during channel close", e);
153
                }
158
                }
154
            }
159
            }
155
        }
160
        }
156
        try {
161
        try {
157
            Thread.sleep(100);
162
            Thread.sleep(100);
158
        } catch (InterruptedException e) {
163
        } catch (InterruptedException e) {
159
            if (LOG.isDebugEnabled()) {
164
            if (LOG.isDebugEnabled()) {
160
                LOG.debug("SendThread interrupted during sleep, ignoring");
165
                LOG.debug("SendThread interrupted during sleep, ignoring");
161
            }
166
            }
162
        }
167
        }
163
        sockKey = null;
168
        sockKey = null;
164
    }
169
    }
165
 
170
 
166
    @Override
171
    @Override
167
    void close() {
172
    void close() {
168
        try {
173
        try {
169
            if (LOG.isTraceEnabled()) {
174
            if (LOG.isTraceEnabled()) {
170
                LOG.trace("Doing client selector close");
175
                LOG.trace("Doing client selector close");
171
            }
176
            }
172
            selector.close();
177
            selector.close();
173
            if (LOG.isTraceEnabled()) {
178
            if (LOG.isTraceEnabled()) {
174
                LOG.trace("Closed client selector");
179
                LOG.trace("Closed client selector");
175
            }
180
            }
176
        } catch (IOException e) {
181
        } catch (IOException e) {
177
            LOG.warn("Ignoring exception during selector close", e);
182
            LOG.warn("Ignoring exception during selector close", e);
178
        }
183
        }
179
    }
184
    }
180

    
   
185

   
181
    @Override
186
    @Override
182
    void connect(InetSocketAddress addr) throws IOException {
187
    void connect(InetSocketAddress addr) throws IOException {
183
        SocketChannel sock;
188
        SocketChannel sock;
184
        sock = SocketChannel.open();
189
        sock = SocketChannel.open();
185
        sock.configureBlocking(false);
190
        sock.configureBlocking(false);
186
        sock.socket().setSoLinger(false, -1);
191
        sock.socket().setSoLinger(false, -1);
187
        sock.socket().setTcpNoDelay(true);
192
        sock.socket().setTcpNoDelay(true);
188
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
193
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
189
        if (sock.connect(addr)) {
194
        sock.connect(addr);
190
            sendThread.primeConnection();

   
191
        }

   
192
        initialized = false;
195
        initialized = false;
193

    
   
196

   
194
        /*
197
        /*
195
         * Reset incomingBuffer
198
         * Reset incomingBuffer
196
         */
199
         */
197
        lenBuffer.clear();
200
        lenBuffer.clear();
198
        incomingBuffer = lenBuffer;
201
        incomingBuffer = lenBuffer;
199
    }
202
    }
200

    
   
203

   
201
    /**
204
    /**
202
     * Returns the address to which the socket is connected.
205
     * Returns the address to which the socket is connected.
203
     * 
206
     * 
204
     * @return ip address of the remote side of the connection or null if not
207
     * @return ip address of the remote side of the connection or null if not
205
     *         connected
208
     *         connected
206
     */
209
     */
207
    @Override
210
    @Override
208
    SocketAddress getRemoteSocketAddress() {
211
    SocketAddress getRemoteSocketAddress() {
209
        // a lot could go wrong here, so rather than put in a bunch of code
212
        return remoteSocketAddress;
210
        // to check for nulls all down the chain let's do it the simple

   
211
        // yet bulletproof way

   
212
        try {

   
213
            return ((SocketChannel) sockKey.channel()).socket()

   
214
                    .getRemoteSocketAddress();

   
215
        } catch (NullPointerException e) {

   
216
            return null;

   
217
        }

   
218
    }
213
    }
219

    
   
214

   
220
    /**
215
    /**
221
     * Returns the local address to which the socket is bound.
216
     * Returns the local address to which the socket is bound.
222
     * 
217
     * 
223
     * @return ip address of the remote side of the connection or null if not
218
     * @return ip address of the remote side of the connection or null if not
224
     *         connected
219
     *         connected
225
     */
220
     */
226
    @Override
221
    @Override
227
    SocketAddress getLocalSocketAddress() {
222
    SocketAddress getLocalSocketAddress() {
228
        // a lot could go wrong here, so rather than put in a bunch of code
223
        return localSocketAddress;
229
        // to check for nulls all down the chain let's do it the simple

   
230
        // yet bulletproof way

   
231
        try {

   
232
            return ((SocketChannel) sockKey.channel()).socket()

   
233
                    .getLocalSocketAddress();

   
234
        } catch (NullPointerException e) {

   
235
            return null;

   
236
        }
224
    }

    
   
225
    

    
   
226
    private void updateSocketAddresses() {

    
   
227
        Socket socket = ((SocketChannel) sockKey.channel()).socket();

    
   
228
        localSocketAddress = socket.getLocalSocketAddress();

    
   
229
        remoteSocketAddress = socket.getRemoteSocketAddress();
237
    }
230
    }
238

    
   
231

   
239
    @Override
232
    @Override
240
    synchronized void wakeupCnxn() {
233
    synchronized void wakeupCnxn() {
241
        selector.wakeup();
234
        selector.wakeup();
242
    }
235
    }
243
    
236
    
244
    @Override
237
    @Override
245
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue )
238
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue )
246
            throws IOException, InterruptedException {
239
            throws IOException, InterruptedException {
247
        selector.select(waitTimeOut);
240
        selector.select(waitTimeOut);
248
        Set<SelectionKey> selected;
241
        Set<SelectionKey> selected;
249
        synchronized (this) {
242
        synchronized (this) {
250
            selected = selector.selectedKeys();
243
            selected = selector.selectedKeys();
251
        }
244
        }
252
        // Everything below and until we get back to the select is
245
        // Everything below and until we get back to the select is
253
        // non blocking, so time is effectively a constant. That is
246
        // non blocking, so time is effectively a constant. That is
254
        // Why we just have to do this once, here
247
        // Why we just have to do this once, here
255
        updateNow();
248
        updateNow();
256
        for (SelectionKey k : selected) {
249
        for (SelectionKey k : selected) {
257
            SocketChannel sc = ((SocketChannel) k.channel());
250
            SocketChannel sc = ((SocketChannel) k.channel());
258
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
251
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
259
                if (sc.finishConnect()) {
252
                if (sc.finishConnect()) {
260
                    updateLastSendAndHeard();
253
                    updateLastSendAndHeard();

    
   
254
                    updateSocketAddresses();
261
                    sendThread.primeConnection();
255
                    sendThread.primeConnection();
262
                }
256
                }
263
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
257
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
264
                doIO(pendingQueue, outgoingQueue);
258
                doIO(pendingQueue, outgoingQueue);
265
            }
259
            }
266
        }
260
        }
267
        if (sendThread.getZkState().isConnected()) {
261
        if (sendThread.getZkState().isConnected()) {
268
            synchronized(outgoingQueue) {
262
            synchronized(outgoingQueue) {
269
                if (!outgoingQueue.isEmpty()) {
263
                if (!outgoingQueue.isEmpty()) {
270
                    enableWrite();
264
                    enableWrite();
271
                } else {
265
                } else {
272
                    disableWrite();
266
                    disableWrite();
273
                }
267
                }
274
            }
268
            }
275
        }
269
        }
276
        selected.clear();
270
        selected.clear();
277
    }
271
    }
278

    
   
272

   
279
    //TODO should this be synchronized?
273
    //TODO should this be synchronized?
280
    @Override
274
    @Override
281
    void testableCloseSocket() throws IOException {
275
    void testableCloseSocket() throws IOException {
282
        LOG.info("testableCloseSocket() called");
276
        LOG.info("testableCloseSocket() called");
283
        ((SocketChannel) sockKey.channel()).socket().close();
277
        ((SocketChannel) sockKey.channel()).socket().close();
284
    }
278
    }
285

    
   
279

   
286
    @Override
280
    @Override
287
    synchronized void enableWrite() {
281
    synchronized void enableWrite() {
288
        int i = sockKey.interestOps();
282
        int i = sockKey.interestOps();
289
        if ((i & SelectionKey.OP_WRITE) == 0) {
283
        if ((i & SelectionKey.OP_WRITE) == 0) {
290
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
284
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
291
        }
285
        }
292
    }
286
    }
293

    
   
287

   
294
    private synchronized void disableWrite() {
288
    private synchronized void disableWrite() {
295
        int i = sockKey.interestOps();
289
        int i = sockKey.interestOps();
296
        if ((i & SelectionKey.OP_WRITE) != 0) {
290
        if ((i & SelectionKey.OP_WRITE) != 0) {
297
            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
291
            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
298
        }
292
        }
299
    }
293
    }
300

    
   
294

   
301
    synchronized private void enableRead() {
295
    synchronized private void enableRead() {
302
        int i = sockKey.interestOps();
296
        int i = sockKey.interestOps();
303
        if ((i & SelectionKey.OP_READ) == 0) {
297
        if ((i & SelectionKey.OP_READ) == 0) {
304
            sockKey.interestOps(i | SelectionKey.OP_READ);
298
            sockKey.interestOps(i | SelectionKey.OP_READ);
305
        }
299
        }
306
    }
300
    }
307

    
   
301

   
308
    @Override
302
    @Override
309
    synchronized void enableReadWriteOnly() {
303
    synchronized void enableReadWriteOnly() {
310
        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
304
        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
311
    }
305
    }
312
}
306
}
  1. src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java: Loading...