Review Board 1.7.22


FLUME-1940, FLUME-1957 - Logging Snapshots of Flume Metrics on Shutdown

Review Request #10416 - Created April 11, 2013 and updated

Israel Ekpo
flume-1.4
FLUME-1940, FLUME-1957
Reviewers
Flume
flume-git
Logging Metrics from the stop() method for the following components: ChannelCounter, ChannelProcessorCounter, SinkCounter, SinkProcessorCounter, SourceCounter


 

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
Revision 502fe9e New Change
[20] 16 lines
[+20]
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.instrumentation;
19
package org.apache.flume.instrumentation;
20

    
   
20

   
21
import java.lang.management.ManagementFactory;
21
import java.lang.management.ManagementFactory;

    
   
22
import java.util.ArrayList;
22
import java.util.Collections;
23
import java.util.Collections;
23
import java.util.HashMap;
24
import java.util.HashMap;
24
import java.util.Iterator;
25
import java.util.Iterator;

    
   
26
import java.util.List;
25
import java.util.Map;
27
import java.util.Map;
26
import java.util.concurrent.atomic.AtomicLong;
28
import java.util.concurrent.atomic.AtomicLong;
27

    
   
29

   
28
import javax.management.ObjectName;
30
import javax.management.ObjectName;
29

    
   
31

   
30
import org.slf4j.Logger;
32
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
33
import org.slf4j.LoggerFactory;
32

    
   
34

   

    
   
35
/**

    
   
36
 * Used for keeping track of internal metrics using atomic integers</p>

    
   
37
 *

    
   
38
 * This is used by a variety of component types such as Sources, Channels,

    
   
39
 * Sinks, SinkProcessors, ChannelProcessors, Interceptors and Serializers.

    
   
40
 */
33
public abstract class MonitoredCounterGroup {
41
public abstract class MonitoredCounterGroup {
34

    
   
42

   
35
  private static final Logger LOG =
43
  private static final Logger logger =
36
      LoggerFactory.getLogger(MonitoredCounterGroup.class);
44
      LoggerFactory.getLogger(MonitoredCounterGroup.class);
37

    
   
45

   

    
   
46
  // Key for component's start time in MonitoredCounterGroup.counterMap

    
   
47
  private static final String COUNTER_GROUP_START_TIME = "start.time";

    
   
48

   

    
   
49
  // key for component's stop time in MonitoredCounterGroup.counterMap

    
   
50
  private static final String COUNTER_GROUP_STOP_TIME = "stop.time";

    
   
51

   
38
  private final Type type;
52
  private final Type type;
39
  private final String name;
53
  private final String name;
40
  private final Map<String, AtomicLong> counterMap;
54
  private final Map<String, AtomicLong> counterMap;
41

    
   
55

   
42
  private AtomicLong startTime;
56
  private AtomicLong startTime;
[+20] [20] 17 lines
[+20]
60
    startTime = new AtomicLong(0L);
74
    startTime = new AtomicLong(0L);
61
    stopTime = new AtomicLong(0L);
75
    stopTime = new AtomicLong(0L);
62

    
   
76

   
63
  }
77
  }
64

    
   
78

   

    
   
79
  /**

    
   
80
   * Starts the component

    
   
81
   *

    
   
82
   * Initializes the values for the stop time as well as all the keys in the

    
   
83
   * internal map to zero and sets the start time to the current time in

    
   
84
   * milliseconds since midnight January 1, 1970 UTC

    
   
85
   */
65
  public void start() {
86
  public void start() {
66

    
   
87

   
67
    register();
88
    register();
68
    stopTime.set(0L);
89
    stopTime.set(0L);
69
    for (String counter : counterMap.keySet()) {
90
    for (String counter : counterMap.keySet()) {
70
      counterMap.get(counter).set(0L);
91
      counterMap.get(counter).set(0L);
71
    }
92
    }
72
    startTime.set(System.currentTimeMillis());
93
    startTime.set(System.currentTimeMillis());
73
    LOG.info("Component type: " + type + ", name: " + name + " started");
94
    logger.info("Component type: " + type + ", name: " + name + " started");
74
  }
95
  }
75

    
   
96

   
76
  /**
97
  /**
77
   * Registers the counter. This method should be used only for testing, and
98
   * Registers the counter. This method should be used only for testing, and
78
   * there should be no need for any implementations to directly call this
99
   * there should be no need for any implementations to directly call this
[+20] [20] 5 lines
[+20] public void start() {
84
        ObjectName objName = new ObjectName("org.apache.flume."
105
        ObjectName objName = new ObjectName("org.apache.flume."
85
                + type.name().toLowerCase() + ":type=" + this.name);
106
                + type.name().toLowerCase() + ":type=" + this.name);
86

    
   
107

   
87
        ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
108
        ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
88
        registered = true;
109
        registered = true;
89
        LOG.info("Monitoried counter group for type: " + type + ", name: " + name
110
        logger.info("Monitoried counter group for type: " + type + ", name: " + name
90
                + ", registered successfully.");
111
                + ", registered successfully.");
91
      } catch (Exception ex) {
112
      } catch (Exception ex) {
92
        LOG.error("Failed to register monitored counter group for type: "
113
        logger.error("Failed to register monitored counter group for type: "
93
                + type + ", name: " + name, ex);
114
                + type + ", name: " + name, ex);
94
      }
115
      }
95
    }
116
    }
96
  }
117
  }
97

    
   
118

   

    
   
119
  /**

    
   
120
   * Shuts Down the Component

    
   
121
   *

    
   
122
   * Used to indicate that the component is shutting down.

    
   
123
   *

    
   
124
   * Sets the stop time and then prints out the metrics from

    
   
125
   * the internal map of keys to values for the following components:

    
   
126
   *

    
   
127
   * - ChannelCounter

    
   
128
   * - ChannelProcessorCounter

    
   
129
   * - SinkCounter

    
   
130
   * - SinkProcessorCounter

    
   
131
   * - SourceCounter

    
   
132
   */
98
  public void stop() {
133
  public void stop() {

    
   
134

   

    
   
135
    // Sets the stopTime for the component as the current time in milliseconds
99
    stopTime.set(System.currentTimeMillis());
136
    stopTime.set(System.currentTimeMillis());
100
    LOG.info("Component type: " + type + ", name: " + name + " stopped");
137

   

    
   
138
    // Prints out a message indicating that this component has been stopped

    
   
139
    logger.info("Component type: " + type + ", name: " + name + " stopped");

    
   
140

   

    
   
141
    // Retrieve the type for this counter group

    
   
142
    final String typePrefix = type.name().toLowerCase();

    
   
143

   

    
   
144
    // Print out the startTime for this component

    
   
145
    logger.info("Shutdown Metric for type: " + type + ", "

    
   
146
      + "name: " + name + ". "

    
   
147
      + typePrefix + "." + COUNTER_GROUP_START_TIME

    
   
148
      + " == " + startTime);

    
   
149

   

    
   
150
    // Print out the stopTime for this component

    
   
151
    logger.info("Shutdown Metric for type: " + type + ", "

    
   
152
      + "name: " + name + ". "

    
   
153
      + typePrefix + "." + COUNTER_GROUP_STOP_TIME

    
   
154
      + " == " + stopTime);

    
   
155

   

    
   
156
    // Retrieve and sort counter group map keys

    
   
157
    final List<String> mapKeys = new ArrayList<String>(counterMap.keySet());

    
   
158

   

    
   
159
    Collections.sort(mapKeys);

    
   
160

   

    
   
161
    // Cycle through and print out all the key value pairs in counterMap

    
   
162
    for (final String counterMapKey : mapKeys) {

    
   
163

   

    
   
164
      // Retrieves the value from the original counterMap.

    
   
165
      final long counterMapValue = get(counterMapKey);

    
   
166

   

    
   
167
      logger.info("Shutdown Metric for type: " + type + ", "

    
   
168
        + "name: " + name + ". "

    
   
169
        + counterMapKey + " == " + counterMapValue);

    
   
170
    }
101
  }
171
  }
102

    
   
172

   

    
   
173
  /**

    
   
174
   * Returns when this component was first started

    
   
175
   *

    
   
176
   * @return

    
   
177
   */
103
  public long getStartTime() {
178
  public long getStartTime() {
104
    return startTime.get();
179
    return startTime.get();
105
  }
180
  }
106

    
   
181

   

    
   
182
  /**

    
   
183
   * Returns when this component was stopped

    
   
184
   *

    
   
185
   * @return

    
   
186
   */
107
  public long getStopTime() {
187
  public long getStopTime() {
108
    return stopTime.get();
188
    return stopTime.get();
109
  }
189
  }
110

    
   
190

   
111
  @Override
191
  @Override
[+20] [20] 15 lines
[+20] [+] public final String toString() {
127

    
   
207

   
128
    return sb.toString();
208
    return sb.toString();
129
  }
209
  }
130

    
   
210

   
131

    
   
211

   

    
   
212
  /**

    
   
213
   * Retrieves the current value for this key

    
   
214
   *

    
   
215
   * @param counter The key for this metric

    
   
216
   * @return The current value for this key

    
   
217
   */
132
  protected long get(String counter) {
218
  protected long get(String counter) {
133
    return counterMap.get(counter).get();
219
    return counterMap.get(counter).get();
134
  }
220
  }
135

    
   
221

   

    
   
222
  /**

    
   
223
   * Sets the value for this key to the given value

    
   
224
   *

    
   
225
   * @param counter The key for this metric

    
   
226
   * @param value The new value for this key

    
   
227
   */
136
  protected void set(String counter, long value) {
228
  protected void set(String counter, long value) {
137
    counterMap.get(counter).set(value);
229
    counterMap.get(counter).set(value);
138
  }
230
  }
139

    
   
231

   

    
   
232
  /**

    
   
233
   * Atomically adds the delta to the current value for this key

    
   
234
   *

    
   
235
   * @param counter The key for this metric

    
   
236
   * @param delta

    
   
237
   * @return The updated value for this key

    
   
238
   */
140
  protected long addAndGet(String counter, long delta) {
239
  protected long addAndGet(String counter, long delta) {
141
    return counterMap.get(counter).addAndGet(delta);
240
    return counterMap.get(counter).addAndGet(delta);
142
  }
241
  }
143

    
   
242

   

    
   
243
  /**

    
   
244
   * Atomically increments the current value for this key by one

    
   
245
   *

    
   
246
   * @param counter The key for this metric

    
   
247
   * @return The updated value for this key

    
   
248
   */
144
  protected long increment(String counter) {
249
  protected long increment(String counter) {
145
    return counterMap.get(counter).incrementAndGet();
250
    return counterMap.get(counter).incrementAndGet();
146
  }
251
  }
147

    
   
252

   

    
   
253
  /**

    
   
254
   * Component Enum Constants

    
   
255
   *

    
   
256
   * Used by each component's constructor to distinguish which type the

    
   
257
   * component is.

    
   
258
   */
148
  public static enum Type {
259
  public static enum Type {
149
    SOURCE,
260
    SOURCE,
150
    CHANNEL_PROCESSOR,
261
    CHANNEL_PROCESSOR,
151
    CHANNEL,
262
    CHANNEL,
152
    SINK_PROCESSOR,
263
    SINK_PROCESSOR,
[+20] [20] 10 lines
  1. flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java: Loading...