Review Board 1.7.22


FLUME-1772: AbstractConfigurationProvider should remove component which throws exception from configure method.

Review Request #8421 - Created Dec. 7, 2012 and submitted

Brock Noland
trunk
FLUME-1772
Reviewers
Flume
flume-git
Catches Exception around all the configure() methods of components. If they throw an exception, they are removed from the configuration (not added).
Tests added for source, channel, and sink.
flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
Revision daef76b New Change
[20] 16 lines
[+20]
17
 */
17
 */
18
package org.apache.flume.node;
18
package org.apache.flume.node;
19

    
   
19

   
20
import java.util.ArrayList;
20
import java.util.ArrayList;
21
import java.util.HashMap;
21
import java.util.HashMap;

    
   
22
import java.util.HashSet;
22
import java.util.List;
23
import java.util.List;
23
import java.util.Map;
24
import java.util.Map;
24
import java.util.Map.Entry;
25
import java.util.Map.Entry;
25
import java.util.Set;
26
import java.util.Set;
26

    
   
27

   
[+20] [20] 27 lines
[+20]
54
import org.apache.flume.sink.SinkGroup;
55
import org.apache.flume.sink.SinkGroup;
55
import org.apache.flume.source.DefaultSourceFactory;
56
import org.apache.flume.source.DefaultSourceFactory;
56
import org.slf4j.Logger;
57
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58
import org.slf4j.LoggerFactory;
58

    
   
59

   

    
   
60
import com.google.common.base.Preconditions;
59
import com.google.common.collect.ArrayListMultimap;
61
import com.google.common.collect.ArrayListMultimap;
60
import com.google.common.collect.ImmutableMap;

   
61
import com.google.common.collect.ListMultimap;
62
import com.google.common.collect.ListMultimap;

    
   
63
import com.google.common.collect.Lists;

    
   
64
import com.google.common.collect.Maps;
62

    
   
65

   
63
public abstract class AbstractConfigurationProvider implements
66
public abstract class AbstractConfigurationProvider implements
64
    ConfigurationProvider {
67
    ConfigurationProvider {
65

    
   
68

   
66
  private static final Logger LOGGER = LoggerFactory
69
  private static final Logger LOGGER = LoggerFactory
67
      .getLogger(AbstractConfigurationProvider.class);
70
      .getLogger(AbstractConfigurationProvider.class);
68

    
   
71

   
69
  private final String agentName;
72
  private final String agentName;
70
  private final SourceFactory sourceFactory;
73
  private final SourceFactory sourceFactory;
71
  private final SinkFactory sinkFactory;
74
  private final SinkFactory sinkFactory;
72
  private final ChannelFactory channelFactory;
75
  private final ChannelFactory channelFactory;
73

    
   
76

   
74
  private final Map<Class<? extends Channel>, Map<String, Channel>> channels;

   
75

    
   
77

   

    
   
78
  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;

    
   
79

   
76
  public AbstractConfigurationProvider(String agentName) {
80
  public AbstractConfigurationProvider(String agentName) {
77
    super();
81
    super();
78
    this.agentName = agentName;
82
    this.agentName = agentName;
79
    this.sourceFactory = new DefaultSourceFactory();
83
    this.sourceFactory = new DefaultSourceFactory();
80
    this.sinkFactory = new DefaultSinkFactory();
84
    this.sinkFactory = new DefaultSinkFactory();
81
    this.channelFactory = new DefaultChannelFactory();
85
    this.channelFactory = new DefaultChannelFactory();
82

    
   
86

   
83
    channels = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
87
    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
84
  }
88
  }
85

    
   
89

   
86
  protected abstract FlumeConfiguration getFlumeConfiguration();
90
  protected abstract FlumeConfiguration getFlumeConfiguration();
87

    
   
91

   
88
  public MaterializedConfiguration getConfiguration() {
92
  public MaterializedConfiguration getConfiguration() {
89
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
93
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
90
    FlumeConfiguration fconfig = getFlumeConfiguration();
94
    FlumeConfiguration fconfig = getFlumeConfiguration();
91
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
95
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
92
    if (agentConf != null) {
96
    if (agentConf != null) {

    
   
97
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();

    
   
98
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();

    
   
99
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
93
      try {
100
      try {
94
        loadChannels(agentConf, conf);
101
        loadChannels(agentConf, channelComponentMap);
95
        loadSources(agentConf, conf);
102
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
96
        loadSinks(agentConf, conf);
103
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

    
   
104
        Set<String> channelNames =

    
   
105
            new HashSet<String>(channelComponentMap.keySet());

    
   
106
        for(String channelName : channelNames) {

    
   
107
          ChannelComponent channelComponent = channelComponentMap.

    
   
108
              get(channelName);

    
   
109
          if(channelComponent.components.isEmpty()) {

    
   
110
            LOGGER.warn(String.format("Channel %s has no components connected" +

    
   
111
                " and has been removed.", channelName));

    
   
112
            channelComponentMap.remove(channelName);

    
   
113
            Map<String, Channel> nameChannelMap = channelCache.

    
   
114
                get(channelComponent.channel.getClass());

    
   
115
            if(nameChannelMap != null) {

    
   
116
              nameChannelMap.remove(channelName);

    
   
117
            }

    
   
118
          } else {

    
   
119
            LOGGER.info(String.format("Channel %s connected to %s",

    
   
120
                channelName, channelComponent.components.toString()));

    
   
121
            conf.addChannel(channelName, channelComponent.channel);

    
   
122
          }

    
   
123
        }

    
   
124
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {

    
   
125
          conf.addSourceRunner(entry.getKey(), entry.getValue());

    
   
126
        }

    
   
127
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {

    
   
128
          conf.addSinkRunner(entry.getKey(), entry.getValue());

    
   
129
        }
97
      } catch (InstantiationException ex) {
130
      } catch (InstantiationException ex) {
98
        LOGGER.error("Failed to instantiate component", ex);
131
        LOGGER.error("Failed to instantiate component", ex);

    
   
132
      } finally {

    
   
133
        channelComponentMap.clear();

    
   
134
        sourceRunnerMap.clear();

    
   
135
        sinkRunnerMap.clear();
99
      }
136
      }
100
    } else {
137
    } else {
101
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
138
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
102
    }
139
    }
103
    return conf;
140
    return conf;
104
  }
141
  }
105

    
   
142

   
106
  public String getAgentName() {
143
  public String getAgentName() {
107
    return agentName;
144
    return agentName;
108
  }
145
  }
109

    
   
146

   
110

    
   

   
111
  private void loadChannels(AgentConfiguration agentConf,
147
  private void loadChannels(AgentConfiguration agentConf,
112
      MaterializedConfiguration conf) throws InstantiationException {
148
      Map<String, ChannelComponent> channelComponentMap)

    
   
149
          throws InstantiationException {
113
    LOGGER.info("Creating channels");
150
    LOGGER.info("Creating channels");
114

    
   
151

   
115
    /*
152
    /*
116
     * Some channels will be reused across re-configurations. To handle this,
153
     * Some channels will be reused across re-configurations. To handle this,
117
     * we store all the names of current channels, perform the reconfiguration,
154
     * we store all the names of current channels, perform the reconfiguration,
118
     * and then if a channel was not used, we delete our reference to it.
155
     * and then if a channel was not used, we delete our reference to it.
119
     * This supports the scenario where you enable channel "ch0" then remove it
156
     * This supports the scenario where you enable channel "ch0" then remove it
120
     * and add it back. Without this, channels like memory channel would cause
157
     * and add it back. Without this, channels like memory channel would cause
121
     * the first instances data to show up in the seconds.
158
     * the first instances data to show up in the seconds.
122
     */
159
     */
123
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
160
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
124
        ArrayListMultimap.create();
161
        ArrayListMultimap.create();
125
    // assume all channels will not be re-used
162
    // assume all channels will not be re-used
126
    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channels.entrySet()) {
163
    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channelCache.entrySet()) {
127
      Class<? extends Channel> channelKlass = entry.getKey();
164
      Class<? extends Channel> channelKlass = entry.getKey();
128
      Set<String> channelNames = entry.getValue().keySet();
165
      Set<String> channelNames = entry.getValue().keySet();
129
      channelsNotReused.get(channelKlass).addAll(channelNames);
166
      channelsNotReused.get(channelKlass).addAll(channelNames);
130
    }
167
    }
131

    
   
168

   
[+20] [20] 6 lines
[+20] private void loadChannels(AgentConfiguration agentConf,
138
    for (String chName : channelNames) {
175
    for (String chName : channelNames) {
139
      ComponentConfiguration comp = compMap.get(chName);
176
      ComponentConfiguration comp = compMap.get(chName);
140
      if(comp != null) {
177
      if(comp != null) {
141
        Channel channel = getOrCreateChannel(channelsNotReused,
178
        Channel channel = getOrCreateChannel(channelsNotReused,
142
            comp.getComponentName(), comp.getType());
179
            comp.getComponentName(), comp.getType());

    
   
180
        try {
143
        Configurables.configure(channel, comp);
181
          Configurables.configure(channel, comp);
144
        conf.addChannel(comp.getComponentName(), channel);
182
          channelComponentMap.put(comp.getComponentName(),

    
   
183
              new ChannelComponent(channel));
145
        LOGGER.info("Created channel " + chName);
184
          LOGGER.info("Created channel " + chName);

    
   
185
        } catch (Exception e) {

    
   
186
          String msg = String.format("Channel %s has been removed due to an " +

    
   
187
              "error during configuration", chName);

    
   
188
          LOGGER.error(msg, e);

    
   
189
        }
146
      }
190
      }
147
    }
191
    }
148
    /*
192
    /*
149
     * Components which DO NOT have a ComponentConfiguration object
193
     * Components which DO NOT have a ComponentConfiguration object
150
     * and use only Context
194
     * and use only Context
151
     */
195
     */
152
    for (String chName : channelNames) {
196
    for (String chName : channelNames) {
153
      Context context = agentConf.getChannelContext().get(chName);
197
      Context context = agentConf.getChannelContext().get(chName);
154
      if(context != null){
198
      if(context != null){
155
        Channel channel =
199
        Channel channel =
156
            getOrCreateChannel(channelsNotReused, chName, context.getString(
200
            getOrCreateChannel(channelsNotReused, chName, context.getString(
157
                BasicConfigurationConstants.CONFIG_TYPE));
201
                BasicConfigurationConstants.CONFIG_TYPE));

    
   
202
        try {
158
        Configurables.configure(channel, context);
203
          Configurables.configure(channel, context);
159
        conf.addChannel(chName, channel);
204
          channelComponentMap.put(chName, new ChannelComponent(channel));
160
        LOGGER.info("Created channel " + chName);
205
          LOGGER.info("Created channel " + chName);

    
   
206
        } catch (Exception e) {

    
   
207
          String msg = String.format("Channel %s has been removed due to an " +

    
   
208
                "error during configuration", chName);

    
   
209
          LOGGER.error(msg, e);

    
   
210
        }
161
      }
211
      }
162
    }
212
    }
163
    /*
213
    /*
164
     * Any channel which was not re-used, will have it's reference removed
214
     * Any channel which was not re-used, will have it's reference removed
165
     */
215
     */
166
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
216
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
167
      Map<String, Channel> channelMap = channels.get(channelKlass);
217
      Map<String, Channel> channelMap = channelCache.get(channelKlass);
168
      if (channelMap != null) {
218
      if (channelMap != null) {
169
        for (String channelName : channelsNotReused.get(channelKlass)) {
219
        for (String channelName : channelsNotReused.get(channelKlass)) {
170
          if(channelMap.remove(channelName) != null) {
220
          if(channelMap.remove(channelName) != null) {
171
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
221
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
172
          }
222
          }
173
        }
223
        }
174
        if (channelMap.isEmpty()) {
224
        if (channelMap.isEmpty()) {
175
          channels.remove(channelKlass);
225
          channelCache.remove(channelKlass);
176
        }
226
        }
177
      }
227
      }
178
    }
228
    }
179
  }
229
  }
180

    
   
230

   
[+20] [20] 10 lines
[+20] [+] private Channel getOrCreateChannel(
191
    if(channelClass.isAnnotationPresent(Disposable.class)) {
241
    if(channelClass.isAnnotationPresent(Disposable.class)) {
192
      Channel channel = channelFactory.create(name, type);
242
      Channel channel = channelFactory.create(name, type);
193
      channel.setName(name);
243
      channel.setName(name);
194
      return channel;
244
      return channel;
195
    }
245
    }
196
    Map<String, Channel> channelMap = channels.get(channelClass);
246
    Map<String, Channel> channelMap = channelCache.get(channelClass);
197
    if (channelMap == null) {
247
    if (channelMap == null) {
198
      channelMap = new HashMap<String, Channel>();
248
      channelMap = new HashMap<String, Channel>();
199
      channels.put(channelClass, channelMap);
249
      channelCache.put(channelClass, channelMap);
200
    }
250
    }
201
    Channel channel = channelMap.get(name);
251
    Channel channel = channelMap.get(name);
202
    if(channel == null) {
252
    if(channel == null) {
203
      channel = channelFactory.create(name, type);
253
      channel = channelFactory.create(name, type);
204
      channel.setName(name);
254
      channel.setName(name);
205
      channelMap.put(name, channel);
255
      channelMap.put(name, channel);
206
    }
256
    }
207
    channelsNotReused.get(channelClass).remove(name);
257
    channelsNotReused.get(channelClass).remove(name);
208
    return channel;
258
    return channel;
209
  }
259
  }
210

    
   
260

   
211
  private void loadSources(AgentConfiguration agentConf, MaterializedConfiguration conf)
261
  private void loadSources(AgentConfiguration agentConf,

    
   
262
      Map<String, ChannelComponent> channelComponentMap,

    
   
263
      Map<String, SourceRunner> sourceRunnerMap)
212
      throws InstantiationException {
264
      throws InstantiationException {
213

    
   
265

   
214
    Set<String> sources = agentConf.getSourceSet();
266
    Set<String> sourceNames = agentConf.getSourceSet();
215
    Map<String, ComponentConfiguration> compMap =
267
    Map<String, ComponentConfiguration> compMap =
216
        agentConf.getSourceConfigMap();
268
        agentConf.getSourceConfigMap();
217
    /*
269
    /*
218
     * Components which have a ComponentConfiguration object
270
     * Components which have a ComponentConfiguration object
219
     */
271
     */
220
    for (String sourceName : sources) {
272
    for (String sourceName : sourceNames) {
221
      ComponentConfiguration comp = compMap.get(sourceName);
273
      ComponentConfiguration comp = compMap.get(sourceName);
222
      if(comp != null) {
274
      if(comp != null) {
223
        SourceConfiguration config = (SourceConfiguration) comp;
275
        SourceConfiguration config = (SourceConfiguration) comp;
224

    
   
276

   
225
        Source source = sourceFactory.create(comp.getComponentName(),
277
        Source source = sourceFactory.create(comp.getComponentName(),
226
            comp.getType());
278
            comp.getType());
227

    
   
279
        try {
228
        Configurables.configure(source, config);
280
          Configurables.configure(source, config);
229
        Set<String> channelNames = config.getChannels();
281
          Set<String> channelNames = config.getChannels();
230
        List<Channel> channels = new ArrayList<Channel>();
282
          List<Channel> sourceChannels = new ArrayList<Channel>();
231
        for (String chName : channelNames) {
283
          for (String chName : channelNames) {
232
          channels.add(conf.getChannels().get(chName));
284
            ChannelComponent channelComponent = channelComponentMap.get(chName);

    
   
285
            if(channelComponent != null) {

    
   
286
              sourceChannels.add(channelComponent.channel);

    
   
287
            }

    
   
288
          }

    
   
289
          if(sourceChannels.isEmpty()) {

    
   
290
            String msg = String.format("Source %s is not connected to a " +

    
   
291
                "channel",  sourceName);

    
   
292
            throw new IllegalStateException(msg);
233
        }
293
          }
234

    
   

   
235
        ChannelSelectorConfiguration selectorConfig =
294
          ChannelSelectorConfiguration selectorConfig =
236
            config.getSelectorConfiguration();
295
              config.getSelectorConfiguration();
237

    
   
296

   
238
        ChannelSelector selector = ChannelSelectorFactory.create(
297
          ChannelSelector selector = ChannelSelectorFactory.create(
239
            channels, selectorConfig);
298
              sourceChannels, selectorConfig);
240

    
   
299

   
241
        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
300
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
242
        Configurables.configure(channelProcessor, config);
301
          Configurables.configure(channelProcessor, config);
243

    
   
302

   
244
        source.setChannelProcessor(channelProcessor);
303
          source.setChannelProcessor(channelProcessor);
245
        conf.addSourceRunner(comp.getComponentName(),
304
          sourceRunnerMap.put(comp.getComponentName(),
246
            SourceRunner.forSource(source));
305
              SourceRunner.forSource(source));

    
   
306
          for(Channel channel : sourceChannels) {

    
   
307
            ChannelComponent channelComponent = Preconditions.

    
   
308
                checkNotNull(channelComponentMap.get(channel.getName()),

    
   
309
                    String.format("Channel %s", channel.getName()));

    
   
310
            channelComponent.components.add(sourceName);

    
   
311
          }

    
   
312
        } catch (Exception e) {

    
   
313
          String msg = String.format("Source %s has been removed due to an " +

    
   
314
              "error during configuration", sourceName);

    
   
315
          LOGGER.error(msg, e);

    
   
316
        }
247
      }
317
      }
248
    }
318
    }
249
    /*
319
    /*
250
     * Components which DO NOT have a ComponentConfiguration object
320
     * Components which DO NOT have a ComponentConfiguration object
251
     * and use only Context
321
     * and use only Context
252
     */
322
     */
253
    Map<String, Context> sourceContexts = agentConf.getSourceContext();
323
    Map<String, Context> sourceContexts = agentConf.getSourceContext();
254
    for (String sourceName : sources) {
324
    for (String sourceName : sourceNames) {
255
      Context context = sourceContexts.get(sourceName);
325
      Context context = sourceContexts.get(sourceName);
256
      if(context != null){
326
      if(context != null){
257
        Source source =
327
        Source source =
258
            sourceFactory.create(sourceName,
328
            sourceFactory.create(sourceName,
259
                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
329
                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
260
        List<Channel> channels = new ArrayList<Channel>();
330
        try {
261
        Configurables.configure(source, context);
331
          Configurables.configure(source, context);

    
   
332
          List<Channel> sourceChannels = new ArrayList<Channel>();
262
        String[] channelNames = context.getString(
333
          String[] channelNames = context.getString(
263
            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
334
              BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
264
        for (String chName : channelNames) {
335
          for (String chName : channelNames) {
265
          channels.add(conf.getChannels().get(chName));
336
            ChannelComponent channelComponent = channelComponentMap.get(chName);

    
   
337
            if(channelComponent != null) {

    
   
338
              sourceChannels.add(channelComponent.channel);

    
   
339
            }

    
   
340
          }

    
   
341
          if(sourceChannels.isEmpty()) {

    
   
342
            String msg = String.format("Source %s is not connected to a " +

    
   
343
                "channel",  sourceName);

    
   
344
            throw new IllegalStateException(msg);
266
        }
345
          }
267

    
   

   
268
        Map<String, String> selectorConfig = context.getSubProperties(
346
          Map<String, String> selectorConfig = context.getSubProperties(
269
            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
347
              BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
270

    
   
348

   
271
        ChannelSelector selector = ChannelSelectorFactory.create(
349
          ChannelSelector selector = ChannelSelectorFactory.create(
272
            channels, selectorConfig);
350
              sourceChannels, selectorConfig);
273

    
   
351

   
274
        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
352
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
275
        Configurables.configure(channelProcessor, context);
353
          Configurables.configure(channelProcessor, context);
276

    
   

   
277
        source.setChannelProcessor(channelProcessor);
354
          source.setChannelProcessor(channelProcessor);
278
        conf.addSourceRunner(sourceName,
355
          sourceRunnerMap.put(sourceName,
279
            SourceRunner.forSource(source));
356
              SourceRunner.forSource(source));
280

    
   
357
          for(Channel channel : sourceChannels) {

    
   
358
            ChannelComponent channelComponent = Preconditions.

    
   
359
                checkNotNull(channelComponentMap.get(channel.getName()),

    
   
360
                    String.format("Channel %s", channel.getName()));

    
   
361
            channelComponent.components.add(sourceName);

    
   
362
          }

    
   
363
        } catch (Exception e) {

    
   
364
          String msg = String.format("Source %s has been removed due to an " +

    
   
365
              "error during configuration", sourceName);

    
   
366
          LOGGER.error(msg, e);

    
   
367
        }
281
      }
368
      }
282
    }
369
    }
283
  }
370
  }
284

    
   
371

   
285
  private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf)
372
  private void loadSinks(AgentConfiguration agentConf,

    
   
373
      Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
286
      throws InstantiationException {
374
      throws InstantiationException {
287
    Set<String> sinkNames = agentConf.getSinkSet();
375
    Set<String> sinkNames = agentConf.getSinkSet();
288
    ImmutableMap<String,Channel> channels = conf.getChannels();

   
289
    Map<String, ComponentConfiguration> compMap =
376
    Map<String, ComponentConfiguration> compMap =
290
        agentConf.getSinkConfigMap();
377
        agentConf.getSinkConfigMap();
291
    Map<String, Sink> sinks = new HashMap<String, Sink>();
378
    Map<String, Sink> sinks = new HashMap<String, Sink>();
292
    /*
379
    /*
293
     * Components which have a ComponentConfiguration object
380
     * Components which have a ComponentConfiguration object
294
     */
381
     */
295
    for (String sinkName : sinkNames) {
382
    for (String sinkName : sinkNames) {
296
      ComponentConfiguration comp = compMap.get(sinkName);
383
      ComponentConfiguration comp = compMap.get(sinkName);
297
      if(comp != null) {
384
      if(comp != null) {
298
        SinkConfiguration config = (SinkConfiguration) comp;
385
        SinkConfiguration config = (SinkConfiguration) comp;
299
        Sink sink = sinkFactory.create(comp.getComponentName(),
386
        Sink sink = sinkFactory.create(comp.getComponentName(),
300
            comp.getType());
387
            comp.getType());
301

    
   
388
        try {
302
        Configurables.configure(sink, config);
389
          Configurables.configure(sink, config);
303

    
   
390
          ChannelComponent channelComponent = channelComponentMap.
304
        sink.setChannel(channels.get(config.getChannel()));
391
              get(config.getChannel());

    
   
392
          if(channelComponent == null) {

    
   
393
            String msg = String.format("Sink %s is not connected to a " +

    
   
394
                "channel",  sinkName);

    
   
395
            throw new IllegalStateException(msg);

    
   
396
          }

    
   
397
          sink.setChannel(channelComponent.channel);
305
        sinks.put(comp.getComponentName(), sink);
398
          sinks.put(comp.getComponentName(), sink);

    
   
399
          channelComponent.components.add(sinkName);

    
   
400
        } catch (Exception e) {

    
   
401
          String msg = String.format("Sink %s has been removed due to an " +

    
   
402
              "error during configuration", sinkName);

    
   
403
          LOGGER.error(msg, e);

    
   
404
        }
306
      }
405
      }
307
    }
406
    }
308
    /*
407
    /*
309
     * Components which DO NOT have a ComponentConfiguration object
408
     * Components which DO NOT have a ComponentConfiguration object
310
     * and use only Context
409
     * and use only Context
311
     */
410
     */
312
    Map<String, Context> sinkContexts = agentConf.getSinkContext();
411
    Map<String, Context> sinkContexts = agentConf.getSinkContext();
313
    for (String sinkName : sinkNames) {
412
    for (String sinkName : sinkNames) {
314
      Context context = sinkContexts.get(sinkName);
413
      Context context = sinkContexts.get(sinkName);
315
      if(context != null) {
414
      if(context != null) {
316
        Sink sink = sinkFactory.create(sinkName, context.getString(
415
        Sink sink = sinkFactory.create(sinkName, context.getString(
317
            BasicConfigurationConstants.CONFIG_TYPE));
416
            BasicConfigurationConstants.CONFIG_TYPE));

    
   
417
        try {
318
        Configurables.configure(sink, context);
418
          Configurables.configure(sink, context);
319

    
   
419
          ChannelComponent channelComponent = channelComponentMap.
320
        sink.setChannel(channels.get(context.getString(
420
              get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
321
            BasicConfigurationConstants.CONFIG_CHANNEL)));
421
          if(channelComponent == null) {

    
   
422
            String msg = String.format("Sink %s is not connected to a " +

    
   
423
                "channel",  sinkName);

    
   
424
            throw new IllegalStateException(msg);

    
   
425
          }

    
   
426
          sink.setChannel(channelComponent.channel);
322
        sinks.put(sinkName, sink);
427
          sinks.put(sinkName, sink);

    
   
428
          channelComponent.components.add(sinkName);

    
   
429
        } catch (Exception e) {

    
   
430
          String msg = String.format("Sink %s has been removed due to an " +

    
   
431
              "error during configuration", sinkName);

    
   
432
          LOGGER.error(msg, e);

    
   
433
        }
323
      }
434
      }
324
    }
435
    }
325

    
   
436

   
326
    loadSinkGroups(agentConf, sinks, conf);
437
    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
327
  }
438
  }
328

    
   
439

   
329
  private void loadSinkGroups(AgentConfiguration agentConf,
440
  private void loadSinkGroups(AgentConfiguration agentConf,
330
      Map<String, Sink> sinks, MaterializedConfiguration conf)
441
      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
331
          throws InstantiationException {
442
          throws InstantiationException {
332
    Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
443
    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
333
    Map<String, ComponentConfiguration> compMap =
444
    Map<String, ComponentConfiguration> compMap =
334
        agentConf.getSinkGroupConfigMap();
445
        agentConf.getSinkGroupConfigMap();
335
    Map<String, String> usedSinks = new HashMap<String, String>();
446
    Map<String, String> usedSinks = new HashMap<String, String>();
336
    for (String groupName: sinkgroupNames) {
447
    for (String groupName: sinkGroupNames) {
337
      ComponentConfiguration comp = compMap.get(groupName);
448
      ComponentConfiguration comp = compMap.get(groupName);
338
      if(comp != null) {
449
      if(comp != null) {
339
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
450
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
340
        List<String> groupSinkList = groupConf.getSinks();

   
341
        List<Sink> groupSinks = new ArrayList<Sink>();
451
        List<Sink> groupSinks = new ArrayList<Sink>();
342
        for (String sink : groupSinkList) {
452
        for (String sink : groupConf.getSinks()) {
343
          Sink s = sinks.remove(sink);
453
          Sink s = sinks.remove(sink);
344
          if (s == null) {
454
          if (s == null) {
345
            String sinkUser = usedSinks.get(sink);
455
            String sinkUser = usedSinks.get(sink);
346
            if (sinkUser != null) {
456
            if (sinkUser != null) {
347
              throw new InstantiationException(String.format(
457
              throw new InstantiationException(String.format(
[+20] [20] 7 lines
[+20] private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf) [+] private void loadSinks(AgentConfiguration agentConf,
355
            }
465
            }
356
          }
466
          }
357
          groupSinks.add(s);
467
          groupSinks.add(s);
358
          usedSinks.put(sink, groupName);
468
          usedSinks.put(sink, groupName);
359
        }
469
        }

    
   
470
        try {
360
        SinkGroup group = new SinkGroup(groupSinks);
471
          SinkGroup group = new SinkGroup(groupSinks);
361
        Configurables.configure(group, groupConf);
472
          Configurables.configure(group, groupConf);
362
        conf.addSinkRunner(comp.getComponentName(),
473
          sinkRunnerMap.put(comp.getComponentName(),
363
            new SinkRunner(group.getProcessor()));
474
              new SinkRunner(group.getProcessor()));

    
   
475
        } catch (Exception e) {

    
   
476
          String msg = String.format("SinkGroup %s has been removed due to " +

    
   
477
              "an error during configuration", groupName);

    
   
478
          LOGGER.error(msg, e);

    
   
479
        }
364
      }
480
      }
365
    }
481
    }
366
    // add any unassigned sinks to solo collectors
482
    // add any unassigned sinks to solo collectors
367
    for(Entry<String, Sink> entry : sinks.entrySet()) {
483
    for(Entry<String, Sink> entry : sinks.entrySet()) {
368
      if (!usedSinks.containsValue(entry.getKey())) {
484
      if (!usedSinks.containsValue(entry.getKey())) {

    
   
485
        try {
369
        SinkProcessor pr = new DefaultSinkProcessor();
486
          SinkProcessor pr = new DefaultSinkProcessor();
370
        List<Sink> sinkMap = new ArrayList<Sink>();
487
          List<Sink> sinkMap = new ArrayList<Sink>();
371
        sinkMap.add(entry.getValue());
488
          sinkMap.add(entry.getValue());
372
        pr.setSinks(sinkMap);
489
          pr.setSinks(sinkMap);
373
        Configurables.configure(pr, new Context());
490
          Configurables.configure(pr, new Context());
374
        conf.addSinkRunner(entry.getKey(),
491
          sinkRunnerMap.put(entry.getKey(),
375
            new SinkRunner(pr));
492
              new SinkRunner(pr));

    
   
493
        } catch(Exception e) {

    
   
494
          String msg = String.format("SinkGroup %s has been removed due to " +

    
   
495
              "an error during configuration", entry.getKey());

    
   
496
          LOGGER.error(msg, e);

    
   
497
        }

    
   
498
      }

    
   
499
    }
376
      }
500
  }

    
   
501
  private static class ChannelComponent {

    
   
502
    final Channel channel;

    
   
503
    final List<String> components;

    
   
504
    ChannelComponent(Channel channel) {

    
   
505
      this.channel = channel;

    
   
506
      components = Lists.newArrayList();
377
    }
507
    }
378
  }
508
  }
379
}
509
}
flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
Revision 25001b1 New Change
 
  1. flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java: Loading...
  2. flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java: Loading...