Review Board 1.7.22


FLUME-1316: AvroSink should be configurable for connect-timeout and request-timeout

Review Request #5683 - Created June 30, 2012 and submitted

Mike Percy
FLUME-1316
Reviewers
Flume
flume-git
Patch to add support for configurable connect and request timeout to Avro Sink. Also refactors some of the RpcClient libs to reduce the # of codepaths.
Added a unit test for the new params. Existing unit tests pass.
flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
Revision 5c6d0e3 New Change
[20] 36 lines
[+20]
37
import org.slf4j.Logger;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
38
import org.slf4j.LoggerFactory;
39

    
   
39

   
40
import com.google.common.base.Preconditions;
40
import com.google.common.base.Preconditions;
41
import com.google.common.collect.Lists;
41
import com.google.common.collect.Lists;

    
   
42
import java.util.Properties;

    
   
43
import org.apache.flume.api.RpcClientConfigurationConstants;
42

    
   
44

   
43
/**
45
/**
44
 * <p>
46
 * <p>
45
 * A {@link Sink} implementation that can send events to an RPC server (such as
47
 * A {@link Sink} implementation that can send events to an RPC server (such as
46
 * Flume's {@link AvroSource}).
48
 * Flume's {@link AvroSource}).
[+20] [20] 25 lines
[+20]
72
 * </p>
74
 * </p>
73
 * <table>
75
 * <table>
74
 * <tr>
76
 * <tr>
75
 * <th>Parameter</th>
77
 * <th>Parameter</th>
76
 * <th>Description</th>
78
 * <th>Description</th>
77
 * <th>Unit / Type</th>
79
 * <th>Unit (data type)</th>
78
 * <th>Default</th>
80
 * <th>Default</th>
79
 * </tr>
81
 * </tr>
80
 * <tr>
82
 * <tr>
81
 * <td><tt>hostname</tt></td>
83
 * <td><tt>hostname</tt></td>
82
 * <td>The hostname to which events should be sent.</td>
84
 * <td>The hostname to which events should be sent.</td>
83
 * <td>Hostname or IP / String</td>
85
 * <td>Hostname or IP (String)</td>
84
 * <td>none (required)</td>
86
 * <td>none (required)</td>
85
 * </tr>
87
 * </tr>
86
 * <tr>
88
 * <tr>
87
 * <td><tt>port</tt></td>
89
 * <td><tt>port</tt></td>
88
 * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
90
 * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
89
 * <td>TCP port / int</td>
91
 * <td>TCP port (int)</td>
90
 * <td>none (required)</td>
92
 * <td>none (required)</td>
91
 * </tr>
93
 * </tr>
92
 * <tr>
94
 * <tr>
93
 * <td><tt>batch-size</tt></td>
95
 * <td><tt>batch-size</tt></td>
94
 * <td>The maximum number of events to send per RPC.</td>
96
 * <td>The maximum number of events to send per RPC.</td>
95
 * <td>events / int</td>
97
 * <td>events (int)</td>
96
 * <td>100</td>
98
 * <td>100</td>
97
 * </tr>
99
 * </tr>

    
   
100
 * <tr>

    
   
101
 * <td><tt>connect-timeout</tt></td>

    
   
102
 * <td>Maximum time to wait for the first Avro handshake and RPC request</td>

    
   
103
 * <td>milliseconds (long)</td>

    
   
104
 * <td>20000</td>

    
   
105
 * </tr>

    
   
106
 * <tr>

    
   
107
 * <td><tt>request-timeout</tt></td>

    
   
108
 * <td>Maximum time to wait RPC requests after the first</td>

    
   
109
 * <td>milliseconds (long)</td>

    
   
110
 * <td>20000</td>

    
   
111
 * </tr>
98
 * </table>
112
 * </table>
99
 * <p>
113
 * <p>
100
 * <b>Metrics</b>
114
 * <b>Metrics</b>
101
 * </p>
115
 * </p>
102
 * <p>
116
 * <p>
103
 * TODO
117
 * TODO
104
 * </p>
118
 * </p>
105
 */
119
 */
106
public class AvroSink extends AbstractSink implements Configurable {
120
public class AvroSink extends AbstractSink implements Configurable {
107

    
   
121

   
108
  private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
122
  private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
109
  private static final Integer defaultBatchSize = 100;

   
110

    
   
123

   
111
  private String hostname;
124
  private String hostname;
112
  private Integer port;
125
  private Integer port;
113
  private Integer batchSize;

   
114

    
   
126

   
115
  private RpcClient client;
127
  private RpcClient client;
116
  private CounterGroup counterGroup;
128
  private CounterGroup counterGroup;

    
   
129
  private Properties clientProps;
117

    
   
130

   
118
  public AvroSink() {
131
  public AvroSink() {
119
    counterGroup = new CounterGroup();
132
    counterGroup = new CounterGroup();
120
  }
133
  }
121

    
   
134

   
122
  @Override
135
  @Override
123
  public void configure(Context context) {
136
  public void configure(Context context) {

    
   
137
    clientProps = new Properties();

    
   
138

   
124
    hostname = context.getString("hostname");
139
    hostname = context.getString("hostname");
125
    port = context.getInteger("port");
140
    port = context.getInteger("port");
126

    
   
141

   
127
    batchSize = context.getInteger("batch-size");

   
128
    if (batchSize == null) {

   
129
      batchSize = defaultBatchSize;

   
130
    }

   
131

    
   

   
132
    Preconditions.checkState(hostname != null, "No hostname specified");
142
    Preconditions.checkState(hostname != null, "No hostname specified");
133
    Preconditions.checkState(port != null, "No port specified");
143
    Preconditions.checkState(port != null, "No port specified");

    
   
144

   

    
   
145
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");

    
   
146
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +

    
   
147
        "h1", hostname + ":" + port);

    
   
148

   

    
   
149
    Integer batchSize = context.getInteger("batch-size");

    
   
150
    if (batchSize != null) {

    
   
151
      clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,

    
   
152
          String.valueOf(batchSize));

    
   
153
    }

    
   
154

   

    
   
155
    Long connectTimeout = context.getLong("connect-timeout");

    
   
156
    if (connectTimeout != null) {

    
   
157
      clientProps.setProperty(

    
   
158
          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,

    
   
159
          String.valueOf(connectTimeout));

    
   
160
    }

    
   
161

   

    
   
162
    Long requestTimeout = context.getLong("request-timeout");

    
   
163
    if (requestTimeout != null) {

    
   
164
      clientProps.setProperty(

    
   
165
          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,

    
   
166
          String.valueOf(requestTimeout));

    
   
167
    }
134
  }
168
  }
135

    
   
169

   
136
  /**
170
  /**
137
   * If this function is called successively without calling
171
   * If this function is called successively without calling
138
   * {@see #destroyConnection()}, only the first call has any effect.
172
   * {@see #destroyConnection()}, only the first call has any effect.
139
   * @throws FlumeException if an RPC client connection could not be opened
173
   * @throws FlumeException if an RPC client connection could not be opened
140
   */
174
   */
141
  private void createConnection() throws FlumeException {
175
  private void createConnection() throws FlumeException {
142

    
   
176

   
143
    if (client == null) {
177
    if (client == null) {
144
      logger.debug("Avro sink {}: Building RpcClient with hostname: {}, " +
178
      logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
145
          "port: {}, batchSize: {}",
179
          "port: {}",
146
          new Object[] { getName(), hostname, port, batchSize });
180
          new Object[] { getName(), hostname, port });
147

    
   
181

   
148
       client = RpcClientFactory.getDefaultInstance(hostname, port, batchSize);
182
       client = RpcClientFactory.getInstance(clientProps);

    
   
183
       logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
149
    }
184
    }
150

    
   
185

   
151
  }
186
  }
152

    
   
187

   
153
  private void destroyConnection() {
188
  private void destroyConnection() {
[+20] [20] 39 lines
[+20] [+] private void verifyConnection() throws FlumeException {
193
    logger.info("Starting {}...", this);
228
    logger.info("Starting {}...", this);
194

    
   
229

   
195
    try {
230
    try {
196
      createConnection();
231
      createConnection();
197
    } catch (FlumeException e) {
232
    } catch (FlumeException e) {
198
      logger.warn("Unable to create avro client using hostname:" + hostname
233
      logger.warn("Unable to create avro client using hostname: " + hostname
199
          + ", port:" + port + ", batchSize: " + batchSize +
234
          + ", port: " + port, e);
200
          ". Exception follows.", e);

   
201

    
   
235

   
202
      /* Try to prevent leaking resources. */
236
      /* Try to prevent leaking resources. */
203
      destroyConnection();
237
      destroyConnection();
204
    }
238
    }
205

    
   
239

   
[+20] [20] 30 lines
[+20] [+] public Status process() throws EventDeliveryException {
236

    
   
270

   
237
      verifyConnection();
271
      verifyConnection();
238

    
   
272

   
239
      List<Event> batch = Lists.newLinkedList();
273
      List<Event> batch = Lists.newLinkedList();
240

    
   
274

   
241
      for (int i = 0; i < batchSize; i++) {
275
      for (int i = 0; i < client.getBatchSize(); i++) {
242
        Event event = channel.take();
276
        Event event = channel.take();
243

    
   
277

   
244
        if (event == null) {
278
        if (event == null) {
245
          counterGroup.incrementAndGet("batch.underflow");
279
          counterGroup.incrementAndGet("batch.underflow");
246
          break;
280
          break;
[+20] [20] 10 lines
[+20] public Status process() throws EventDeliveryException {
257
      }
291
      }
258

    
   
292

   
259
      transaction.commit();
293
      transaction.commit();
260
      counterGroup.incrementAndGet("batch.success");
294
      counterGroup.incrementAndGet("batch.success");
261

    
   
295

   
262
    } catch (ChannelException e) {
296
    } catch (Throwable t) {
263
      transaction.rollback();
297
      transaction.rollback();

    
   
298
      counterGroup.incrementAndGet("batch.failure");

    
   
299
      if (t instanceof Error) {

    
   
300
        throw (Error) t;

    
   
301
      } else if (t instanceof ChannelException) {
264
      logger.error("Avro Sink " + getName() + ": Unable to get event from" +
302
        logger.error("Avro Sink " + getName() + ": Unable to get event from" +
265
          " channel. Exception follows.", e);
303
            " channel " + channel.getName() + ". Exception follows.", t);
266
      status = Status.BACKOFF;
304
        status = Status.BACKOFF;
267

    
   
305
      } else {
268
    } catch (Exception ex) {

   
269
      transaction.rollback();

   
270
      destroyConnection();
306
        destroyConnection();
271
      throw new EventDeliveryException("Failed to send message", ex);
307
        throw new EventDeliveryException("Failed to send events", t);

    
   
308
      }
272
    } finally {
309
    } finally {
273
      transaction.close();
310
      transaction.close();
274
    }
311
    }
275

    
   
312

   
276
    return status;
313
    return status;
277
  }
314
  }
278

    
   
315

   
279
}
316
}
flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Revision 3765924 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 94f951f New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Revision 606a4bd New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
Revision e304689 New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
Revision e19b093 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
Revision e4f23a6 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Revision 77bf331 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  4. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...
  5. flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java: Loading...
  6. flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java: Loading...
  7. flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java: Loading...
  8. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...