Review Board 1.7.22


FLUME-1765 - Add Load Balancing Support to Log4jAppender

Review Request #8328 - Created Dec. 3, 2012 and updated

Cameron Gandevia
flume-1.4.0
Reviewers
Flume
flume-git
The Log4jAppender should be extended to use the LoadBalancingRpcClient allowing users to configure a load balancing log4jappender

 
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 * http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing,

    
   
13
 * software distributed under the License is distributed on an

    
   
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

    
   
15
 * KIND, either express or implied.  See the License for the

    
   
16
 * specific language governing permissions and limitations

    
   
17
 * under the License.

    
   
18
 */

    
   
19
package org.apache.flume.clients.log4jappender;

    
   
20

   

    
   
21
import java.util.Properties;

    
   
22

   

    
   
23
import org.apache.commons.lang.StringUtils;

    
   
24
import org.apache.flume.FlumeException;

    
   
25
import org.apache.flume.api.LoadBalancingRpcClient;

    
   
26
import org.apache.flume.api.RpcClient;

    
   
27
import org.apache.flume.api.RpcClientConfigurationConstants;

    
   
28
import org.apache.flume.api.RpcClientFactory;

    
   
29
import org.apache.flume.api.RpcClientFactory.ClientType;

    
   
30
import org.apache.log4j.helpers.LogLog;

    
   
31

   

    
   
32
/**

    
   
33
 *

    
   
34
 * Appends Log4j Events to an external Flume client which is decribed by the

    
   
35
 * Log4j configuration file. The appender takes the following required

    
   
36
 * parameters:

    
   
37
 * <p>

    
   
38
 * <strong>Hosts</strong> : A space separated list of host:port of the first hop

    
   
39
 * at which Flume (through an AvroSource) is listening for events.

    
   
40
 * </p>

    
   
41
 * <p>

    
   
42
 * <strong>Selector</strong> : Selection mechanism. Must be either ROUND_ROBIN,

    
   
43
 * RANDOM or custom FQDN to class that inherits from LoadBalancingSelector. If

    
   
44
 * empty defaults to ROUND_ROBIN

    
   
45
 * </p>

    
   
46
 * The appender also takes the following optional parameters:

    
   
47
 * <p>

    
   
48
 * <strong>MaxBackoff</strong> : A long value representing the maximum amount of

    
   
49
 * time in milliseconds the Load balancing client will backoff from a node that

    
   
50
 * has failed to consume an event

    
   
51
 * </p>

    
   
52
 * A sample log4j properties file which appends to a source would look like:

    
   
53
 *

    
   
54
 * <pre>

    
   
55
 * <p>

    
   
56
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender

    
   
57
 * log4j.appender.out2.Hosts = fooflumesource.com:25430 barflumesource.com:25430

    
   
58
 * log4j.appender.out2.Selector = RANDOM

    
   
59
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>

    
   
60
 * </pre>

    
   
61
 * <p>

    
   
62
 * <pre>

    
   
63
 * <p>

    
   
64
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender

    
   
65
 * log4j.appender.out2.Hosts = fooflumesource.com:25430 barflumesource.com:25430

    
   
66
 * log4j.appender.out2.Selector = ROUND_ROBIN

    
   
67
 * log4j.appender.out2.MaxBackoff = 60000

    
   
68
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>

    
   
69
 * </pre>

    
   
70
 * <p>

    
   
71
 * <i>Note: Change the last line to the package of the class(es), that will do

    
   
72
 * the appending.For example if classes from the package com.bar.foo are

    
   
73
 * appending, the last line would be:</i>

    
   
74
 * </p>

    
   
75
 *

    
   
76
 * <pre>

    
   
77
 * <p>log4j.logger.com.bar.foo = DEBUG,out2</p>

    
   
78
 * </pre>

    
   
79
 *

    
   
80
 *

    
   
81
 */

    
   
82
public class LoadBalancingLog4jAppender extends Log4jAppender {

    
   
83

   

    
   
84
  private String hosts;

    
   
85
  private String selector;

    
   
86
  private String maxBackoff;

    
   
87

   

    
   
88
  public void setHosts(String hostNames) {

    
   
89
    this.hosts = hostNames;

    
   
90
  }

    
   
91

   

    
   
92
  public void setSelector(String selector) {

    
   
93
    this.selector = selector;

    
   
94
  }

    
   
95

   

    
   
96
  public void setMaxBackoff(String maxBackoff) {

    
   
97
    this.maxBackoff = maxBackoff;

    
   
98
  }

    
   
99

   

    
   
100
  /**

    
   
101
   * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>

    
   
102
   * and <tt>setMaxBackoff</tt>

    
   
103
   *

    
   
104
   * @throws FlumeException

    
   
105
   *           if the LoadBalancingRpcClient cannot be instantiated.

    
   
106
   */

    
   
107
  @Override

    
   
108
  public void activateOptions() throws FlumeException {

    
   
109
    try {

    
   
110
      Properties properties = getProperties(hosts, selector, maxBackoff);

    
   
111
      rpcClient = RpcClientFactory.getInstance(properties);

    
   
112
    } catch (FlumeException e) {

    
   
113
      String errormsg = "RPC client creation failed! " + e.getMessage();

    
   
114
      LogLog.error(errormsg);

    
   
115
      throw e;

    
   
116
    }

    
   
117
  }

    
   
118

   

    
   
119
  private Properties getProperties(String hosts, String selector,

    
   
120
      String maxBackoff) throws FlumeException {

    
   
121

   

    
   
122
    if (StringUtils.isEmpty(hosts)) {

    
   
123
      throw new IllegalArgumentException("hosts must not be null");

    
   
124
    }

    
   
125

   

    
   
126
    Properties props = new Properties();

    
   
127
    String[] hostsAndPorts = hosts.split("\\s+");

    
   
128
    StringBuilder names = new StringBuilder();

    
   
129
    for (int i = 0; i < hostsAndPorts.length; i++) {

    
   
130
      String hostAndPort = hostsAndPorts[i];

    
   
131
      String name = "h" + i;

    
   
132
      props.put(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,

    
   
133
          hostAndPort);

    
   
134
      names.append(name).append(" ");

    
   
135
    }

    
   
136
    props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, names.toString());

    
   
137
    props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,

    
   
138
        ClientType.DEFAULT_LOADBALANCE);

    
   
139

   

    
   
140
    if (!StringUtils.isEmpty(selector)) {

    
   
141
      props.put(RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR, selector);

    
   
142
    }

    
   
143

   

    
   
144
    if (!StringUtils.isEmpty(maxBackoff)) {

    
   
145
      long millis = Long.parseLong(maxBackoff.trim());

    
   
146
      if (millis <= 0) {

    
   
147
        throw new IllegalArgumentException(

    
   
148
            "Misconfigured max backoff, value must be greater than 0");

    
   
149
      }

    
   
150
      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,

    
   
151
          String.valueOf(true));

    
   
152
      props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);

    
   
153
    }

    
   
154
    return props;

    
   
155
  }

    
   
156
}
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
Revision 083f5d1 New Change
 
flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
New File
 
flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
New File
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision f8528bb New Change
 
  1. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java: Loading...
  2. flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java: Loading...
  3. flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java: Loading...
  4. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties: Loading...
  5. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties: Loading...
  6. flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties: Loading...
  7. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...