

FLUME-1897. Thrift Sink

Review Request #9381 - Created Feb. 8, 2013 and submitted

Submitter: Hari Shreedharan
Bugs: FLUME-1897
Reviewers: Flume
Repository: flume-git
Added a Thrift Sink. Refactored the Avro Sink: moved the core processing logic into AbstractRpcSink and made the Thrift and Avro sinks simple subclasses of it.
Added unit tests.
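To illustrate the shape this refactoring gives the concrete sinks, here is a minimal sketch of a subclass: all it supplies is the transport-specific RpcClient, by overriding initializeRpcClient(Properties). The RpcClientFactory.getThriftInstance(props) call is an assumption about the client factory added alongside this work; the Avro variant would return RpcClientFactory.getInstance(props) instead.

package org.apache.flume.sink;

import java.util.Properties;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

// Illustrative sketch only; the actual ThriftSink under review is listed
// further below. The concrete sink is a thin subclass of AbstractRpcSink
// whose sole job is to build the transport-specific RpcClient.
public class ThriftSink extends AbstractRpcSink {
  @Override
  protected RpcClient initializeRpcClient(Properties props) {
    // Assumed factory method for a Thrift-based client; an AvroSink would
    // call RpcClientFactory.getInstance(props) here instead.
    return RpcClientFactory.getThriftInstance(props);
  }
}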
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
New File

    
   
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

/**
 * <p>
 * This sink provides the basic RPC functionality for Flume. It takes several
 * arguments which are used in the RPC call.
 * This sink forms one half of Flume's tiered collection support. Events sent
 * to this sink are transported over the network to the hostname / port pair
 * using the RPC implementation encapsulated in {@link RpcClient}.
 * The destination is an instance of Flume's
 * {@link org.apache.flume.source.AvroSource} or
 * {@link org.apache.flume.source.ThriftSource} (depending on which
 * implementation of this class is used), which allows Flume agents to forward
 * to other Flume agents, forming a tiered collection infrastructure. Of
 * course, nothing prevents one from using this sink to speak to other
 * custom-built infrastructure that implements the same RPC protocol.
 * </p>
 * <p>
 * Events are taken from the configured {@link Channel} in batches of the
 * configured <tt>batch-size</tt>. The batch size has no theoretical limit,
 * although all events in the batch <b>must</b> fit in memory. Generally,
 * larger batches are far more efficient, but introduce a slight delay
 * (measured in millis) in delivery. The batch behavior is such that underruns
 * (i.e. batches smaller than the configured batch size) are possible. This is
 * a compromise made to maintain low latency of event delivery. If the channel
 * returns a null event, meaning it is empty, the batch is immediately sent,
 * regardless of size. Batch underruns are tracked in the metrics. Empty
 * batches do not incur an RPC roundtrip.
 * </p>
 * <p>
 * <b>Configuration options</b>
 * </p>
 * <table>
 * <tr>
 * <th>Parameter</th>
 * <th>Description</th>
 * <th>Unit (data type)</th>
 * <th>Default</th>
 * </tr>
 * <tr>
 * <td><tt>hostname</tt></td>
 * <td>The hostname to which events should be sent.</td>
 * <td>Hostname or IP (String)</td>
 * <td>none (required)</td>
 * </tr>
 * <tr>
 * <td><tt>port</tt></td>
 * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
 * <td>TCP port (int)</td>
 * <td>none (required)</td>
 * </tr>
 * <tr>
 * <td><tt>batch-size</tt></td>
 * <td>The maximum number of events to send per RPC.</td>
 * <td>events (int)</td>
 * <td>100</td>
 * </tr>
 * <tr>
 * <td><tt>connect-timeout</tt></td>
 * <td>Maximum time to wait for the first Avro handshake and RPC request.</td>
 * <td>milliseconds (long)</td>
 * <td>20000</td>
 * </tr>
 * <tr>
 * <td><tt>request-timeout</tt></td>
 * <td>Maximum time to wait for RPC requests after the first.</td>
 * <td>milliseconds (long)</td>
 * <td>20000</td>
 * </tr>
 * </table>
 * <p>
 * <b>Metrics</b>
 * </p>
 * <p>
 * TODO
 * </p>
 *
 * <strong>Implementation Notes:</strong> Any implementation of this class
 * must override the {@linkplain #initializeRpcClient(Properties)} method.
 * This method will be called whenever this sink needs to create a new
 * connection to the source.
 */
public abstract class AbstractRpcSink extends AbstractSink
    implements Configurable {

  private static final Logger logger =
      LoggerFactory.getLogger(AbstractRpcSink.class);
  private String hostname;
  private Integer port;
  private RpcClient client;
  private Properties clientProps;
  private SinkCounter sinkCounter;

  @Override
  public void configure(Context context) {
    clientProps = new Properties();

    hostname = context.getString("hostname");
    port = context.getInteger("port");

    Preconditions.checkState(hostname != null, "No hostname specified");
    Preconditions.checkState(port != null, "No port specified");

    // The RpcClient properties describe a named list of hosts; register the
    // single configured hostname:port pair under the alias "h1".
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
        "h1", hostname + ":" + port);

    Integer batchSize = context.getInteger("batch-size");
    if (batchSize != null) {
      clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
          String.valueOf(batchSize));
    }

    Long connectTimeout = context.getLong("connect-timeout");
    if (connectTimeout != null) {
      clientProps.setProperty(
          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
          String.valueOf(connectTimeout));
    }

    Long requestTimeout = context.getLong("request-timeout");
    if (requestTimeout != null) {
      clientProps.setProperty(
          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
          String.valueOf(requestTimeout));
    }

    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }

  /**
   * Returns a new {@linkplain RpcClient} instance configured using the given
   * {@linkplain Properties} object. This method is called whenever a new
   * connection needs to be created to the next hop.
   *
   * @param props the client configuration built in {@link #configure(Context)}
   * @return a new, configured {@linkplain RpcClient} instance
   */
  protected abstract RpcClient initializeRpcClient(Properties props);

  /**
   * If this function is called successively without calling
   * {@link #destroyConnection()}, only the first call has any effect.
   *
   * @throws org.apache.flume.FlumeException if an RPC client connection
   * could not be opened
   */
  private void createConnection() throws FlumeException {
    if (client == null) {
      logger.info("Rpc sink {}: Building RpcClient with hostname: {}, " +
          "port: {}",
          new Object[] { getName(), hostname, port });
      try {
        client = initializeRpcClient(clientProps);
        Preconditions.checkNotNull(client, "Rpc Client could not be " +
            "initialized. " + getName() + " could not be started");
        sinkCounter.incrementConnectionCreatedCount();
      } catch (Exception ex) {
        sinkCounter.incrementConnectionFailedCount();
        if (ex instanceof FlumeException) {
          throw (FlumeException) ex;
        } else {
          throw new FlumeException(ex);
        }
      }
      logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
    }
  }

  private void destroyConnection() {
    if (client != null) {
      logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client);
      try {
        client.close();
        sinkCounter.incrementConnectionClosedCount();
      } catch (FlumeException e) {
        sinkCounter.incrementConnectionFailedCount();
        logger.error("Rpc sink " + getName() + ": Attempt to close Rpc " +
            "client failed. Exception follows.", e);
      }
    }

    client = null;
  }

  /**
   * Ensure the connection exists and is active.
   * If the connection is not active, destroy it and recreate it.
   *
   * @throws org.apache.flume.FlumeException If there are errors closing or
   * opening the RPC connection.
   */
  private void verifyConnection() throws FlumeException {
    if (client == null) {
      createConnection();
    } else if (!client.isActive()) {
      destroyConnection();
      createConnection();
    }
  }

  /**
   * The start() of RpcSink is more of an optimization that allows the
   * connection to be created before the process() loop is started. If the
   * start happens to fail, the process() loop will itself attempt to
   * reconnect as necessary. This is the expected behavior, since it is
   * possible that the downstream source becomes unavailable in the middle of
   * the process loop and the sink will have to retry the connection again.
   */
  @Override
  public void start() {
    logger.info("Starting {}...", this);
    sinkCounter.start();
    try {
      createConnection();
    } catch (FlumeException e) {
      logger.warn("Unable to create Rpc client using hostname: " + hostname
          + ", port: " + port, e);

      /* Try to prevent leaking resources. */
      destroyConnection();
    }

    super.start();

    logger.info("Rpc sink {} started.", getName());
  }

  @Override
  public void stop() {
    logger.info("Rpc sink {} stopping...", getName());

    destroyConnection();
    sinkCounter.stop();
    super.stop();

    logger.info("Rpc sink {} stopped. Metrics: {}", getName(), sinkCounter);
  }

  @Override
  public String toString() {
    return "RpcSink " + getName() + " { host: " + hostname + ", port: " +
        port + " }";
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();

      verifyConnection();

      List<Event> batch = Lists.newLinkedList();

      for (int i = 0; i < client.getBatchSize(); i++) {
        Event event = channel.take();

        if (event == null) {
          break;
        }

        batch.add(event);
      }

      int size = batch.size();
      int batchSize = client.getBatchSize();

      if (size == 0) {
        sinkCounter.incrementBatchEmptyCount();
        status = Status.BACKOFF;
      } else {
        if (size < batchSize) {
          sinkCounter.incrementBatchUnderflowCount();
        } else {
          sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(size);
        client.appendBatch(batch);
      }

      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(size);
    } catch (Throwable t) {
      transaction.rollback();
      if (t instanceof Error) {
        throw (Error) t;
      } else if (t instanceof ChannelException) {
        logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
            " channel " + channel.getName() + ". Exception follows.", t);
        status = Status.BACKOFF;
      } else {
        destroyConnection();
        throw new EventDeliveryException("Failed to send events", t);
      }
    } finally {
      transaction.close();
    }

    return status;
  }
}
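To make the configuration table in the Javadoc above concrete, here is a hedged sketch of wiring and configuring one of these sinks programmatically, in the style a unit test such as TestThriftSink might use. The values and the surrounding wiring are illustrative, not taken from the patch.

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;

// Illustrative wiring only; parameter names match the Javadoc table above.
Context ctx = new Context();
ctx.put("hostname", "127.0.0.1");      // required
ctx.put("port", "4141");               // required
ctx.put("batch-size", "50");           // optional, defaults to 100
ctx.put("connect-timeout", "20000");   // optional, milliseconds
ctx.put("request-timeout", "20000");   // optional, milliseconds

Sink sink = new ThriftSink();          // AvroSink is configured the same way
Configurables.configure(sink, ctx);

Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
sink.setChannel(channel);

sink.start();                          // process() can now be driven by a SinkRunner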
flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
Revision d4ddcbe New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
New File
 
flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
New File
 