Review Board 1.7.22


FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close is called.

Review Request #8874 - Created Jan. 8, 2013 and updated

Hari Shreedharan
FLUME-1823
Reviewers
Flume
flume-git
A closed RpcClient will now throw if append is called after close
Added a unit test to verify the new feature. All current tests pass.
flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
Revision f396104 New Change
[20] 54 lines
[+20] [+] public class LoadBalancingRpcClient extends AbstractRpcClient {
55

    
   
55

   
56
  private List<HostInfo> hosts;
56
  private List<HostInfo> hosts;
57
  private HostSelector selector;
57
  private HostSelector selector;
58
  private Map<String, RpcClient> clientMap;
58
  private Map<String, RpcClient> clientMap;
59
  private Properties configurationProperties;
59
  private Properties configurationProperties;

    
   
60
  private volatile boolean isOpen = false;
60

    
   
61

   
61
  @Override
62
  @Override
62
  public void append(Event event) throws EventDeliveryException {
63
  public void append(Event event) throws EventDeliveryException {

    
   
64
    throwIfClosed();
63
    boolean eventSent = false;
65
    boolean eventSent = false;
64
    Iterator<HostInfo> it = selector.createHostIterator();
66
    Iterator<HostInfo> it = selector.createHostIterator();
65

    
   
67

   
66
    while (it.hasNext()) {
68
    while (it.hasNext()) {
67
      HostInfo host = it.next();
69
      HostInfo host = it.next();
[+20] [20] 13 lines
[+20] public class LoadBalancingRpcClient extends AbstractRpcClient {
81
    }
83
    }
82
  }
84
  }
83

    
   
85

   
84
  @Override
86
  @Override
85
  public void appendBatch(List<Event> events) throws EventDeliveryException {
87
  public void appendBatch(List<Event> events) throws EventDeliveryException {

    
   
88
    throwIfClosed();
86
    boolean batchSent = false;
89
    boolean batchSent = false;
87
    Iterator<HostInfo> it = selector.createHostIterator();
90
    Iterator<HostInfo> it = selector.createHostIterator();
88

    
   
91

   
89
    while (it.hasNext()) {
92
    while (it.hasNext()) {
90
      HostInfo host = it.next();
93
      HostInfo host = it.next();
[+20] [20] 13 lines
[+20] public class LoadBalancingRpcClient extends AbstractRpcClient {
104
    }
107
    }
105
  }
108
  }
106

    
   
109

   
107
  @Override
110
  @Override
108
  public boolean isActive() {
111
  public boolean isActive() {
109
    // This client is always active and does not need to be replaced.
112
    return isOpen;
110
    // Internally it will test the delegates and replace them where needed.
113
  }
111
    return true;
114

   

    
   
115
  private void throwIfClosed() throws EventDeliveryException {

    
   
116
    if (!isOpen) {

    
   
117
      throw new EventDeliveryException("Rpc Client is closed");

    
   
118
    }
112
  }
119
  }
113

    
   
120

   
114
  @Override
121
  @Override
115
  public void close() throws FlumeException {
122
  public void close() throws FlumeException {

    
   
123
    isOpen = false;
116
    synchronized (this) {
124
    synchronized (this) {
117
      Iterator<String> it = clientMap.keySet().iterator();
125
      Iterator<String> it = clientMap.keySet().iterator();
118
      while (it.hasNext()) {
126
      while (it.hasNext()) {
119
        String name = it.next();
127
        String name = it.next();
120
        RpcClient client = clientMap.get(name);
128
        RpcClient client = clientMap.get(name);
[+20] [20] 54 lines
[+20] [+] protected void configure(Properties properties) throws FlumeException {
175
            + lbTypeName, ex);
183
            + lbTypeName, ex);
176
      }
184
      }
177
    }
185
    }
178

    
   
186

   
179
    selector.setHosts(hosts);
187
    selector.setHosts(hosts);

    
   
188
    isOpen = true;
180
  }
189
  }
181

    
   
190

   
182
  private synchronized RpcClient getClient(HostInfo info)
191
  private synchronized RpcClient getClient(HostInfo info)
183
      throws FlumeException {
192
      throws FlumeException, EventDeliveryException {
184

    
   
193
    throwIfClosed();
185
    String name = info.getReferenceName();
194
    String name = info.getReferenceName();
186
    RpcClient client = clientMap.get(name);
195
    RpcClient client = clientMap.get(name);
187
    if (client == null) {
196
    if (client == null) {
188
      client = createClient(name);
197
      client = createClient(name);
189
      clientMap.put(name, client);
198
      clientMap.put(name, client);
[+20] [20] 93 lines
flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
Revision 9071734 New Change
 
  1. flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java: Loading...
  2. flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java: Loading...