Review Board 1.7.22


FLUME-1772: AbstractConfigurationProvider should remove component which throws exception from configure method.

Review Request #8421 - Created Dec. 7, 2012 and submitted

Brock Noland
trunk
FLUME-1772
Reviewers
Flume
flume-git
Catches Exception around all the configure() methods of components. If they throw an exception, they are removed from the configuration (not added).
Tests added for source, channel, and sink.

Diff revision 3

This is not the most recent revision of the diff. The latest diff is revision 4. See what's changed.

1 2 3 4
1 2 3 4

  1. flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java: Loading...
  2. flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java: Loading...
flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
Revision daef76b New Change
[20] 16 lines
[+20]
17
 */
17
 */
18
package org.apache.flume.node;
18
package org.apache.flume.node;
19

    
   
19

   
20
import java.util.ArrayList;
20
import java.util.ArrayList;
21
import java.util.HashMap;
21
import java.util.HashMap;

    
   
22
import java.util.HashSet;
22
import java.util.List;
23
import java.util.List;
23
import java.util.Map;
24
import java.util.Map;
24
import java.util.Map.Entry;
25
import java.util.Map.Entry;
25
import java.util.Set;
26
import java.util.Set;
26

    
   
27

   
[+20] [20] 27 lines
[+20]
54
import org.apache.flume.sink.SinkGroup;
55
import org.apache.flume.sink.SinkGroup;
55
import org.apache.flume.source.DefaultSourceFactory;
56
import org.apache.flume.source.DefaultSourceFactory;
56
import org.slf4j.Logger;
57
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58
import org.slf4j.LoggerFactory;
58

    
   
59

   

    
   
60
import com.google.common.base.Preconditions;
59
import com.google.common.collect.ArrayListMultimap;
61
import com.google.common.collect.ArrayListMultimap;
60
import com.google.common.collect.ImmutableMap;

   
61
import com.google.common.collect.ListMultimap;
62
import com.google.common.collect.ListMultimap;

    
   
63
import com.google.common.collect.Lists;

    
   
64
import com.google.common.collect.Maps;
62

    
   
65

   
63
public abstract class AbstractConfigurationProvider implements
66
public abstract class AbstractConfigurationProvider implements
64
    ConfigurationProvider {
67
    ConfigurationProvider {
65

    
   
68

   
66
  private static final Logger LOGGER = LoggerFactory
69
  private static final Logger LOGGER = LoggerFactory
67
      .getLogger(AbstractConfigurationProvider.class);
70
      .getLogger(AbstractConfigurationProvider.class);
68

    
   
71

   
69
  private final String agentName;
72
  private final String agentName;
70
  private final SourceFactory sourceFactory;
73
  private final SourceFactory sourceFactory;
71
  private final SinkFactory sinkFactory;
74
  private final SinkFactory sinkFactory;
72
  private final ChannelFactory channelFactory;
75
  private final ChannelFactory channelFactory;
73

    
   
76

   
74
  private final Map<Class<? extends Channel>, Map<String, Channel>> channels;

   
75

    
   
77

   

    
   
78
  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;

    
   
79

   
76
  public AbstractConfigurationProvider(String agentName) {
80
  public AbstractConfigurationProvider(String agentName) {
77
    super();
81
    super();
78
    this.agentName = agentName;
82
    this.agentName = agentName;
79
    this.sourceFactory = new DefaultSourceFactory();
83
    this.sourceFactory = new DefaultSourceFactory();
80
    this.sinkFactory = new DefaultSinkFactory();
84
    this.sinkFactory = new DefaultSinkFactory();
81
    this.channelFactory = new DefaultChannelFactory();
85
    this.channelFactory = new DefaultChannelFactory();
82

    
   
86

   
83
    channels = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
87
    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
84
  }
88
  }
85

    
   
89

   
86
  protected abstract FlumeConfiguration getFlumeConfiguration();
90
  protected abstract FlumeConfiguration getFlumeConfiguration();
87

    
   
91

   
88
  public MaterializedConfiguration getConfiguration() {
92
  public MaterializedConfiguration getConfiguration() {
89
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
93
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
90
    FlumeConfiguration fconfig = getFlumeConfiguration();
94
    FlumeConfiguration fconfig = getFlumeConfiguration();
91
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
95
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
92
    if (agentConf != null) {
96
    if (agentConf != null) {

    
   
97
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();

    
   
98
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();

    
   
99
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
93
      try {
100
      try {
94
        loadChannels(agentConf, conf);
101
        loadChannels(agentConf, channelComponentMap);
95
        loadSources(agentConf, conf);
102
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
96
        loadSinks(agentConf, conf);
103
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

    
   
104
        Set<String> channelNames =

    
   
105
            new HashSet<String>(channelComponentMap.keySet());

    
   
106
        for(String channelName : channelNames) {

    
   
107
          ChannelComponent channelComponent = channelComponentMap.

    
   
108
              get(channelName);

    
   
109
          if(channelComponent.components.isEmpty()) {

    
   
110
            LOGGER.warn(String.format("Channel %s has no components connected" +

    
   
111
                " and has been removed.", channelName));

    
   
112
            channelComponentMap.remove(channelName);

    
   
113
          } else {

    
   
114
            LOGGER.info(String.format("Channel %s connected to %s",

    
   
115
                channelName, channelComponent.components.toString()));

    
   
116
            conf.addChannel(channelName, channelComponent.channel);

    
   
117
          }

    
   
118
        }

    
   
119
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {

    
   
120
          conf.addSourceRunner(entry.getKey(), entry.getValue());

    
   
121
        }

    
   
122
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {

    
   
123
          conf.addSinkRunner(entry.getKey(), entry.getValue());

    
   
124
        }
97
      } catch (InstantiationException ex) {
125
      } catch (InstantiationException ex) {
98
        LOGGER.error("Failed to instantiate component", ex);
126
        LOGGER.error("Failed to instantiate component", ex);

    
   
127
      } finally {

    
   
128
        channelComponentMap.clear();

    
   
129
        sourceRunnerMap.clear();

    
   
130
        sinkRunnerMap.clear();
99
      }
131
      }
100
    } else {
132
    } else {
101
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
133
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
102
    }
134
    }
103
    return conf;
135
    return conf;
104
  }
136
  }
105

    
   
137

   
106
  public String getAgentName() {
138
  public String getAgentName() {
107
    return agentName;
139
    return agentName;
108
  }
140
  }
109

    
   
141

   
110

    
   

   
111
  private void loadChannels(AgentConfiguration agentConf,
142
  private void loadChannels(AgentConfiguration agentConf,
112
      MaterializedConfiguration conf) throws InstantiationException {
143
      Map<String, ChannelComponent> channelComponentMap)

    
   
144
          throws InstantiationException {
113
    LOGGER.info("Creating channels");
145
    LOGGER.info("Creating channels");
114

    
   
146

   
115
    /*
147
    /*
116
     * Some channels will be reused across re-configurations. To handle this,
148
     * Some channels will be reused across re-configurations. To handle this,
117
     * we store all the names of current channels, perform the reconfiguration,
149
     * we store all the names of current channels, perform the reconfiguration,
118
     * and then if a channel was not used, we delete our reference to it.
150
     * and then if a channel was not used, we delete our reference to it.
119
     * This supports the scenario where you enable channel "ch0" then remove it
151
     * This supports the scenario where you enable channel "ch0" then remove it
120
     * and add it back. Without this, channels like memory channel would cause
152
     * and add it back. Without this, channels like memory channel would cause
121
     * the first instances data to show up in the seconds.
153
     * the first instances data to show up in the seconds.
122
     */
154
     */
123
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
155
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
124
        ArrayListMultimap.create();
156
        ArrayListMultimap.create();
125
    // assume all channels will not be re-used
157
    // assume all channels will not be re-used
126
    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channels.entrySet()) {
158
    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channelCache.entrySet()) {
127
      Class<? extends Channel> channelKlass = entry.getKey();
159
      Class<? extends Channel> channelKlass = entry.getKey();
128
      Set<String> channelNames = entry.getValue().keySet();
160
      Set<String> channelNames = entry.getValue().keySet();
129
      channelsNotReused.get(channelKlass).addAll(channelNames);
161
      channelsNotReused.get(channelKlass).addAll(channelNames);
130
    }
162
    }
131

    
   
163

   
[+20] [20] 6 lines
[+20] private void loadChannels(AgentConfiguration agentConf,
138
    for (String chName : channelNames) {
170
    for (String chName : channelNames) {
139
      ComponentConfiguration comp = compMap.get(chName);
171
      ComponentConfiguration comp = compMap.get(chName);
140
      if(comp != null) {
172
      if(comp != null) {
141
        Channel channel = getOrCreateChannel(channelsNotReused,
173
        Channel channel = getOrCreateChannel(channelsNotReused,
142
            comp.getComponentName(), comp.getType());
174
            comp.getComponentName(), comp.getType());

    
   
175
        try {
143
        Configurables.configure(channel, comp);
176
          Configurables.configure(channel, comp);
144
        conf.addChannel(comp.getComponentName(), channel);
177
          channelComponentMap.put(comp.getComponentName(),

    
   
178
              new ChannelComponent(channel));
145
        LOGGER.info("Created channel " + chName);
179
          LOGGER.info("Created channel " + chName);

    
   
180
        } catch (Exception e) {

    
   
181
          String msg = String.format("Channel %s has been removed due to an " +

    
   
182
              "error during configuration", chName);

    
   
183
          LOGGER.error(msg, e);

    
   
184
        }
146
      }
185
      }
147
    }
186
    }
148
    /*
187
    /*
149
     * Components which DO NOT have a ComponentConfiguration object
188
     * Components which DO NOT have a ComponentConfiguration object
150
     * and use only Context
189
     * and use only Context
151
     */
190
     */
152
    for (String chName : channelNames) {
191
    for (String chName : channelNames) {
153
      Context context = agentConf.getChannelContext().get(chName);
192
      Context context = agentConf.getChannelContext().get(chName);
154
      if(context != null){
193
      if(context != null){
155
        Channel channel =
194
        Channel channel =
156
            getOrCreateChannel(channelsNotReused, chName, context.getString(
195
            getOrCreateChannel(channelsNotReused, chName, context.getString(
157
                BasicConfigurationConstants.CONFIG_TYPE));
196
                BasicConfigurationConstants.CONFIG_TYPE));

    
   
197
        try {
158
        Configurables.configure(channel, context);
198
          Configurables.configure(channel, context);
159
        conf.addChannel(chName, channel);
199
          channelComponentMap.put(chName, new ChannelComponent(channel));
160
        LOGGER.info("Created channel " + chName);
200
          LOGGER.info("Created channel " + chName);

    
   
201
        } catch (Exception e) {

    
   
202
          String msg = String.format("Channel %s has been removed due to an " +

    
   
203
                "error during configuration", chName);

    
   
204
          LOGGER.error(msg, e);

    
   
205
        }
161
      }
206
      }
162
    }
207
    }
163
    /*
208
    /*
164
     * Any channel which was not re-used, will have it's reference removed
209
     * Any channel which was not re-used, will have it's reference removed
165
     */
210
     */
166
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
211
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
167
      Map<String, Channel> channelMap = channels.get(channelKlass);
212
      Map<String, Channel> channelMap = channelCache.get(channelKlass);
168
      if (channelMap != null) {
213
      if (channelMap != null) {
169
        for (String channelName : channelsNotReused.get(channelKlass)) {
214
        for (String channelName : channelsNotReused.get(channelKlass)) {
170
          if(channelMap.remove(channelName) != null) {
215
          if(channelMap.remove(channelName) != null) {
171
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
216
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
172
          }
217
          }
173
        }
218
        }
174
        if (channelMap.isEmpty()) {
219
        if (channelMap.isEmpty()) {
175
          channels.remove(channelKlass);
220
          channelCache.remove(channelKlass);
176
        }
221
        }
177
      }
222
      }
178
    }
223
    }
179
  }
224
  }
180

    
   
225

   
[+20] [20] 10 lines
[+20] [+] private Channel getOrCreateChannel(
191
    if(channelClass.isAnnotationPresent(Disposable.class)) {
236
    if(channelClass.isAnnotationPresent(Disposable.class)) {
192
      Channel channel = channelFactory.create(name, type);
237
      Channel channel = channelFactory.create(name, type);
193
      channel.setName(name);
238
      channel.setName(name);
194
      return channel;
239
      return channel;
195
    }
240
    }
196
    Map<String, Channel> channelMap = channels.get(channelClass);
241
    Map<String, Channel> channelMap = channelCache.get(channelClass);
197
    if (channelMap == null) {
242
    if (channelMap == null) {
198
      channelMap = new HashMap<String, Channel>();
243
      channelMap = new HashMap<String, Channel>();
199
      channels.put(channelClass, channelMap);
244
      channelCache.put(channelClass, channelMap);
200
    }
245
    }
201
    Channel channel = channelMap.get(name);
246
    Channel channel = channelMap.get(name);
202
    if(channel == null) {
247
    if(channel == null) {
203
      channel = channelFactory.create(name, type);
248
      channel = channelFactory.create(name, type);
204
      channel.setName(name);
249
      channel.setName(name);
205
      channelMap.put(name, channel);
250
      channelMap.put(name, channel);
206
    }
251
    }
207
    channelsNotReused.get(channelClass).remove(name);
252
    channelsNotReused.get(channelClass).remove(name);
208
    return channel;
253
    return channel;
209
  }
254
  }
210

    
   
255

   
211
  private void loadSources(AgentConfiguration agentConf, MaterializedConfiguration conf)
256
  private void loadSources(AgentConfiguration agentConf,

    
   
257
      Map<String, ChannelComponent> channelComponentMap,

    
   
258
      Map<String, SourceRunner> sourceRunnerMap)
212
      throws InstantiationException {
259
      throws InstantiationException {
213

    
   
260

   
214
    Set<String> sources = agentConf.getSourceSet();
261
    Set<String> sourceNames = agentConf.getSourceSet();
215
    Map<String, ComponentConfiguration> compMap =
262
    Map<String, ComponentConfiguration> compMap =
216
        agentConf.getSourceConfigMap();
263
        agentConf.getSourceConfigMap();
217
    /*
264
    /*
218
     * Components which have a ComponentConfiguration object
265
     * Components which have a ComponentConfiguration object
219
     */
266
     */
220
    for (String sourceName : sources) {
267
    for (String sourceName : sourceNames) {
221
      ComponentConfiguration comp = compMap.get(sourceName);
268
      ComponentConfiguration comp = compMap.get(sourceName);
222
      if(comp != null) {
269
      if(comp != null) {
223
        SourceConfiguration config = (SourceConfiguration) comp;
270
        SourceConfiguration config = (SourceConfiguration) comp;
224

    
   
271

   
225
        Source source = sourceFactory.create(comp.getComponentName(),
272
        Source source = sourceFactory.create(comp.getComponentName(),
226
            comp.getType());
273
            comp.getType());
227

    
   
274
        try {
228
        Configurables.configure(source, config);
275
          Configurables.configure(source, config);
229
        Set<String> channelNames = config.getChannels();
276
          Set<String> channelNames = config.getChannels();
230
        List<Channel> channels = new ArrayList<Channel>();
277
          List<Channel> sourceChannels = new ArrayList<Channel>();
231
        for (String chName : channelNames) {
278
          for (String chName : channelNames) {
232
          channels.add(conf.getChannels().get(chName));
279
            ChannelComponent channelComponent = channelComponentMap.get(chName);

    
   
280
            if(channelComponent != null) {

    
   
281
              sourceChannels.add(channelComponent.channel);

    
   
282
            }

    
   
283
          }

    
   
284
          if(sourceChannels.isEmpty()) {

    
   
285
            String msg = String.format("Source %s is not connected to a " +

    
   
286
                "channel",  sourceName);

    
   
287
            throw new IllegalStateException(msg);
233
        }
288
          }
234

    
   

   
235
        ChannelSelectorConfiguration selectorConfig =
289
          ChannelSelectorConfiguration selectorConfig =
236
            config.getSelectorConfiguration();
290
              config.getSelectorConfiguration();
237

    
   
291

   
238
        ChannelSelector selector = ChannelSelectorFactory.create(
292
          ChannelSelector selector = ChannelSelectorFactory.create(
239
            channels, selectorConfig);
293
              sourceChannels, selectorConfig);
240

    
   
294

   
241
        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
295
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
242
        Configurables.configure(channelProcessor, config);
296
          Configurables.configure(channelProcessor, config);
243

    
   
297

   
244
        source.setChannelProcessor(channelProcessor);
298
          source.setChannelProcessor(channelProcessor);
245
        conf.addSourceRunner(comp.getComponentName(),
299
          sourceRunnerMap.put(comp.getComponentName(),
246
            SourceRunner.forSource(source));
300
              SourceRunner.forSource(source));

    
   
301
          for(Channel channel : sourceChannels) {

    
   
302
            ChannelComponent channelComponent = Preconditions.

    
   
303
                checkNotNull(channelComponentMap.get(channel.getName()),

    
   
304
                    String.format("Channel %s", channel.getName()));

    
   
305
            channelComponent.components.add(sourceName);

    
   
306
          }

    
   
307
        } catch (Exception e) {

    
   
308
          String msg = String.format("Source %s has been removed due to an " +

    
   
309
              "error during configuration", sourceName);

    
   
310
          LOGGER.error(msg, e);

    
   
311
        }
247
      }
312
      }
248
    }
313
    }
249
    /*
314
    /*
250
     * Components which DO NOT have a ComponentConfiguration object
315
     * Components which DO NOT have a ComponentConfiguration object
251
     * and use only Context
316
     * and use only Context
252
     */
317
     */
253
    Map<String, Context> sourceContexts = agentConf.getSourceContext();
318
    Map<String, Context> sourceContexts = agentConf.getSourceContext();
254
    for (String sourceName : sources) {
319
    for (String sourceName : sourceNames) {
255
      Context context = sourceContexts.get(sourceName);
320
      Context context = sourceContexts.get(sourceName);
256
      if(context != null){
321
      if(context != null){
257
        Source source =
322
        Source source =
258
            sourceFactory.create(sourceName,
323
            sourceFactory.create(sourceName,
259
                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
324
                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
260
        List<Channel> channels = new ArrayList<Channel>();
325
        try {
261
        Configurables.configure(source, context);
326
          Configurables.configure(source, context);

    
   
327
          List<Channel> sourceChannels = new ArrayList<Channel>();
262
        String[] channelNames = context.getString(
328
          String[] channelNames = context.getString(
263
            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
329
              BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
264
        for (String chName : channelNames) {
330
          for (String chName : channelNames) {
265
          channels.add(conf.getChannels().get(chName));
331
            ChannelComponent channelComponent = channelComponentMap.get(chName);

    
   
332
            if(channelComponent != null) {

    
   
333
              sourceChannels.add(channelComponent.channel);

    
   
334
            }

    
   
335
          }

    
   
336
          if(sourceChannels.isEmpty()) {

    
   
337
            String msg = String.format("Source %s is not connected to a " +

    
   
338
                "channel",  sourceName);

    
   
339
            throw new IllegalStateException(msg);
266
        }
340
          }
267

    
   

   
268
        Map<String, String> selectorConfig = context.getSubProperties(
341
          Map<String, String> selectorConfig = context.getSubProperties(
269
            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
342
              BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
270

    
   
343

   
271
        ChannelSelector selector = ChannelSelectorFactory.create(
344
          ChannelSelector selector = ChannelSelectorFactory.create(
272
            channels, selectorConfig);
345
              sourceChannels, selectorConfig);
273

    
   
346

   
274
        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
347
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
275
        Configurables.configure(channelProcessor, context);
348
          Configurables.configure(channelProcessor, context);
276

    
   

   
277
        source.setChannelProcessor(channelProcessor);
349
          source.setChannelProcessor(channelProcessor);
278
        conf.addSourceRunner(sourceName,
350
          sourceRunnerMap.put(sourceName,
279
            SourceRunner.forSource(source));
351
              SourceRunner.forSource(source));
280

    
   
352
          for(Channel channel : sourceChannels) {

    
   
353
            ChannelComponent channelComponent = Preconditions.

    
   
354
                checkNotNull(channelComponentMap.get(channel.getName()),

    
   
355
                    String.format("Channel %s", channel.getName()));

    
   
356
            channelComponent.components.add(sourceName);

    
   
357
          }

    
   
358
        } catch (Exception e) {

    
   
359
          String msg = String.format("Source %s has been removed due to an " +

    
   
360
              "error during configuration", sourceName);

    
   
361
          LOGGER.error(msg, e);

    
   
362
        }
281
      }
363
      }
282
    }
364
    }
283
  }
365
  }
284

    
   
366

   
285
  private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf)
367
  private void loadSinks(AgentConfiguration agentConf,

    
   
368
      Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
286
      throws InstantiationException {
369
      throws InstantiationException {
287
    Set<String> sinkNames = agentConf.getSinkSet();
370
    Set<String> sinkNames = agentConf.getSinkSet();
288
    ImmutableMap<String,Channel> channels = conf.getChannels();

   
289
    Map<String, ComponentConfiguration> compMap =
371
    Map<String, ComponentConfiguration> compMap =
290
        agentConf.getSinkConfigMap();
372
        agentConf.getSinkConfigMap();
291
    Map<String, Sink> sinks = new HashMap<String, Sink>();
373
    Map<String, Sink> sinks = new HashMap<String, Sink>();
292
    /*
374
    /*
293
     * Components which have a ComponentConfiguration object
375
     * Components which have a ComponentConfiguration object
294
     */
376
     */
295
    for (String sinkName : sinkNames) {
377
    for (String sinkName : sinkNames) {
296
      ComponentConfiguration comp = compMap.get(sinkName);
378
      ComponentConfiguration comp = compMap.get(sinkName);
297
      if(comp != null) {
379
      if(comp != null) {
298
        SinkConfiguration config = (SinkConfiguration) comp;
380
        SinkConfiguration config = (SinkConfiguration) comp;
299
        Sink sink = sinkFactory.create(comp.getComponentName(),
381
        Sink sink = sinkFactory.create(comp.getComponentName(),
300
            comp.getType());
382
            comp.getType());
301

    
   
383
        try {
302
        Configurables.configure(sink, config);
384
          Configurables.configure(sink, config);
303

    
   
385
          ChannelComponent channelComponent = channelComponentMap.
304
        sink.setChannel(channels.get(config.getChannel()));
386
              get(config.getChannel());

    
   
387
          if(channelComponent == null) {

    
   
388
            String msg = String.format("Sink %s is not connected to a " +

    
   
389
                "channel",  sinkName);

    
   
390
            throw new IllegalStateException(msg);

    
   
391
          }

    
   
392
          sink.setChannel(channelComponent.channel);
305
        sinks.put(comp.getComponentName(), sink);
393
          sinks.put(comp.getComponentName(), sink);

    
   
394
          channelComponent.components.add(sinkName);

    
   
395
        } catch (Exception e) {

    
   
396
          String msg = String.format("Sink %s has been removed due to an " +

    
   
397
              "error during configuration", sinkName);

    
   
398
          LOGGER.error(msg, e);

    
   
399
        }
306
      }
400
      }
307
    }
401
    }
308
    /*
402
    /*
309
     * Components which DO NOT have a ComponentConfiguration object
403
     * Components which DO NOT have a ComponentConfiguration object
310
     * and use only Context
404
     * and use only Context
311
     */
405
     */
312
    Map<String, Context> sinkContexts = agentConf.getSinkContext();
406
    Map<String, Context> sinkContexts = agentConf.getSinkContext();
313
    for (String sinkName : sinkNames) {
407
    for (String sinkName : sinkNames) {
314
      Context context = sinkContexts.get(sinkName);
408
      Context context = sinkContexts.get(sinkName);
315
      if(context != null) {
409
      if(context != null) {
316
        Sink sink = sinkFactory.create(sinkName, context.getString(
410
        Sink sink = sinkFactory.create(sinkName, context.getString(
317
            BasicConfigurationConstants.CONFIG_TYPE));
411
            BasicConfigurationConstants.CONFIG_TYPE));

    
   
412
        try {
318
        Configurables.configure(sink, context);
413
          Configurables.configure(sink, context);
319

    
   
414
          ChannelComponent channelComponent = channelComponentMap.
320
        sink.setChannel(channels.get(context.getString(
415
              get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
321
            BasicConfigurationConstants.CONFIG_CHANNEL)));
416
          if(channelComponent == null) {

    
   
417
            String msg = String.format("Sink %s is not connected to a " +

    
   
418
                "channel",  sinkName);

    
   
419
            throw new IllegalStateException(msg);

    
   
420
          }

    
   
421
          sink.setChannel(channelComponent.channel);
322
        sinks.put(sinkName, sink);
422
          sinks.put(sinkName, sink);

    
   
423
          channelComponent.components.add(sinkName);

    
   
424
        } catch (Exception e) {

    
   
425
          String msg = String.format("Sink %s has been removed due to an " +

    
   
426
              "error during configuration", sinkName);

    
   
427
          LOGGER.error(msg, e);

    
   
428
        }
323
      }
429
      }
324
    }
430
    }
325

    
   
431

   
326
    loadSinkGroups(agentConf, sinks, conf);
432
    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
327
  }
433
  }
328

    
   
434

   
329
  private void loadSinkGroups(AgentConfiguration agentConf,
435
  private void loadSinkGroups(AgentConfiguration agentConf,
330
      Map<String, Sink> sinks, MaterializedConfiguration conf)
436
      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
331
          throws InstantiationException {
437
          throws InstantiationException {
332
    Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
438
    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
333
    Map<String, ComponentConfiguration> compMap =
439
    Map<String, ComponentConfiguration> compMap =
334
        agentConf.getSinkGroupConfigMap();
440
        agentConf.getSinkGroupConfigMap();
335
    Map<String, String> usedSinks = new HashMap<String, String>();
441
    Map<String, String> usedSinks = new HashMap<String, String>();
336
    for (String groupName: sinkgroupNames) {
442
    for (String groupName: sinkGroupNames) {
337
      ComponentConfiguration comp = compMap.get(groupName);
443
      ComponentConfiguration comp = compMap.get(groupName);
338
      if(comp != null) {
444
      if(comp != null) {
339
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
445
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
340
        List<String> groupSinkList = groupConf.getSinks();

   
341
        List<Sink> groupSinks = new ArrayList<Sink>();
446
        List<Sink> groupSinks = new ArrayList<Sink>();
342
        for (String sink : groupSinkList) {
447
        for (String sink : groupConf.getSinks()) {
343
          Sink s = sinks.remove(sink);
448
          Sink s = sinks.remove(sink);
344
          if (s == null) {
449
          if (s == null) {
345
            String sinkUser = usedSinks.get(sink);
450
            String sinkUser = usedSinks.get(sink);
346
            if (sinkUser != null) {
451
            if (sinkUser != null) {
347
              throw new InstantiationException(String.format(
452
              throw new InstantiationException(String.format(
[+20] [20] 7 lines
[+20] private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf) [+] private void loadSinks(AgentConfiguration agentConf,
355
            }
460
            }
356
          }
461
          }
357
          groupSinks.add(s);
462
          groupSinks.add(s);
358
          usedSinks.put(sink, groupName);
463
          usedSinks.put(sink, groupName);
359
        }
464
        }

    
   
465
        try {
360
        SinkGroup group = new SinkGroup(groupSinks);
466
          SinkGroup group = new SinkGroup(groupSinks);
361
        Configurables.configure(group, groupConf);
467
          Configurables.configure(group, groupConf);
362
        conf.addSinkRunner(comp.getComponentName(),
468
          sinkRunnerMap.put(comp.getComponentName(),
363
            new SinkRunner(group.getProcessor()));
469
              new SinkRunner(group.getProcessor()));

    
   
470
        } catch (Exception e) {

    
   
471
          String msg = String.format("SinkGroup %s has been removed due to " +

    
   
472
              "an error during configuration", groupName);

    
   
473
          LOGGER.error(msg, e);

    
   
474
        }
364
      }
475
      }
365
    }
476
    }
366
    // add any unassigned sinks to solo collectors
477
    // add any unassigned sinks to solo collectors
367
    for(Entry<String, Sink> entry : sinks.entrySet()) {
478
    for(Entry<String, Sink> entry : sinks.entrySet()) {
368
      if (!usedSinks.containsValue(entry.getKey())) {
479
      if (!usedSinks.containsValue(entry.getKey())) {

    
   
480
        try {
369
        SinkProcessor pr = new DefaultSinkProcessor();
481
          SinkProcessor pr = new DefaultSinkProcessor();
370
        List<Sink> sinkMap = new ArrayList<Sink>();
482
          List<Sink> sinkMap = new ArrayList<Sink>();
371
        sinkMap.add(entry.getValue());
483
          sinkMap.add(entry.getValue());
372
        pr.setSinks(sinkMap);
484
          pr.setSinks(sinkMap);
373
        Configurables.configure(pr, new Context());
485
          Configurables.configure(pr, new Context());
374
        conf.addSinkRunner(entry.getKey(),
486
          sinkRunnerMap.put(entry.getKey(),
375
            new SinkRunner(pr));
487
              new SinkRunner(pr));

    
   
488
        } catch(Exception e) {

    
   
489
          String msg = String.format("SinkGroup %s has been removed due to " +

    
   
490
              "an error during configuration", entry.getKey());

    
   
491
          LOGGER.error(msg, e);

    
   
492
        }

    
   
493
      }

    
   
494
    }
376
      }
495
  }

    
   
496
  private static class ChannelComponent {

    
   
497
    final Channel channel;

    
   
498
    final List<String> components;

    
   
499
    ChannelComponent(Channel channel) {

    
   
500
      this.channel = channel;

    
   
501
      components = Lists.newArrayList();
377
    }
502
    }
378
  }
503
  }
379
}
504
}
flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
Revision 25001b1 New Change
 
  1. flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java: Loading...
  2. flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java: Loading...