Review Board 1.7.22


FLUME-1121: Recoverable Memory Channel cannot recover data

Review Request #4713 - Created April 13, 2012 and submitted

Brock Noland
trunk
FLUME-1121
Reviewers
Flume
flume-git
Channel.start() is not being called. This is fixed in DefaultLogicalNodeManager.

Additionally RecoverableMemoryChannel now tracks it's own capacity due to the MemoryChannel semantics being completely different. Basically, if we rely on MemoryChannel capacity, then an error will be thrown when we commit the MemoryChannelTransaction. However, we will have already committed this data to disk. If we commit to MemoryChannel first (there by checking capacity) we could fail to write to disk resulting in data which is only in memory.

Also, I ran cleanup on modules touched. This removes whitespace, unused imports, and adds @Override tags where needed. This is one time cleanup which allows automated cleanup in the future.
All unit tests pass and manual testing passes as well.

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java: Loading...
  2. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java: Loading...
  3. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java: Loading...
  4. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java: Loading...
  5. flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java: Loading...
  6. flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java: Loading...
  7. flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java: Loading...
  8. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  9. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java: Loading...
  10. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java: Loading...
  11. flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java: Loading...
  12. flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java: Loading...
  13. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  14. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  15. flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java: Loading...
  16. flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java: Loading...
  17. flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java: Loading...
  18. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java: Loading...
  19. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java: Loading...
  20. flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java: Loading...
  21. flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java: Loading...
  22. flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java: Loading...
  23. flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java: Loading...
  24. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
Revision 0622f27 New Change
[20] 19 lines
[+20]
20
package org.apache.flume.channel.recoverable.memory;
20
package org.apache.flume.channel.recoverable.memory;
21

    
   
21

   
22
import java.io.File;
22
import java.io.File;
23
import java.io.IOException;
23
import java.io.IOException;
24
import java.util.List;
24
import java.util.List;

    
   
25
import java.util.concurrent.Semaphore;

    
   
26
import java.util.concurrent.TimeUnit;
25
import java.util.concurrent.atomic.AtomicLong;
27
import java.util.concurrent.atomic.AtomicLong;
26

    
   
28

   
27
import org.apache.flume.Channel;
29
import org.apache.flume.Channel;
28
import org.apache.flume.ChannelException;
30
import org.apache.flume.ChannelException;
29
import org.apache.flume.Context;
31
import org.apache.flume.Context;
[+20] [20] 27 lines
[+20] [+] public class RecoverableMemoryChannel extends BasicChannelSemantics {
57
  public static final String WAL_DATA_DIR = "wal.dataDir";
59
  public static final String WAL_DATA_DIR = "wal.dataDir";
58
  public static final String WAL_ROLL_SIZE = "wal.rollSize";
60
  public static final String WAL_ROLL_SIZE = "wal.rollSize";
59
  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
61
  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
60
  public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
62
  public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
61
  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
63
  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";

    
   
64
  public static final String CAPACITY = "capacity";

    
   
65
  public static final String KEEPALIVE = "keep-alive";

    
   
66

   

    
   
67
  public static final int DEFAULT_CAPACITY = 100;

    
   
68
  public static final int DEFAULT_KEEPALIVE = 3;
62

    
   
69

   
63
  private MemoryChannel memoryChannel = new MemoryChannel();
70
  private MemoryChannel memoryChannel = new MemoryChannel();
64
  private AtomicLong seqidGenerator = new AtomicLong(0);
71
  private AtomicLong seqidGenerator = new AtomicLong(0);
65
  private WAL<RecoverableMemoryChannelEvent> wal;
72
  private WAL<RecoverableMemoryChannelEvent> wal;

    
   
73
  /**

    
   
74
   * MemoryChannel checks to ensure the capacity is available

    
   
75
   * on commit. That is a problem because we need to write to

    
   
76
   * disk before we commit the data to MemoryChannel. As such

    
   
77
   * we keep track of capacity ourselves.

    
   
78
   */

    
   
79
  private Semaphore queueRemaining;

    
   
80
  private int capacity;

    
   
81
  private int keepAlive;

    
   
82
  private volatile boolean open;

    
   
83

   

    
   
84
  public RecoverableMemoryChannel() {

    
   
85
    open = false;

    
   
86
  }
66

    
   
87

   
67
  @Override
88
  @Override
68
  public void configure(Context context) {
89
  public void configure(Context context) {
69
    memoryChannel.configure(context);
90
    memoryChannel.configure(context);
70

    
   
91
    int capacity = context.getInteger(CAPACITY, DEFAULT_CAPACITY);
71
    String homePath = System.getProperty("user.home").replace('\\', '/');
92
    if(queueRemaining == null) {
72
    String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
93
      queueRemaining = new Semaphore(capacity, true);
73
    if(wal != null) {
94
    } else if(capacity > this.capacity) {
74
      try {
95
      // capacity increase
75
        wal.close();
96
      queueRemaining.release(capacity - this.capacity);
76
      } catch (IOException e) {
97
    } else if(capacity < this.capacity) {
77
        LOG.error("Error closing existing wal during reconfigure", e);
98
      queueRemaining.acquireUninterruptibly(this.capacity - capacity);
78
      }

   
79
    }
99
    }

    
   
100
    this.capacity = capacity;

    
   
101
    keepAlive = context.getInteger(KEEPALIVE, DEFAULT_KEEPALIVE);
80
    long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
102
    long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
81
    long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
103
    long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
82
    long minRententionPeriod = context.getLong(WAL_MIN_RENTENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RENTENTION_PERIOD);
104
    long minLogRetentionPeriod = context.getLong(WAL_MIN_RENTENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RENTENTION_PERIOD);
83
    long workerInterval = context.getLong(WAL_WORKER_INTERVAL, WAL.DEFAULT_WORKER_INTERVAL);
105
    long workerInterval = context.getLong(WAL_WORKER_INTERVAL, WAL.DEFAULT_WORKER_INTERVAL);

    
   
106
    if(wal == null) {

    
   
107
      String homePath = System.getProperty("user.home").replace('\\', '/');

    
   
108
      String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
84
    try {
109
      try {
85
      wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
110
        wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
86
          RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
111
            RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
87
          minRententionPeriod, workerInterval);
112
            minLogRetentionPeriod, workerInterval);
88
    } catch (IOException e) {
113
      } catch (IOException e) {
89
      Throwables.propagate(e);
114
        Throwables.propagate(e);
90
    }
115
      }

    
   
116
    } else {

    
   
117
      wal.setRollSize(rollSize);

    
   
118
      wal.setMaxLogsSize(maxLogsSize);

    
   
119
      wal.setMinLogRetentionPeriod(minLogRetentionPeriod);

    
   
120
      wal.setWorkerInterval(workerInterval);

    
   
121
      LOG.warn(this.getClass().getSimpleName() + " only supports " +

    
   
122
      		"partial reconfiguration.");

    
   
123
    }
91
  }
124
  }
92

    
   
125

   
93
  @Override
126
  @Override
94
  public synchronized void start() {
127
  public synchronized void start() {

    
   
128
    LOG.info("Starting " + this);
95
    try {
129
    try {
96
      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
130
      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
97
      Preconditions.checkArgument(results.getSequenceID() >= 0);
131
      Preconditions.checkArgument(results.getSequenceID() >= 0);
98
      LOG.info("Replay SequenceID " + results.getSequenceID());
132
      LOG.info("Replay SequenceID " + results.getSequenceID());
99
      seqidGenerator.set(results.getSequenceID());
133
      seqidGenerator.set(results.getSequenceID());
100
      Transaction transaction = memoryChannel.getTransaction();
134
      int numResults = results.getResults().size();
101
      transaction.begin();
135
      Preconditions.checkState(numResults <= capacity, "Capacity " + capacity +
102
      LOG.info("Replay Events " + results.getResults().size());
136
          ", but we need to replay " + numResults);

    
   
137
      LOG.info("Replay Events " + numResults);
103
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
138
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
104
        memoryChannel.put(entry.getData());

   
105
        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
139
        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
106
      }
140
      }

    
   
141
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {

    
   
142
        Transaction transaction = memoryChannel.getTransaction();

    
   
143
        transaction.begin();

    
   
144
        memoryChannel.put(entry.getData());
107
      transaction.commit();
145
        transaction.commit();
108
      transaction.close();
146
        transaction.close();

    
   
147
      }
109
    } catch (IOException e) {
148
    } catch (IOException e) {
110
      Throwables.propagate(e);
149
      Throwables.propagate(e);
111
    }
150
    }
112
    super.start();
151
    super.start();

    
   
152
    open = true;
113
  }
153
  }
114

    
   
154

   
115
  @Override
155
  @Override
116
  public synchronized void stop() {
156
  public synchronized void stop() {

    
   
157
    open = false;

    
   
158
    LOG.info("Stopping " + this);
117
    try {
159
    try {
118
      close();
160
      close();
119
    } catch (IOException e) {
161
    } catch (IOException e) {
120
      Throwables.propagate(e);
162
      Throwables.propagate(e);
121
    }
163
    }
122
    super.stop();
164
    super.stop();
123
  }
165
  }
124

    
   
166

   
125
  @Override
167
  @Override
126
  protected BasicTransactionSemantics createTransaction() {
168
  protected BasicTransactionSemantics createTransaction() {
127
    return new FileBackedTransaction(this, memoryChannel);
169
    return new RecoverableMemoryTransaction(this, memoryChannel);
128
  }
170
  }
129

    
   
171

   
130
  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
172
  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
131
      throws IOException {
173
      throws IOException {
132
    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
174
    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
[+20] [20] 20 lines
[+20] [+] private long nextSequenceID() {
153
  /**
195
  /**
154
   * <p>
196
   * <p>
155
   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
197
   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
156
   * </p>
198
   * </p>
157
   */
199
   */
158
  private static class FileBackedTransaction extends BasicTransactionSemantics {
200
  private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
159

    
   
201

   
160
    private Transaction transaction;
202
    private Transaction transaction;
161
    private MemoryChannel memoryChannel;
203
    private MemoryChannel memoryChannel;
162
    private RecoverableMemoryChannel fileChannel;
204
    private RecoverableMemoryChannel channel;
163
    private List<Long> sequenceIds = Lists.newArrayList();
205
    private List<Long> sequenceIds = Lists.newArrayList();
164
    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
206
    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
165
    private FileBackedTransaction(RecoverableMemoryChannel fileChannel, MemoryChannel memoryChannel) {
207
    private int takes;
166
      this.fileChannel = fileChannel;
208

   

    
   
209
    private RecoverableMemoryTransaction(RecoverableMemoryChannel channel,

    
   
210
        MemoryChannel memoryChannel) {

    
   
211
      this.channel = channel;
167
      this.memoryChannel = memoryChannel;
212
      this.memoryChannel = memoryChannel;
168
      this.transaction = this.memoryChannel.getTransaction();
213
      this.transaction = this.memoryChannel.getTransaction();

    
   
214
      this.takes = 0;
169
    }
215
    }
170
    @Override
216
    @Override
171
    protected void doBegin() throws InterruptedException {
217
    protected void doBegin() throws InterruptedException {
172
      transaction.begin();
218
      transaction.begin();
173
    }
219
    }
174
    @Override
220
    @Override
175
    protected void doPut(Event event) throws InterruptedException {
221
    protected void doPut(Event event) throws InterruptedException {
176
      RecoverableMemoryChannelEvent sequencedEvent = new RecoverableMemoryChannelEvent(event, fileChannel.nextSequenceID());
222
      if(!channel.open) {

    
   
223
        throw new ChannelException("Channel not open");

    
   
224
      }

    
   
225
      if(!channel.queueRemaining.tryAcquire(channel.keepAlive, TimeUnit.SECONDS)) {

    
   
226
        throw new ChannelException("Cannot acquire capacity");

    
   
227
      }

    
   
228
      RecoverableMemoryChannelEvent sequencedEvent =

    
   
229
          new RecoverableMemoryChannelEvent(event, channel.nextSequenceID());
177
      memoryChannel.put(sequencedEvent);
230
      memoryChannel.put(sequencedEvent);
178
      events.add(sequencedEvent);
231
      events.add(sequencedEvent);
179
    }
232
    }
180

    
   
233

   
181
    @Override
234
    @Override
182
    protected Event doTake() throws InterruptedException {
235
    protected Event doTake() throws InterruptedException {

    
   
236
      if(!channel.open) {

    
   
237
        throw new ChannelException("Channel not open");

    
   
238
      }
183
      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
239
      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
184
      if(event != null) {
240
      if(event != null) {
185
        sequenceIds.add(event.sequenceId);
241
        sequenceIds.add(event.sequenceId);

    
   
242
        takes++;
186
        return event.event;
243
        return event.event;
187
      }
244
      }
188
      return null;
245
      return null;
189
    }
246
    }
190

    
   
247

   
191
    @Override
248
    @Override
192
    protected void doCommit() throws InterruptedException {
249
    protected void doCommit() throws InterruptedException {

    
   
250
      if(!channel.open) {

    
   
251
        throw new ChannelException("Channel not open");

    
   
252
      }
193
      if(sequenceIds.size() > 0) {
253
      if(sequenceIds.size() > 0) {
194
        try {
254
        try {
195
          fileChannel.commitSequenceID(sequenceIds);
255
          channel.commitSequenceID(sequenceIds);
196
        } catch (IOException e) {
256
        } catch (IOException e) {
197
          throw new ChannelException("Unable to commit", e);
257
          throw new ChannelException("Unable to commit", e);
198
        }
258
        }
199
      }
259
      }
200
      if(!events.isEmpty()) {
260
      if(!events.isEmpty()) {
201
        try {
261
        try {
202
          fileChannel.commitEvents(events);
262
          channel.commitEvents(events);
203
        } catch (IOException e) {
263
        } catch (IOException e) {
204
          throw new ChannelException("Unable to commit", e);
264
          throw new ChannelException("Unable to commit", e);
205
        }
265
        }
206
      }
266
      }
207
      transaction.commit();
267
      transaction.commit();

    
   
268
      channel.queueRemaining.release(takes);
208
    }
269
    }
209

    
   
270

   
210
    @Override
271
    @Override
211
    protected void doRollback() throws InterruptedException {
272
    protected void doRollback() throws InterruptedException {
212
      sequenceIds.clear();
273
      sequenceIds.clear();
213
      events.clear();
274
      events.clear();

    
   
275
      channel.queueRemaining.release(events.size());
214
      transaction.rollback();
276
      transaction.rollback();
215
    }
277
    }
216

    
   
278

   
217
    @Override
279
    @Override
218
    protected void doClose() {
280
    protected void doClose() {
219
      sequenceIds.clear();
281
      sequenceIds.clear();
220
      events.clear();
282
      events.clear();
221
      transaction.close();
283
      transaction.close();
222
    }
284
    }
223
  }
285
  }
224
}
286
}
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
Revision fa63b73 New Change
 
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
Revision 97ef796 New Change
 
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
Revision 9d4a1fd New Change
 
flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
Revision edd8a8b New Change
 
flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java
Revision be1891b New Change
 
flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
Revision 84492e5 New Change
 
flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Revision 4722819 New Change
 
flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
Revision f179de0 New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
Revision 2334059 New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
Revision ca5212e New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
Revision 6b08c09 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
Revision b0485b1 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
Revision 732cce5 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
Revision 6e71e46 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
Revision 80020fc New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
Revision 1421449 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
Revision 8dad0b2 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
Revision bc81f26 New Change
 
flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
Revision d66f6d1 New Change
 
flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
Revision 1f0e8c6 New Change
 
flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Revision 07c3d0b New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
Revision 225cd34 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Revision 0b8a2c0 New Change
 
  1. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java: Loading...
  2. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java: Loading...
  3. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java: Loading...
  4. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java: Loading...
  5. flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java: Loading...
  6. flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java: Loading...
  7. flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java: Loading...
  8. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  9. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java: Loading...
  10. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java: Loading...
  11. flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java: Loading...
  12. flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java: Loading...
  13. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  14. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  15. flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java: Loading...
  16. flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java: Loading...
  17. flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java: Loading...
  18. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java: Loading...
  19. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java: Loading...
  20. flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java: Loading...
  21. flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java: Loading...
  22. flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java: Loading...
  23. flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java: Loading...
  24. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...