Review Board 1.7.22


JMX support for Seq Generator Source

Review Request #11822 - Created June 12, 2013 and submitted

Sravya Tirukkovalur
FLUME-2082
Reviewers
Flume
brocknoland, hshreedharan, mpercy
flume-git
Seq Generator Source supports following metrics:
START
STOP
EVENTS_ACCEPTED
APPEND_BATCH_ACCEPTED
Ran existing tests
flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
Revision 0f85e87 New Change
[20] 26 lines
[+20]
27
import org.apache.flume.Event;
27
import org.apache.flume.Event;
28
import org.apache.flume.EventDeliveryException;
28
import org.apache.flume.EventDeliveryException;
29
import org.apache.flume.PollableSource;
29
import org.apache.flume.PollableSource;
30
import org.apache.flume.conf.Configurable;
30
import org.apache.flume.conf.Configurable;
31
import org.apache.flume.event.EventBuilder;
31
import org.apache.flume.event.EventBuilder;

    
   
32
import org.apache.flume.instrumentation.SourceCounter;
32
import org.slf4j.Logger;
33
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
34
import org.slf4j.LoggerFactory;
34

    
   
35

   
35
public class SequenceGeneratorSource extends AbstractSource implements
36
public class SequenceGeneratorSource extends AbstractSource implements
36
    PollableSource, Configurable {
37
    PollableSource, Configurable {
37

    
   
38

   
38
  private static final Logger logger = LoggerFactory
39
  private static final Logger logger = LoggerFactory
39
      .getLogger(SequenceGeneratorSource.class);
40
      .getLogger(SequenceGeneratorSource.class);
40

    
   
41

   
41
  private long sequence;
42
  private long sequence;
42
  private int batchSize;
43
  private int batchSize;
43
  private CounterGroup counterGroup;
44
  private SourceCounter sourceCounter;
44
  private List<Event> batchArrayList;
45
  private List<Event> batchArrayList;
45
  private long totalEvents;
46
  private long totalEvents;
46
  private long eventsSent = 0;
47
  private long eventsSent = 0;
47

    
   
48

   
48
  public SequenceGeneratorSource() {
49
  public SequenceGeneratorSource() {
49
    sequence = 0;
50
    sequence = 0;
50
    counterGroup = new CounterGroup();

   
51
  }
51
  }
52

    
   
52

   
53
  /**
53
  /**
54
   * Read parameters from context
54
   * Read parameters from context
55
   * <li>batchSize = type int that defines the size of event batches
55
   * <li>batchSize = type int that defines the size of event batches
56
   */
56
   */
57
  @Override
57
  @Override
58
  public void configure(Context context) {
58
  public void configure(Context context) {
59
    batchSize = context.getInteger("batchSize", 1);
59
    batchSize = context.getInteger("batchSize", 1);
60
    if (batchSize > 1) {
60
    if (batchSize > 1) {
61
      batchArrayList = new ArrayList<Event>(batchSize);
61
      batchArrayList = new ArrayList<Event>(batchSize);
62
    }
62
    }
63
    totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
63
    totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);

    
   
64
    if (sourceCounter == null) {

    
   
65
      sourceCounter = new SourceCounter(getName());

    
   
66
    }
64
  }
67
  }
65

    
   
68

   
66
  @Override
69
  @Override
67
  public Status process() throws EventDeliveryException {
70
  public Status process() throws EventDeliveryException {
68

    
   
71

   
69
    Status status = Status.READY;
72
    Status status = Status.READY;
70
    int i = 0;
73
    int i = 0;
71
    try {
74
    try {
72
      if (batchSize <= 1) {
75
      if (batchSize <= 1) {
73
        if(eventsSent < totalEvents) {
76
        if(eventsSent < totalEvents) {
74
          getChannelProcessor().processEvent(
77
          getChannelProcessor().processEvent(
75
            EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
78
            EventBuilder.withBody(String.valueOf(sequence++).getBytes()));

    
   
79
          sourceCounter.incrementEventAcceptedCount();
76
          eventsSent++;
80
          eventsSent++;
77
        } else {
81
        } else {
78
          status = Status.BACKOFF;
82
          status = Status.BACKOFF;
79
        }
83
        }
80
      } else {
84
      } else {
[+20] [20] 7 lines
[+20] public Status process() throws EventDeliveryException {
88
            status = Status.BACKOFF;
92
            status = Status.BACKOFF;
89
          }
93
          }
90
        }
94
        }
91
        if(!batchArrayList.isEmpty()) {
95
        if(!batchArrayList.isEmpty()) {
92
          getChannelProcessor().processEventBatch(batchArrayList);
96
          getChannelProcessor().processEventBatch(batchArrayList);

    
   
97
          sourceCounter.incrementAppendBatchAcceptedCount();

    
   
98
          sourceCounter.addToEventAcceptedCount(batchArrayList.size());
93
        }
99
        }
94
      }
100
      }
95
      counterGroup.incrementAndGet("events.successful");
101

   
96
    } catch (ChannelException ex) {
102
    } catch (ChannelException ex) {
97
      counterGroup.incrementAndGet("events.failed");

   
98
      eventsSent -= i;
103
      eventsSent -= i;
99
      logger.error( getName() + " source could not write to channel.", ex);
104
      logger.error( getName() + " source could not write to channel.", ex);
100
    }
105
    }
101

    
   
106

   
102
    return status;
107
    return status;
103
  }
108
  }
104

    
   
109

   
105
  @Override
110
  @Override
106
  public void start() {
111
  public void start() {
107
    logger.info("Sequence generator source starting");
112
    logger.info("Sequence generator source starting");
108

    
   
113

   
109
    super.start();
114
    super.start();
110

    
   
115
    sourceCounter.start();
111
    logger.debug("Sequence generator source started");
116
    logger.debug("Sequence generator source started");
112
  }
117
  }
113

    
   
118

   
114
  @Override
119
  @Override
115
  public void stop() {
120
  public void stop() {
116
    logger.info("Sequence generator source stopping");
121
    logger.info("Sequence generator source stopping");
117

    
   
122

   
118
    super.stop();
123
    super.stop();

    
   
124
    sourceCounter.stop();
119

    
   
125

   
120
    logger.info("Sequence generator source stopped. Metrics:{}", counterGroup);
126
    logger.info("Sequence generator source stopped. Metrics:{}",getName(), sourceCounter);
121
  }
127
  }
122

    
   
128

   
123
}
129
}
  1. flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java: Loading...