Review Board 1.7.22


FLUME-1755: Load balancing RPC client has issues with downed hosts

Review Request #8288 - Created Nov. 30, 2012 and submitted

Mike Percy
FLUME-1755
Reviewers
Flume
flume-git
Patch to fix exception handling in Load balancing RPC client
Added a unit test to exercise this case. Fails on current code base.
flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
Revision 42297c1 New Change
[20] 62 lines
[+20] [+] public class LoadBalancingRpcClient extends AbstractRpcClient {
63
    boolean eventSent = false;
63
    boolean eventSent = false;
64
    Iterator<HostInfo> it = selector.createHostIterator();
64
    Iterator<HostInfo> it = selector.createHostIterator();
65

    
   
65

   
66
    while (it.hasNext()) {
66
    while (it.hasNext()) {
67
      HostInfo host = it.next();
67
      HostInfo host = it.next();
68
      RpcClient client;

   
69
      try {
68
      try {
70
        client = getClient(host);
69
        RpcClient client = getClient(host);
71
        client.append(event);
70
        client.append(event);
72
        eventSent = true;
71
        eventSent = true;
73
        break;
72
        break;
74
      } catch (Exception ex) {
73
      } catch (Exception ex) {
75
        selector.informFailure(host);
74
        selector.informFailure(host);
[+20] [20] 11 lines
[+20] public class LoadBalancingRpcClient extends AbstractRpcClient {
87
    boolean batchSent = false;
86
    boolean batchSent = false;
88
    Iterator<HostInfo> it = selector.createHostIterator();
87
    Iterator<HostInfo> it = selector.createHostIterator();
89

    
   
88

   
90
    while (it.hasNext()) {
89
    while (it.hasNext()) {
91
      HostInfo host = it.next();
90
      HostInfo host = it.next();
92
      RpcClient client = getClient(host);

   
93
      try {
91
      try {

    
   
92
        RpcClient client = getClient(host);
94
        client.appendBatch(events);
93
        client.appendBatch(events);
95
        batchSent = true;
94
        batchSent = true;
96
        break;
95
        break;
97
      } catch (Exception ex) {
96
      } catch (Exception ex) {
98
        selector.informFailure(host);
97
        selector.informFailure(host);
[+20] [20] 79 lines
[+20] [+] protected void configure(Properties properties) throws FlumeException {
178
    }
177
    }
179

    
   
178

   
180
    selector.setHosts(hosts);
179
    selector.setHosts(hosts);
181
  }
180
  }
182

    
   
181

   
183
  private synchronized RpcClient getClient(HostInfo info) {
182
  private synchronized RpcClient getClient(HostInfo info)

    
   
183
      throws FlumeException {

    
   
184

   
184
    String name = info.getReferenceName();
185
    String name = info.getReferenceName();
185
    RpcClient client = clientMap.get(name);
186
    RpcClient client = clientMap.get(name);
186
    if (client == null) {
187
    if (client == null) {
187
      client = createClient(name);
188
      client = createClient(name);
188
      clientMap.put(name, client);
189
      clientMap.put(name, client);
[+20] [20] 8 lines
[+20] private synchronized RpcClient getClient(HostInfo info) { private synchronized RpcClient getClient(HostInfo info)
197
    }
198
    }
198

    
   
199

   
199
    return client;
200
    return client;
200
  }
201
  }
201

    
   
202

   
202
  private RpcClient createClient(String referenceName) {
203
  private RpcClient createClient(String referenceName) throws FlumeException {
203
    Properties props = getClientConfigurationProperties(referenceName);
204
    Properties props = getClientConfigurationProperties(referenceName);
204
    return RpcClientFactory.getInstance(props);
205
    return RpcClientFactory.getInstance(props);
205
  }
206
  }
206

    
   
207

   
207
  private Properties getClientConfigurationProperties(String referenceName) {
208
  private Properties getClientConfigurationProperties(String referenceName) {
[+20] [20] 74 lines
flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
Revision 49a69bf New Change
 
  1. flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java: Loading...
  2. flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java: Loading...