Review Board 1.7.22


Support for optional channels in MultiplexingChannelSelector

Review Request #4034 - Created Feb. 24, 2012 and submitted

Hari Shreedharan
flume-728
FLUME-946
Reviewers
Flume
flume-git
Support for optional channels in multiplexing channel selector. If no required channels configured, but there are optional channels then it means, write to default channels and optionally write to optional channels if they are available.
Updated unit tests to support these cases
flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
Revision 81dc3e8 New Change
[20] 35 lines
[+20] [+] public class MultiplexingChannelSelector extends AbstractChannelSelector {
36
  public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";
36
  public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";
37
  public static final String DEFAULT_MULTIPLEX_HEADER =
37
  public static final String DEFAULT_MULTIPLEX_HEADER =
38
      "flume.selector.header";
38
      "flume.selector.header";
39
  public static final String CONFIG_PREFIX_MAPPING = "mapping.";
39
  public static final String CONFIG_PREFIX_MAPPING = "mapping.";
40
  public static final String CONFIG_DEFAULT_CHANNEL = "default";
40
  public static final String CONFIG_DEFAULT_CHANNEL = "default";

    
   
41
  public static final String CONFIG_PREFIX_OPTIONAL = "optional";

    
   
42

   
41
  @SuppressWarnings("unused")
43
  @SuppressWarnings("unused")
42
  private static final Logger LOG = LoggerFactory
44
  private static final Logger LOG = LoggerFactory
43
    .getLogger(MultiplexingChannelSelector.class);
45
    .getLogger(MultiplexingChannelSelector.class);
44

    
   
46

   
45
  private static final List<Channel> EMPTY_LIST =
47
  private static final List<Channel> EMPTY_LIST =
46
      Collections.emptyList();
48
      Collections.emptyList();
47

    
   
49

   
48
  private String headerName;
50
  private String headerName;
49

    
   
51

   
50
  private Map<String, List<Channel>> channelMapping;
52
  private Map<String, List<Channel>> channelMapping;
51

    
   
53
  private Map<String, List<Channel>> optionalChannels;
52
  private List<Channel> defaultChannels;
54
  private List<Channel> defaultChannels;
53
  @Override
55
  @Override
54
  public List<Channel> getRequiredChannels(Event event) {
56
  public List<Channel> getRequiredChannels(Event event) {
55
    String headerValue = event.getHeaders().get(headerName);
57
    String headerValue = event.getHeaders().get(headerName);
56
    if (headerValue == null || headerValue.trim().length() == 0) {
58
    if (headerValue == null || headerValue.trim().length() == 0) {
[+20] [20] 11 lines
[+20] public class MultiplexingChannelSelector extends AbstractChannelSelector {
68
    return channels;
70
    return channels;
69
  }
71
  }
70

    
   
72

   
71
  @Override
73
  @Override
72
  public List<Channel> getOptionalChannels(Event event) {
74
  public List<Channel> getOptionalChannels(Event event) {
73
    return EMPTY_LIST;
75
    String hdr = event.getHeaders().get(headerName);

    
   
76
    List<Channel> channels = optionalChannels.get(hdr);

    
   
77

   

    
   
78
    if(channels == null) {

    
   
79
      channels = EMPTY_LIST;

    
   
80
    }

    
   
81
    return channels;
74
  }
82
  }
75

    
   
83

   
76
  @Override
84
  @Override
77
  public void configure(Context context) {
85
  public void configure(Context context) {
78
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
86
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
[+20] [20] 32 lines
[+20] public void configure(Context context) {
111
        throw new FlumeException("Selector channel configured twice");
119
        throw new FlumeException("Selector channel configured twice");
112
      }
120
      }
113
    }
121
    }
114
    //If no mapping is configured, it is ok.
122
    //If no mapping is configured, it is ok.
115
    //All events will go to the default channel(s).
123
    //All events will go to the default channel(s).

    
   
124
    Map<String, String> optionalChannelsMapping =

    
   
125
        context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");

    
   
126

   

    
   
127
    optionalChannels = new HashMap<String, List<Channel>>();

    
   
128
    for (String hdr : optionalChannelsMapping.keySet()) {

    
   
129
      List<Channel> confChannels = getChannelListFromNames(

    
   
130
              optionalChannelsMapping.get(hdr), channelNameMap);

    
   
131
      if (confChannels.isEmpty()) {

    
   
132
        confChannels = EMPTY_LIST;

    
   
133
      }

    
   
134
      //Remove channels from optional channels, which are already

    
   
135
      //configured to be required channels.

    
   
136

   

    
   
137
      List<Channel> reqdChannels = channelMapping.get(hdr);

    
   
138
      //Check if there are required channels, else defaults to default channels

    
   
139
      if(reqdChannels == null || reqdChannels.isEmpty()) {

    
   
140
        reqdChannels = defaultChannels;

    
   
141
      }

    
   
142
      for (Channel c : reqdChannels) {

    
   
143
        if (confChannels.contains(c)) {

    
   
144
          confChannels.remove(c);

    
   
145
        }

    
   
146
      }

    
   
147

   

    
   
148
      if (optionalChannels.put(hdr, confChannels) != null) {

    
   
149
        throw new FlumeException("Selector channel configured twice");

    
   
150
      }

    
   
151
    }
116

    
   
152

   
117
  }
153
  }
118

    
   
154

   
119
  //Given a list of channel names as space delimited string,
155
  //Given a list of channel names as space delimited string,
120
  //returns list of channels.
156
  //returns list of channels.
[+20] [20] 17 lines
flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
Revision 2626b20 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 0c0951b New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...