Review Board 1.7.22


FLUME-1870. Flume sends non-numeric values with type as float to Ganglia causing ganglia to crash

Review Request #9075 - Created Jan. 23, 2013 and submitted

Hari Shreedharan
FLUME-1870
Reviewers
Flume
flume-git
Added code to convert the value to float, if it fails, the type is set as string.
All unit tests pass.

Diff revision 2 (Latest)

1 2
1 2

  1. flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
Revision 8d34fee New Change
[20] 15 lines
[+20]
16
 * specific language governing permissions and limitations
16
 * specific language governing permissions and limitations
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.instrumentation;
19
package org.apache.flume.instrumentation;
20

    
   
20

   
21
import com.google.common.base.Throwables;

   
22
import java.lang.management.ManagementFactory;

   
23
import java.net.DatagramPacket;
21
import java.net.DatagramPacket;
24
import java.net.DatagramSocket;
22
import java.net.DatagramSocket;
25
import java.net.InetAddress;
23
import java.net.InetAddress;
26
import java.net.InetSocketAddress;
24
import java.net.InetSocketAddress;
27
import java.net.SocketAddress;
25
import java.net.SocketAddress;
28
import java.net.SocketException;
26
import java.net.SocketException;
29
import java.util.ArrayList;
27
import java.util.ArrayList;
30
import java.util.List;
28
import java.util.List;
31
import java.util.Map;
29
import java.util.Map;
32
import java.util.Set;

   
33
import java.util.concurrent.Executors;
30
import java.util.concurrent.Executors;
34
import java.util.concurrent.ScheduledExecutorService;
31
import java.util.concurrent.ScheduledExecutorService;
35
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.TimeUnit;
36
import javax.management.Attribute;

   
37
import javax.management.AttributeList;

   
38
import javax.management.MBeanAttributeInfo;

   
39
import javax.management.MBeanServer;

   
40
import javax.management.ObjectInstance;

   
41
import org.apache.flume.Context;
33
import org.apache.flume.Context;
42
import org.apache.flume.FlumeException;
34
import org.apache.flume.FlumeException;
43
import org.apache.flume.api.HostInfo;
35
import org.apache.flume.api.HostInfo;
44
import org.apache.flume.conf.ConfigurationException;
36
import org.apache.flume.conf.ConfigurationException;
45
import org.apache.flume.instrumentation.util.JMXPollUtil;
37
import org.apache.flume.instrumentation.util.JMXPollUtil;
[+20] [20] 191 lines
[+20] [+] public boolean isGanglia3() {
237
  protected void createGangliaMessage(String name, String value) {
229
  protected void createGangliaMessage(String name, String value) {
238
    logger.debug("Sending ganglia3 formatted message."
230
    logger.debug("Sending ganglia3 formatted message."
239
            + name + ": " + value);
231
            + name + ": " + value);
240
    name = hostname + "." + name;
232
    name = hostname + "." + name;
241
    xdr_int(0);
233
    xdr_int(0);
242
    xdr_string("float");
234
    String type = "string";

    
   
235
    try {

    
   
236
      Float.parseFloat(value);

    
   
237
      type = "float";

    
   
238
    } catch (NumberFormatException ex) {

    
   
239
      // The param is a string, and so leave the type as is.

    
   
240
    }

    
   
241
    xdr_string(type); // metric type
243
    xdr_string(name);
242
    xdr_string(name);
244
    xdr_string(value);
243
    xdr_string(value);
245
    xdr_string(DEFAULT_UNITS);
244
    xdr_string(DEFAULT_UNITS);
246
    xdr_int(DEFAULT_SLOPE);
245
    xdr_int(DEFAULT_SLOPE);
247
    xdr_int(DEFAULT_TMAX);
246
    xdr_int(DEFAULT_TMAX);
[+20] [20] 5 lines
[+20] [+] protected void createGangliaMessage31(String name, String value) {
253
            + name + ": " + value);
252
            + name + ": " + value);
254
    xdr_int(128); // metric_id = metadata_msg
253
    xdr_int(128); // metric_id = metadata_msg
255
    xdr_string(hostname); // hostname
254
    xdr_string(hostname); // hostname
256
    xdr_string(name); // metric name
255
    xdr_string(name); // metric name
257
    xdr_int(0); // spoof = False
256
    xdr_int(0); // spoof = False
258
    xdr_string("float"); // metric type
257
    String type = "string";

    
   
258
    try {

    
   
259
      Float.parseFloat(value);

    
   
260
      type = "float";

    
   
261
    } catch (NumberFormatException ex) {

    
   
262
      // The param is a string, and so leave the type as is.

    
   
263
    }

    
   
264
    xdr_string(type); // metric type
259
    xdr_string(name); // metric name
265
    xdr_string(name); // metric name
260
    xdr_string(DEFAULT_UNITS); // units
266
    xdr_string(DEFAULT_UNITS); // units
261
    xdr_int(DEFAULT_SLOPE); // slope
267
    xdr_int(DEFAULT_SLOPE); // slope
262
    xdr_int(DEFAULT_TMAX); // tmax, the maximum time between metrics
268
    xdr_int(DEFAULT_TMAX); // tmax, the maximum time between metrics
263
    xdr_int(DEFAULT_DMAX); // dmax, the maximum data value
269
    xdr_int(DEFAULT_DMAX); // dmax, the maximum data value
[+20] [20] 59 lines
[+20] [+] public void configure(Context context) {
323
   *
329
   *
324
   */
330
   */
325
  protected class GangliaCollector implements Runnable {
331
  protected class GangliaCollector implements Runnable {
326

    
   
332

   
327
    private GangliaServer server;
333
    private GangliaServer server;
328
    private final MBeanServer mbeanServer = ManagementFactory.

   
329
            getPlatformMBeanServer();

   
330

    
   
334

   
331
    @Override
335
    @Override
332
    public void run() {
336
    public void run() {
333
      try {
337
      try {
334
        Map<String, Map<String, String>> metricsMap =
338
        Map<String, Map<String, String>> metricsMap =
[+20] [20] 22 lines
  1. flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java: Loading...