Review Board 1.7.22


FLUME-1980. Log4jAppender should optionally drop events if append fails

Review Request #10412 - Created April 10, 2013 and updated

Hari Shreedharan
FLUME-1980
Reviewers
Flume
flume-git
Added a new "UnsafeMode" to the log4jappender and load balancing lo4j appender.
Added unit tests.
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
Revision 9fb115e New Change
[20] 23 lines
[+20]
24
import org.apache.flume.FlumeException;
24
import org.apache.flume.FlumeException;
25
import org.apache.flume.api.RpcClientConfigurationConstants;
25
import org.apache.flume.api.RpcClientConfigurationConstants;
26
import org.apache.flume.api.RpcClientFactory;
26
import org.apache.flume.api.RpcClientFactory;
27
import org.apache.flume.api.RpcClientFactory.ClientType;
27
import org.apache.flume.api.RpcClientFactory.ClientType;
28
import org.apache.log4j.helpers.LogLog;
28
import org.apache.log4j.helpers.LogLog;

    
   
29
import org.apache.log4j.spi.LoggingEvent;
29

    
   
30

   
30
/**
31
/**
31
 *
32
 *
32
 * Appends Log4j Events to an external Flume client which is decribed by the
33
 * Appends Log4j Events to an external Flume client which is decribed by the
33
 * Log4j configuration file. The appender takes the following required
34
 * Log4j configuration file. The appender takes the following required
[+20] [20] 46 lines
[+20]
80
public class LoadBalancingLog4jAppender extends Log4jAppender {
81
public class LoadBalancingLog4jAppender extends Log4jAppender {
81

    
   
82

   
82
  private String hosts;
83
  private String hosts;
83
  private String selector;
84
  private String selector;
84
  private String maxBackoff;
85
  private String maxBackoff;

    
   
86
  private boolean configured = false;
85

    
   
87

   
86
  public void setHosts(String hostNames) {
88
  public void setHosts(String hostNames) {
87
    this.hosts = hostNames;
89
    this.hosts = hostNames;
88
  }
90
  }
89

    
   
91

   
90
  public void setSelector(String selector) {
92
  public void setSelector(String selector) {
91
    this.selector = selector;
93
    this.selector = selector;
92
  }
94
  }
93

    
   
95

   
94
  public void setMaxBackoff(String maxBackoff) {
96
  public void setMaxBackoff(String maxBackoff) {
95
    this.maxBackoff = maxBackoff;
97
    this.maxBackoff = maxBackoff;
96
  }
98
  }
97

    
   
99

   

    
   
100
  @Override

    
   
101
  public synchronized void append(LoggingEvent event) {

    
   
102
    if(!configured) {

    
   
103
      String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" +

    
   
104
        " send events to Flume.";

    
   
105
      LogLog.error(errorMsg);

    
   
106
      if(getUnsafeMode()) {

    
   
107
        return;

    
   
108
      }

    
   
109
      throw new FlumeException(errorMsg);

    
   
110
    }

    
   
111
    super.append(event);

    
   
112
  }

    
   
113

   
98
  /**
114
  /**
99
   * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>
115
   * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>
100
   * and <tt>setMaxBackoff</tt>
116
   * and <tt>setMaxBackoff</tt>
101
   *
117
   *
102
   * @throws FlumeException
118
   * @throws FlumeException
103
   *           if the LoadBalancingRpcClient cannot be instantiated.
119
   *           if the LoadBalancingRpcClient cannot be instantiated.
104
   */
120
   */
105
  @Override
121
  @Override
106
  public void activateOptions() throws FlumeException {
122
  public void activateOptions() throws FlumeException {
107
    try {
123
    try {
108
      final Properties properties = getProperties(hosts, selector, maxBackoff);
124
      final Properties properties = getProperties(hosts, selector, maxBackoff);
109
      rpcClient = RpcClientFactory.getInstance(properties);
125
      rpcClient = RpcClientFactory.getInstance(properties);
110
    } catch (FlumeException e) {
126
      if(layout != null) {

    
   
127
        layout.activateOptions();

    
   
128
      }

    
   
129
      configured = true;

    
   
130
    } catch (Exception e) {
111
      String errormsg = "RPC client creation failed! " + e.getMessage();
131
      String errormsg = "RPC client creation failed! " + e.getMessage();
112
      LogLog.error(errormsg);
132
      LogLog.error(errormsg);
113
      throw e;
133
      if (getUnsafeMode()) {

    
   
134
        return;
114
    }
135
      }

    
   
136
      throw new FlumeException(e);

    
   
137
    }

    
   
138

   
115
  }
139
  }
116

    
   
140

   
117
  private Properties getProperties(String hosts, String selector,
141
  private Properties getProperties(String hosts, String selector,
118
      String maxBackoff) throws FlumeException {
142
      String maxBackoff) throws FlumeException {
119

    
   
143

   
120
    if (StringUtils.isEmpty(hosts)) {
144
    if (StringUtils.isEmpty(hosts)) {
121
      throw new IllegalArgumentException("hosts must not be null");
145
      throw new FlumeException("hosts must not be null");
122
    }
146
    }
123

    
   
147

   
124
    Properties props = new Properties();
148
    Properties props = new Properties();
125
    String[] hostsAndPorts = hosts.split("\\s+");
149
    String[] hostsAndPorts = hosts.split("\\s+");
126
    StringBuilder names = new StringBuilder();
150
    StringBuilder names = new StringBuilder();
[+20] [20] 12 lines
[+20] private Properties getProperties(String hosts, String selector,
139
    }
163
    }
140

    
   
164

   
141
    if (!StringUtils.isEmpty(maxBackoff)) {
165
    if (!StringUtils.isEmpty(maxBackoff)) {
142
      long millis = Long.parseLong(maxBackoff.trim());
166
      long millis = Long.parseLong(maxBackoff.trim());
143
      if (millis <= 0) {
167
      if (millis <= 0) {
144
        throw new IllegalArgumentException(
168
        throw new FlumeException(
145
            "Misconfigured max backoff, value must be greater than 0");
169
            "Misconfigured max backoff, value must be greater than 0");
146
      }
170
      }
147
      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,
171
      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,
148
          String.valueOf(true));
172
          String.valueOf(true));
149
      props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);
173
      props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);
150
    }
174
    }
151
    return props;
175
    return props;
152
  }
176
  }
153
}
177
}
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
Revision d61f807 New Change
 
flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
Revision 657af67 New Change
 
flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
Revision de88730 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 693c0d7 New Change
 
  1. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java: Loading...
  2. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java: Loading...
  3. flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java: Loading...
  4. flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java: Loading...
  5. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...