Review Board 1.7.22


FLUME-1765 - Add Load Balancing Support to Log4jAppender

Review Request #8328 - Created Dec. 3, 2012 and updated

Cameron Gandevia
flume-1.4.0
Reviewers
Flume
flume-git
The Log4jAppender should be extended to use the LoadBalancingRpcClient allowing users to configure a load balancing log4jappender

 
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 * http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing,

    
   
13
 * software distributed under the License is distributed on an

    
   
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

    
   
15
 * KIND, either express or implied.  See the License for the

    
   
16
 * specific language governing permissions and limitations

    
   
17
 * under the License.

    
   
18
 */

    
   
19
package org.apache.flume.clients.log4jappender;

    
   
20

   

    
   
21
import java.util.Properties;

    
   
22

   

    
   
23
import org.apache.commons.lang.StringUtils;

    
   
24
import org.apache.flume.FlumeException;

    
   
25
import org.apache.flume.api.RpcClientConfigurationConstants;

    
   
26
import org.apache.flume.api.RpcClientFactory;

    
   
27
import org.apache.flume.api.RpcClientFactory.ClientType;

    
   
28
import org.apache.log4j.helpers.LogLog;

    
   
29

   

    
   
30
/**

    
   
31
 *

    
   
32
 * Appends Log4j Events to an external Flume client which is decribed by the

    
   
33
 * Log4j configuration file. The appender takes the following required

    
   
34
 * parameters:

    
   
35
 * <p>

    
   
36
 * <strong>Hosts</strong> : A space separated list of host:port of the first hop

    
   
37
 * at which Flume (through an AvroSource) is listening for events.

    
   
38
 * </p>

    
   
39
 * <p>

    
   
40
 * <strong>Selector</strong> : Selection mechanism. Must be either ROUND_ROBIN,

    
   
41
 * RANDOM or custom FQDN to class that inherits from LoadBalancingSelector. If

    
   
42
 * empty defaults to ROUND_ROBIN

    
   
43
 * </p>

    
   
44
 * The appender also takes the following optional parameters:

    
   
45
 * <p>

    
   
46
 * <strong>MaxBackoff</strong> : A long value representing the maximum amount of

    
   
47
 * time in milliseconds the Load balancing client will backoff from a node that

    
   
48
 * has failed to consume an event

    
   
49
 * </p>

    
   
50
 * A sample log4j properties file which appends to a source would look like:

    
   
51
 *

    
   
52
 * <pre>

    
   
53
 * <p>

    
   
54
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender

    
   
55
 * log4j.appender.out2.Hosts = fooflumesource.com:25430 barflumesource.com:25430

    
   
56
 * log4j.appender.out2.Selector = RANDOM

    
   
57
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>

    
   
58
 * </pre>

    
   
59
 * <p>

    
   
60
 * <pre>

    
   
61
 * <p>

    
   
62
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender

    
   
63
 * log4j.appender.out2.Hosts = fooflumesource.com:25430 barflumesource.com:25430

    
   
64
 * log4j.appender.out2.Selector = ROUND_ROBIN

    
   
65
 * log4j.appender.out2.MaxBackoff = 60000

    
   
66
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>

    
   
67
 * </pre>

    
   
68
 * <p>

    
   
69
 * <i>Note: Change the last line to the package of the class(es), that will do

    
   
70
 * the appending.For example if classes from the package com.bar.foo are

    
   
71
 * appending, the last line would be:</i>

    
   
72
 * </p>

    
   
73
 *

    
   
74
 * <pre>

    
   
75
 * <p>log4j.logger.com.bar.foo = DEBUG,out2</p>

    
   
76
 * </pre>

    
   
77
 *

    
   
78
 *

    
   
79
 */

    
   
80
public class LoadBalancingLog4jAppender extends Log4jAppender {

    
   
81

   

    
   
82
  private String hosts;

    
   
83
  private String selector;

    
   
84
  private String maxBackoff;

    
   
85

   

    
   
86
  public void setHosts(String hostNames) {

    
   
87
    this.hosts = hostNames;

    
   
88
  }

    
   
89

   

    
   
90
  public void setSelector(String selector) {

    
   
91
    this.selector = selector;

    
   
92
  }

    
   
93

   

    
   
94
  public void setMaxBackoff(String maxBackoff) {

    
   
95
    this.maxBackoff = maxBackoff;

    
   
96
  }

    
   
97

   

    
   
98
  /**

    
   
99
   * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>

    
   
100
   * and <tt>setMaxBackoff</tt>

    
   
101
   *

    
   
102
   * @throws FlumeException

    
   
103
   *           if the LoadBalancingRpcClient cannot be instantiated.

    
   
104
   */

    
   
105
  @Override

    
   
106
  public void activateOptions() throws FlumeException {

    
   
107
    try {

    
   
108
      final Properties properties = getProperties(hosts, selector, maxBackoff);

    
   
109
      rpcClient = RpcClientFactory.getInstance(properties);

    
   
110
    } catch (FlumeException e) {

    
   
111
      String errormsg = "RPC client creation failed! " + e.getMessage();

    
   
112
      LogLog.error(errormsg);

    
   
113
      throw e;

    
   
114
    }

    
   
115
  }

    
   
116

   

    
   
117
  private Properties getProperties(String hosts, String selector,

    
   
118
      String maxBackoff) throws FlumeException {

    
   
119

   

    
   
120
    if (StringUtils.isEmpty(hosts)) {

    
   
121
      throw new IllegalArgumentException("hosts must not be null");

    
   
122
    }

    
   
123

   

    
   
124
    Properties props = new Properties();

    
   
125
    String[] hostsAndPorts = hosts.split("\\s+");

    
   
126
    StringBuilder names = new StringBuilder();

    
   
127
    for (int i = 0; i < hostsAndPorts.length; i++) {

    
   
128
      String hostAndPort = hostsAndPorts[i];

    
   
129
      String name = "h" + i;

    
   
130
      props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,

    
   
131
          hostAndPort);

    
   
132
      names.append(name).append(" ");

    
   
133
    }

    
   
134
    props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, names.toString());

    
   
135
    props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,

    
   
136
        ClientType.DEFAULT_LOADBALANCE.toString());

    
   
137
    if (!StringUtils.isEmpty(selector)) {

    
   
138
      props.put(RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR, selector);

    
   
139
    }

    
   
140

   

    
   
141
    if (!StringUtils.isEmpty(maxBackoff)) {

    
   
142
      long millis = Long.parseLong(maxBackoff.trim());

    
   
143
      if (millis <= 0) {

    
   
144
        throw new IllegalArgumentException(

    
   
145
            "Misconfigured max backoff, value must be greater than 0");

    
   
146
      }

    
   
147
      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,

    
   
148
          String.valueOf(true));

    
   
149
      props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);

    
   
150
    }

    
   
151
    return props;

    
   
152
  }

    
   
153
}
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
Revision 315a68c New Change
 
flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
New File
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 8eb3734 New Change
 
  1. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java: Loading...
  2. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java: Loading...
  3. flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java: Loading...
  4. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties: Loading...
  5. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties: Loading...
  6. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties: Loading...
  7. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...