Review Board 1.7.22


FLUME-1852: Issues with EmbeddedAgentConfiguration

Review Request #8975 - Created Jan. 16, 2013 and submitted

Brock Noland
trunk
FLUME-1852
Reviewers
Flume
flume-git
Fixes issues mentioned in 1852.
Unit tests pass and adds tests for items which can be tested.
flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
Revision e52f912 New Change
[20] 30 lines
[+20]
31
import org.apache.flume.conf.channel.ChannelType;
31
import org.apache.flume.conf.channel.ChannelType;
32
import org.apache.flume.conf.sink.SinkProcessorType;
32
import org.apache.flume.conf.sink.SinkProcessorType;
33
import org.apache.flume.conf.sink.SinkType;
33
import org.apache.flume.conf.sink.SinkType;
34

    
   
34

   
35
import com.google.common.base.Joiner;
35
import com.google.common.base.Joiner;

    
   
36
import com.google.common.collect.ImmutableList;
36
import com.google.common.collect.Maps;
37
import com.google.common.collect.Maps;
37

    
   
38

   
38
/**
39
/**
39
 * Stores publicly accessible configuration constants and private
40
 * Stores publicly accessible configuration constants and private
40
 * configuration constants and methods.
41
 * configuration constants and methods.
[+20] [20] 22 lines
[+20] [+] public class EmbeddedAgentConfiguration {
63
   */
64
   */
64
  public static final String SINKS = "sinks";
65
  public static final String SINKS = "sinks";
65

    
   
66

   
66
  public static final String SINKS_PREFIX = join(SINKS, "");
67
  public static final String SINKS_PREFIX = join(SINKS, "");
67
  /**
68
  /**
68
   * Source type, choices are `embedded' or `avro'
69
   * Source type, choices are `embedded'
69
   */
70
   */
70
  public static final String SOURCE_TYPE = join(SOURCE, TYPE);
71
  public static final String SOURCE_TYPE = join(SOURCE, TYPE);
71
  /**
72
  /**
72
   * Prefix for passing configuration parameters to the source
73
   * Prefix for passing configuration parameters to the source
73
   */
74
   */
[+20] [20] 5 lines
[+20] public class EmbeddedAgentConfiguration {
79
  /**
80
  /**
80
   * Prefix for passing configuration parameters to the channel
81
   * Prefix for passing configuration parameters to the channel
81
   */
82
   */
82
  public static final String CHANNEL_PREFIX = join(CHANNEL, "");
83
  public static final String CHANNEL_PREFIX = join(CHANNEL, "");
83
  /**
84
  /**
84
   * Sink processor type, choices are `default' (failover) or `load_balance'
85
   * Sink processor type, choices are `default', `failover' or `load_balance'
85
   */
86
   */
86
  public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE);
87
  public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE);
87
  /**
88
  /**
88
   * Prefix for passing configuration parameters to the sink processor
89
   * Prefix for passing configuration parameters to the sink processor
89
   */
90
   */
90
  public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, "");
91
  public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, "");
91
  /**
92
  /**
92
   * Embedded source which provides simple in-memory transfer to channel.
93
   * Embedded source which provides simple in-memory transfer to channel.
93
   * Use this source via the put,pulAll methods on the EmbeddedAgent. This
94
   * Use this source via the put,putAll methods on the EmbeddedAgent. This
94
   * is the recommended source to use for Embedded Agents.
95
   * is the only supported source to use for Embedded Agents.
95
   */
96
   */
96
  public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName();
97
  public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName();

    
   
98
  private static final String SOURCE_TYPE_EMBEDDED_ALIAS = "EMBEDDED";
97
  /**
99
  /**
98
   * Memory channel which stores events in heap. See Flume User Guide for
100
   * Memory channel which stores events in heap. See Flume User Guide for
99
   * configuration information. This is the recommended channel to use for
101
   * configuration information. This is the recommended channel to use for
100
   * Embedded Agents.
102
   * Embedded Agents.
101
   */
103
   */
102
  public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name();
104
  public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name();
103
  /**
105
  /**
104
   * File based channel which stores events in heap. See Flume User Guide for
106
   * File based channel which stores events in on local disk. See Flume User
105
   * configuration information.
107
   * Guide for configuration information.
106
   */
108
   */
107
  public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name();
109
  public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name();
108

    
   
110

   
109
  /**
111
  /**
110
   * Avro sink which can send events to a downstream avro source. This is the
112
   * Avro sink which can send events to a downstream avro source. This is the
[+20] [20] 16 lines
[+20] public class EmbeddedAgentConfiguration {
127
   */
129
   */
128
  public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE = SinkProcessorType.LOAD_BALANCE.name();
130
  public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE = SinkProcessorType.LOAD_BALANCE.name();
129

    
   
131

   
130

    
   
132

   
131
  private static final String[] ALLOWED_SOURCES = {
133
  private static final String[] ALLOWED_SOURCES = {

    
   
134
    SOURCE_TYPE_EMBEDDED_ALIAS,
132
    SOURCE_TYPE_EMBEDDED,
135
    SOURCE_TYPE_EMBEDDED,
133
  };
136
  };
134

    
   
137

   
135
  private static final String[] ALLOWED_CHANNELS = {
138
  private static final String[] ALLOWED_CHANNELS = {
136
    CHANNEL_TYPE_MEMORY,
139
    CHANNEL_TYPE_MEMORY,
[+20] [20] 8 lines
[+20] public class EmbeddedAgentConfiguration {
145
    SINK_PROCESSOR_TYPE_DEFAULT,
148
    SINK_PROCESSOR_TYPE_DEFAULT,
146
    SINK_PROCESSOR_TYPE_FAILOVER,
149
    SINK_PROCESSOR_TYPE_FAILOVER,
147
    SINK_PROCESSOR_TYPE_LOAD_BALANCE
150
    SINK_PROCESSOR_TYPE_LOAD_BALANCE
148
  };
151
  };
149

    
   
152

   

    
   
153
  private static final ImmutableList<String> DISALLOWED_SINK_NAMES =

    
   
154
      ImmutableList.of("source", "channel", "processor");

    
   
155

   
150
  private static void validate(String name,
156
  private static void validate(String name,
151
      Map<String, String> properties) throws FlumeException {
157
      Map<String, String> properties) throws FlumeException {
152

    
   
158

   
153
    if(properties.containsKey(SOURCE_TYPE)) {
159
    if(properties.containsKey(SOURCE_TYPE)) {
154
      checkAllowed(ALLOWED_SOURCES, properties.get(SOURCE_TYPE));
160
      checkAllowed(ALLOWED_SOURCES, properties.get(SOURCE_TYPE));
155
    }
161
    }
156
    checkRequired(properties, CHANNEL_TYPE);
162
    checkRequired(properties, CHANNEL_TYPE);
157
    checkAllowed(ALLOWED_CHANNELS, properties.get(CHANNEL_TYPE));
163
    checkAllowed(ALLOWED_CHANNELS, properties.get(CHANNEL_TYPE));
158
    checkRequired(properties, SINKS);
164
    checkRequired(properties, SINKS);
159
    String sinkNames = properties.get(SINKS);
165
    String sinkNames = properties.get(SINKS);
160
    for(String sink : sinkNames.split("\\s+")) {
166
    for(String sink : sinkNames.split("\\s+")) {

    
   
167
      if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase())) {

    
   
168
        throw new FlumeException("Sink name " + sink + " is one of the" +

    
   
169
            " disallowed sink names: " + DISALLOWED_SINK_NAMES);

    
   
170
      }
161
      String key = join(sink, TYPE);
171
      String key = join(sink, TYPE);
162
      checkRequired(properties, key);
172
      checkRequired(properties, key);
163
      checkAllowed(ALLOWED_SINKS, properties.get(key));
173
      checkAllowed(ALLOWED_SINKS, properties.get(key));
164

    
   
174

   
165
    }
175
    }
[+20] [20] 14 lines
[+20] private static void validate(String name,
180
      Map<String, String> properties) throws FlumeException {
190
      Map<String, String> properties) throws FlumeException {
181
    validate(name, properties);
191
    validate(name, properties);
182
    // we are going to modify the properties as we parse the config
192
    // we are going to modify the properties as we parse the config
183
    properties = new HashMap<String, String>(properties);
193
    properties = new HashMap<String, String>(properties);
184

    
   
194

   
185
    if(!properties.containsKey(SOURCE_TYPE)) {
195
    if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS.

    
   
196
        equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
186
      properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
197
      properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
187
    }
198
    }
188
    String sinkNames = properties.remove(SINKS);
199
    String sinkNames = properties.remove(SINKS);
189

    
   
200

   
190
    String sourceName = "source-" + name;
201
    String sourceName = "source-" + name;
[+20] [20] 6 lines
[+20] private static void validate(String name,
197
     * a simpler client api than passing in an entire agent configuration.
208
     * a simpler client api than passing in an entire agent configuration.
198
     */
209
     */
199
    // user supplied config -> agent configuration
210
    // user supplied config -> agent configuration
200
    Map<String, String> result = Maps.newHashMap();
211
    Map<String, String> result = Maps.newHashMap();
201

    
   
212

   
202
    Joiner joiner = Joiner.on(SEPERATOR);

   
203
    // properties will be modified during iteration so we need a
213
    // properties will be modified during iteration so we need a
204
    // copy of the keys
214
    // copy of the keys
205
    Set<String> userProvidedKeys;
215
    Set<String> userProvidedKeys;
206
    /*
216
    /*
207
     * First we are going to setup all the root level pointers. I.E
217
     * First we are going to setup all the root level pointers. I.E
208
     * point the agent at the components, sink group at sinks, and
218
     * point the agent at the components, sink group at sinks, and
209
     * source at the channel.
219
     * source at the channel.
210
     */
220
     */
211
    // point agent at source
221
    // point agent at source
212
    result.put(joiner.
222
    result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES),
213
        join(name, BasicConfigurationConstants.CONFIG_SOURCES), sourceName);
223
        sourceName);
214
    // point agent at channel
224
    // point agent at channel
215
    result.put(joiner.
225
    result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS),
216
        join(name, BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
226
        channelName);
217
    // point agent at source
227
    // point agent at sinks
218
    result.put(joiner.
228
    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS),
219
        join(name, BasicConfigurationConstants.CONFIG_SINKS), sinkNames);
229
        sinkNames);
220
    // points the agent at the sinkgroup
230
    // points the agent at the sinkgroup
221
    result.put(joiner.
231
    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
222
        join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),

   
223
        sinkGroupName);
232
        sinkGroupName);
224
    // points the sinkgroup at the sinks
233
    // points the sinkgroup at the sinks
225
    result.put(joiner.
234
    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
226
        join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,

   
227
            sinkGroupName, SINKS), sinkNames);
235
            sinkGroupName, SINKS), sinkNames);
228
    // points the source at the channel
236
    // points the source at the channel
229
    result.put(joiner.join(name,
237
    result.put(join(name,
230
        BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
238
        BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
231
        BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
239
        BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
232
    /*
240
    /*
233
     * Second process the the sink configuration and point the sinks
241
     * Second process the sink configuration and point the sinks
234
     * at the channel.
242
     * at the channel.
235
     */
243
     */
236
    userProvidedKeys = new HashSet<String>(properties.keySet());
244
    userProvidedKeys = new HashSet<String>(properties.keySet());
237
    for(String sink :  sinkNames.split("\\s+")) {
245
    for(String sink :  sinkNames.split("\\s+")) {
238
      for(String key : userProvidedKeys) {
246
      for(String key : userProvidedKeys) {
239
        String value = properties.get(key);
247
        String value = properties.get(key);
240
        if(key.startsWith(sink)) {
248
        if(key.startsWith(sink + SEPERATOR)) {
241
          properties.remove(key);
249
          properties.remove(key);
242
          result.put(joiner.join(name,
250
          result.put(join(name,
243
              BasicConfigurationConstants.CONFIG_SINKS, key), value);
251
              BasicConfigurationConstants.CONFIG_SINKS, key), value);
244
        }
252
        }
245
      }
253
      }
246
      // point the sink at the channel
254
      // point the sink at the channel
247
      result.put(joiner.join(name,
255
      result.put(join(name,
248
          BasicConfigurationConstants.CONFIG_SINKS, sink,
256
          BasicConfigurationConstants.CONFIG_SINKS, sink,
249
          BasicConfigurationConstants.CONFIG_CHANNEL), channelName);
257
          BasicConfigurationConstants.CONFIG_CHANNEL), channelName);
250
    }
258
    }
251
    /*
259
    /*
252
     * Third, process all remaining configuration items, prefixing them
260
     * Third, process all remaining configuration items, prefixing them
253
     * correctly and then passing them on to the agent.
261
     * correctly and then passing them on to the agent.
254
     */
262
     */
255
    userProvidedKeys = new HashSet<String>(properties.keySet());
263
    userProvidedKeys = new HashSet<String>(properties.keySet());
256
    for(String key : userProvidedKeys) {
264
    for(String key : userProvidedKeys) {
257
      String value = properties.get(key);
265
      String value = properties.get(key);
258
      if(key.startsWith(SOURCE)) {
266
      if(key.startsWith(SOURCE_PREFIX)) {
259
        // users use `source' but agent needs the actual source name
267
        // users use `source' but agent needs the actual source name
260
        key = key.replace(SOURCE, sourceName);
268
        key = key.replaceFirst(SOURCE, sourceName);
261
        result.put(joiner.join(name,
269
        result.put(join(name,
262
            BasicConfigurationConstants.CONFIG_SOURCES, key), value);
270
            BasicConfigurationConstants.CONFIG_SOURCES, key), value);
263
      } else if(key.startsWith(CHANNEL)) {
271
      } else if(key.startsWith(CHANNEL_PREFIX)) {
264
        // users use `channel' but agent needs the actual channel name
272
        // users use `channel' but agent needs the actual channel name
265
        key = key.replace(CHANNEL, channelName);
273
        key = key.replaceFirst(CHANNEL, channelName);
266
        result.put(joiner.join(name,
274
        result.put(join(name,
267
            BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
275
            BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
268
      } else if(key.startsWith(SINK_PROCESSOR)) {
276
      } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) {
269
        // agent.sinkgroups.sinkgroup.processor.*
277
        // agent.sinkgroups.sinkgroup.processor.*
270
        result.put(joiner.
278
        result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
271
            join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,

   
272
                sinkGroupName, key), value);
279
                sinkGroupName, key), value);
273
      } else {
280
      } else {
274
        // XXX should we simply ignore this?
281
        // XXX should we simply ignore this?
275
        throw new FlumeException("Unknown configuration " + key);
282
        throw new FlumeException("Unknown configuration " + key);
276
      }
283
      }
[+20] [20] 32 lines
flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
Revision 0a53c5f New Change
 
flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
Revision 3805ea8 New Change
 
flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
Revision 4e94d72 New Change
 
  1. flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java: Loading...
  2. flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java: Loading...
  3. flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java: Loading...
  4. flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java: Loading...