Review Board 1.7.22


Add average events per second params to MBeans

Review Request #7112 - Created Sept. 14, 2012 and updated

Ted Malaska
trunk
FLUME-1411
Reviewers
Flume
flume-git
This is not a completely finished patch but an example of a possible solution. If you like the solution I'll finish the patch.

This solution supports two types of averages:
1. Averages since start
2. Rolling Averages (The rolling interval can be set through config properties)

I added two gets for these averages in sinkCounter.

I also added a junit that tests both of these averages and shows how quickly they deviate.
Please let me know what you think.

 
flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
Revision 6bc31ef New Change
[20] 20 lines
[+20]
21
import java.lang.management.ManagementFactory;
21
import java.lang.management.ManagementFactory;
22
import java.util.Collections;
22
import java.util.Collections;
23
import java.util.HashMap;
23
import java.util.HashMap;
24
import java.util.Iterator;
24
import java.util.Iterator;
25
import java.util.Map;
25
import java.util.Map;

    
   
26
import java.util.Map.Entry;

    
   
27
import java.util.Properties;
26
import java.util.concurrent.atomic.AtomicLong;
28
import java.util.concurrent.atomic.AtomicLong;
27

    
   
29

   
28
import javax.management.ObjectName;
30
import javax.management.ObjectName;
29

    
   
31

   

    
   
32
import org.apache.flume.Context;
30
import org.slf4j.Logger;
33
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
34
import org.slf4j.LoggerFactory;
32

    
   
35

   
33
public abstract class MonitoredCounterGroup {
36
public abstract class MonitoredCounterGroup {
34

    
   
37

   
35
  private static final Logger LOG =
38
  private static final Logger LOG =
36
      LoggerFactory.getLogger(MonitoredCounterGroup.class);
39
      LoggerFactory.getLogger(MonitoredCounterGroup.class);
37

    
   
40

   

    
   
41
  public static final String ROLLING_AVERAGE_INTERNAL_LENGTH_CONF = "rollingAverageIntervalLength";

    
   
42
  public static final String CALCULATE_ROLLING_AVERAGES = "calculateRollingAverages";

    
   
43

   

    
   
44
  private static final String LAST_ROLLING_AVERAGE_POST_FIX = ".lastRollingAvg";

    
   
45
  private static final String CURRENT_ROLLING_AVERAGE_POST_FIX = ".currentRollingAvg";

    
   
46

   
38
  private final Type type;
47
  private final Type type;
39
  private final String name;
48
  private final String name;
40
  private final Map<String, AtomicLong> counterMap;
49
  private Map<String, AtomicLong> counterMap;
41

    
   
50

   
42
  private AtomicLong startTime;
51
  private AtomicLong startTime;
43
  private AtomicLong stopTime;
52
  private AtomicLong stopTime;
44

    
   
53

   

    
   
54
  private boolean calculateRollingAverages = false;

    
   
55
  private long rollingAverageIntervalLength = 60000;

    
   
56
  private AtomicLong currentRollingAverageStartTime;
45

    
   
57

   
46
  protected MonitoredCounterGroup(Type type, String name, String... attrs) {
58
  protected MonitoredCounterGroup(Type type, String name, String... attrs) {
47
    this.type = type;
59
    this.type = type;
48
    this.name = name;
60
    this.name = name;
49

    
   
61

   
[+20] [20] 6 lines
[+20]
56

    
   
68

   
57
    counterMap = Collections.unmodifiableMap(counterInitMap);
69
    counterMap = Collections.unmodifiableMap(counterInitMap);
58

    
   
70

   
59
    startTime = new AtomicLong(0L);
71
    startTime = new AtomicLong(0L);
60
    stopTime = new AtomicLong(0L);
72
    stopTime = new AtomicLong(0L);

    
   
73
    currentRollingAverageStartTime = new AtomicLong(0L);
61

    
   
74

   
62
    try {
75
    try {
63
      ObjectName objName = new ObjectName("org.apache.flume."
76
      ObjectName objName = new ObjectName("org.apache.flume."
64
          + type.name().toLowerCase() + ":type=" + this.name);
77
          + type.name().toLowerCase() + ":type=" + this.name);
65

    
   
78

   
[+20] [20] 5 lines
[+20]
71
      LOG.error("Failed to register monitored counter group for type: "
84
      LOG.error("Failed to register monitored counter group for type: "
72
          + type + ", name: " + name, ex);
85
          + type + ", name: " + name, ex);
73
    }
86
    }
74
  }
87
  }
75

    
   
88

   

    
   
89
  public void init(Context context){

    
   
90
    rollingAverageIntervalLength = context.getLong("rollingAverageIntervalLength", rollingAverageIntervalLength);

    
   
91
    calculateRollingAverages = context.getBoolean("calculateRollingAverages", false);

    
   
92

   

    
   
93
    Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>();

    
   
94

   

    
   
95
    if (calculateRollingAverages){

    
   
96
      for (String key: counterMap.keySet()){

    
   
97
        counterInitMap.put(key, new AtomicLong(0L));

    
   
98
        counterInitMap.put(key + LAST_ROLLING_AVERAGE_POST_FIX, new AtomicLong(0L));

    
   
99
        counterInitMap.put(key + CURRENT_ROLLING_AVERAGE_POST_FIX, new AtomicLong(0L));

    
   
100
      }

    
   
101
    }

    
   
102

   

    
   
103
    counterMap = Collections.unmodifiableMap(counterInitMap);

    
   
104
  }

    
   
105

   
76
  public void start() {
106
  public void start() {
77
    stopTime.set(0L);
107
    stopTime.set(0L);
78
    for (String counter : counterMap.keySet()) {
108
    for (String counter : counterMap.keySet()) {
79
      counterMap.get(counter).set(0L);
109
      counterMap.get(counter).set(0L);
80
    }
110
    }
81
    startTime.set(System.currentTimeMillis());
111
    startTime.set(System.currentTimeMillis());

    
   
112
    currentRollingAverageStartTime.set(startTime.get());
82
    LOG.info("Component type: " + type + ", name: " + name + " started");
113
    LOG.info("Component type: " + type + ", name: " + name + " started");
83
  }
114
  }
84

    
   
115

   
85
  public void stop() {
116
  public void stop() {
86
    stopTime.set(System.currentTimeMillis());
117
    stopTime.set(System.currentTimeMillis());
[+20] [20] 36 lines
[+20] [+] protected long get(String counter) {
123
  protected void set(String counter, long value) {
154
  protected void set(String counter, long value) {
124
    counterMap.get(counter).set(value);
155
    counterMap.get(counter).set(value);
125
  }
156
  }
126

    
   
157

   
127
  protected long addAndGet(String counter, long delta) {
158
  protected long addAndGet(String counter, long delta) {

    
   
159
    addToRollingAverage(counter, delta);
128
    return counterMap.get(counter).addAndGet(delta);
160
    return counterMap.get(counter).addAndGet(delta);
129
  }
161
  }
130

    
   
162

   
131
  protected long increment(String counter) {
163
  protected long increment(String counter) {

    
   
164
    addToRollingAverage(counter, 1);
132
    return counterMap.get(counter).incrementAndGet();
165
    return counterMap.get(counter).incrementAndGet();
133
  }
166
  }
134

    
   
167

   

    
   
168
  private void addToRollingAverage(String counter, long delta) {

    
   
169
    if (calculateRollingAverages) {

    
   
170
      getAndRollCurrentAverageCounter(counter).addAndGet(delta);

    
   
171
    }

    
   
172
  }

    
   
173

   

    
   
174
  private AtomicLong getAndRollCurrentAverageCounter(String counter) {

    
   
175
    if (calculateRollingAverages) {

    
   
176
      if (System.currentTimeMillis() - currentRollingAverageStartTime.get() >  rollingAverageIntervalLength) {

    
   
177
        if (System.currentTimeMillis() - currentRollingAverageStartTime.get() <=  rollingAverageIntervalLength * 2) {

    
   
178
          counterMap.get(counter + LAST_ROLLING_AVERAGE_POST_FIX).set(counterMap.get(counter + CURRENT_ROLLING_AVERAGE_POST_FIX).get());

    
   
179
        } else {

    
   
180
          counterMap.get(counter + LAST_ROLLING_AVERAGE_POST_FIX).set(0);

    
   
181
        }

    
   
182
        counterMap.get(counter + CURRENT_ROLLING_AVERAGE_POST_FIX).set(0);

    
   
183
        currentRollingAverageStartTime.set(System.currentTimeMillis());

    
   
184
      }

    
   
185
      return counterMap.get(counter + CURRENT_ROLLING_AVERAGE_POST_FIX);

    
   
186
    }

    
   
187
    return null;

    
   
188
  }

    
   
189

   

    
   
190
  protected double getAverageSinceStart(String counter) {

    
   
191
    double numOfSecondsPassed = (System.currentTimeMillis() - startTime.get())/1000.0;

    
   
192
    return counterMap.get(counter).get() / numOfSecondsPassed;

    
   
193
  }

    
   
194

   

    
   
195
  protected double getRollingAverage(String counter) {

    
   
196
    if (calculateRollingAverages)

    
   
197
    {

    
   
198
      getAndRollCurrentAverageCounter(counter);

    
   
199

   

    
   
200
      return counterMap.get(counter + LAST_ROLLING_AVERAGE_POST_FIX).get() / (rollingAverageIntervalLength/1000.0);

    
   
201
    } else {

    
   
202
      LOG.warn("Turning to call Rolling Average will property " + CALCULATE_ROLLING_AVERAGES + " is not set to true.");

    
   
203
      return -1;

    
   
204
    }

    
   
205
  }

    
   
206

   
135
  public static enum Type {
207
  public static enum Type {
136
    SOURCE,
208
    SOURCE,
137
    CHANNEL_PROCESSOR,
209
    CHANNEL_PROCESSOR,
138
    CHANNEL,
210
    CHANNEL,
139
    SINK_PROCESSOR,
211
    SINK_PROCESSOR,
140
    SINK
212
    SINK
141
  };
213
  };
142

    
   
214

   
143
  public String getType(){
215
  public String getType(){
144
    return type.name();
216
    return type.name();
145
  }
217
  }
146
}
218
}
flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
Revision 41b28cf New Change
 
flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
Revision 0a730e9 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java: Loading...