Review Board 1.7.22


FLUME-1121: Recoverable Memory Channel cannot recover data

Review Request #4713 - Created April 13, 2012 and submitted

Brock Noland
trunk
FLUME-1121
Reviewers
Flume
flume-git
Channel.start() is not being called. This is fixed in DefaultLogicalNodeManager.

Additionally RecoverableMemoryChannel now tracks it's own capacity due to the MemoryChannel semantics being completely different. Basically, if we rely on MemoryChannel capacity, then an error will be thrown when we commit the MemoryChannelTransaction. However, we will have already committed this data to disk. If we commit to MemoryChannel first (there by checking capacity) we could fail to write to disk resulting in data which is only in memory.

Also, I ran cleanup on modules touched. This removes whitespace, unused imports, and adds @Override tags where needed. This is one time cleanup which allows automated cleanup in the future.
All unit tests pass and manual testing passes as well.

Diff revision 2

This is not the most recent revision of the diff. The latest diff is revision 3. See what's changed.

1 2 3
1 2 3

  1. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java: Loading...
  2. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java: Loading...
  3. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java: Loading...
  4. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java: Loading...
  5. flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java: Loading...
  6. flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java: Loading...
  7. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  8. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java: Loading...
  9. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java: Loading...
  10. flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java: Loading...
  11. flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java: Loading...
  12. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  13. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  14. flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java: Loading...
  15. flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java: Loading...
  16. flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java: Loading...
  17. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java: Loading...
  18. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java: Loading...
  19. flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java: Loading...
  20. flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java: Loading...
  21. flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java: Loading...
  22. flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java: Loading...
  23. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
Revision 0622f27 New Change
[20] 19 lines
[+20]
20
package org.apache.flume.channel.recoverable.memory;
20
package org.apache.flume.channel.recoverable.memory;
21

    
   
21

   
22
import java.io.File;
22
import java.io.File;
23
import java.io.IOException;
23
import java.io.IOException;
24
import java.util.List;
24
import java.util.List;

    
   
25
import java.util.concurrent.Semaphore;

    
   
26
import java.util.concurrent.TimeUnit;
25
import java.util.concurrent.atomic.AtomicLong;
27
import java.util.concurrent.atomic.AtomicLong;
26

    
   
28

   
27
import org.apache.flume.Channel;
29
import org.apache.flume.Channel;
28
import org.apache.flume.ChannelException;
30
import org.apache.flume.ChannelException;
29
import org.apache.flume.Context;
31
import org.apache.flume.Context;
[+20] [20] 27 lines
[+20] [+] public class RecoverableMemoryChannel extends BasicChannelSemantics {
57
  public static final String WAL_DATA_DIR = "wal.dataDir";
59
  public static final String WAL_DATA_DIR = "wal.dataDir";
58
  public static final String WAL_ROLL_SIZE = "wal.rollSize";
60
  public static final String WAL_ROLL_SIZE = "wal.rollSize";
59
  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
61
  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
60
  public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
62
  public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
61
  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
63
  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";

    
   
64
  public static final String CAPACITY = "capacity";

    
   
65
  public static final String KEEPALIVE = "keep-alive";

    
   
66

   

    
   
67
  public static final int DEFAULT_CAPACITY = 100;

    
   
68
  public static final int DEFAULT_KEEPALIVE = 3;
62

    
   
69

   
63
  private MemoryChannel memoryChannel = new MemoryChannel();
70
  private MemoryChannel memoryChannel = new MemoryChannel();
64
  private AtomicLong seqidGenerator = new AtomicLong(0);
71
  private AtomicLong seqidGenerator = new AtomicLong(0);
65
  private WAL<RecoverableMemoryChannelEvent> wal;
72
  private WAL<RecoverableMemoryChannelEvent> wal;

    
   
73
  /**

    
   
74
   * MemoryChannel checks to ensure the capacity is available

    
   
75
   * on commit. That is a problem because we need to write to

    
   
76
   * disk before we commit the data to MemoryChannel. As such

    
   
77
   * we keep track of capacity ourselves.

    
   
78
   */

    
   
79
  private Semaphore queueRemaining;

    
   
80
  private int capacity;

    
   
81
  private int keepAlive;

    
   
82
  private volatile boolean open;
66

    
   
83

   
67
  @Override
84
  @Override
68
  public void configure(Context context) {
85
  public void configure(Context context) {

    
   
86
    open = false;
69
    memoryChannel.configure(context);
87
    memoryChannel.configure(context);
70

    
   
88
    int capacity = context.getInteger(CAPACITY, DEFAULT_CAPACITY);

    
   
89
    if(queueRemaining == null) {

    
   
90
      queueRemaining = new Semaphore(capacity, true);

    
   
91
    } else if(capacity > this.capacity) {

    
   
92
      // capacity increase

    
   
93
      queueRemaining.release(capacity - this.capacity);

    
   
94
    } else if(capacity < this.capacity) {

    
   
95
      queueRemaining.acquireUninterruptibly(this.capacity - capacity);

    
   
96
    }

    
   
97
    this.capacity = capacity;

    
   
98
    keepAlive = context.getInteger(KEEPALIVE, DEFAULT_KEEPALIVE);
71
    String homePath = System.getProperty("user.home").replace('\\', '/');
99
    String homePath = System.getProperty("user.home").replace('\\', '/');
72
    String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
100
    String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
73
    if(wal != null) {
101
    if(wal != null) {
74
      try {
102
      try {
75
        wal.close();
103
        wal.close();
[+20] [20] 14 lines
[+20] public class RecoverableMemoryChannel extends BasicChannelSemantics {
90
    }
118
    }
91
  }
119
  }
92

    
   
120

   
93
  @Override
121
  @Override
94
  public synchronized void start() {
122
  public synchronized void start() {

    
   
123
    LOG.info("Starting " + this);
95
    try {
124
    try {
96
      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
125
      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
97
      Preconditions.checkArgument(results.getSequenceID() >= 0);
126
      Preconditions.checkArgument(results.getSequenceID() >= 0);
98
      LOG.info("Replay SequenceID " + results.getSequenceID());
127
      LOG.info("Replay SequenceID " + results.getSequenceID());
99
      seqidGenerator.set(results.getSequenceID());
128
      seqidGenerator.set(results.getSequenceID());
100
      Transaction transaction = memoryChannel.getTransaction();
129
      int numResults = results.getResults().size();
101
      transaction.begin();
130
      if(numResults > capacity) {
102
      LOG.info("Replay Events " + results.getResults().size());
131
        LOG.error("Capacity " + capacity + ", but we need to replay " + numResults);

    
   
132
      } else {

    
   
133
        LOG.info("Replay Events " + numResults);

    
   
134
      }
103
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
135
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
104
        memoryChannel.put(entry.getData());

   
105
        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
136
        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
106
      }
137
      }

    
   
138
      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {

    
   
139
        Transaction transaction = memoryChannel.getTransaction();

    
   
140
        transaction.begin();

    
   
141
        memoryChannel.put(entry.getData());
107
      transaction.commit();
142
        transaction.commit();
108
      transaction.close();
143
        transaction.close();

    
   
144
      }
109
    } catch (IOException e) {
145
    } catch (IOException e) {
110
      Throwables.propagate(e);
146
      Throwables.propagate(e);
111
    }
147
    }
112
    super.start();
148
    super.start();

    
   
149
    open = true;
113
  }
150
  }
114

    
   
151

   
115
  @Override
152
  @Override
116
  public synchronized void stop() {
153
  public synchronized void stop() {

    
   
154
    open = false;

    
   
155
    LOG.info("Stopping " + this);
117
    try {
156
    try {
118
      close();
157
      close();
119
    } catch (IOException e) {
158
    } catch (IOException e) {
120
      Throwables.propagate(e);
159
      Throwables.propagate(e);
121
    }
160
    }
122
    super.stop();
161
    super.stop();
123
  }
162
  }
124

    
   
163

   
125
  @Override
164
  @Override
126
  protected BasicTransactionSemantics createTransaction() {
165
  protected BasicTransactionSemantics createTransaction() {
127
    return new FileBackedTransaction(this, memoryChannel);
166
    return new RecoverableMemoryTransaction(this, memoryChannel);
128
  }
167
  }
129

    
   
168

   
130
  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
169
  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
131
      throws IOException {
170
      throws IOException {
132
    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
171
    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
[+20] [20] 20 lines
[+20] [+] private long nextSequenceID() {
153
  /**
192
  /**
154
   * <p>
193
   * <p>
155
   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
194
   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
156
   * </p>
195
   * </p>
157
   */
196
   */
158
  private static class FileBackedTransaction extends BasicTransactionSemantics {
197
  private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
159

    
   
198

   
160
    private Transaction transaction;
199
    private Transaction transaction;
161
    private MemoryChannel memoryChannel;
200
    private MemoryChannel memoryChannel;
162
    private RecoverableMemoryChannel fileChannel;
201
    private RecoverableMemoryChannel channel;
163
    private List<Long> sequenceIds = Lists.newArrayList();
202
    private List<Long> sequenceIds = Lists.newArrayList();
164
    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
203
    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
165
    private FileBackedTransaction(RecoverableMemoryChannel fileChannel, MemoryChannel memoryChannel) {
204
    private int takes;
166
      this.fileChannel = fileChannel;
205

   

    
   
206
    private RecoverableMemoryTransaction(RecoverableMemoryChannel channel,

    
   
207
        MemoryChannel memoryChannel) {

    
   
208
      this.channel = channel;
167
      this.memoryChannel = memoryChannel;
209
      this.memoryChannel = memoryChannel;
168
      this.transaction = this.memoryChannel.getTransaction();
210
      this.transaction = this.memoryChannel.getTransaction();

    
   
211
      this.takes = 0;
169
    }
212
    }
170
    @Override
213
    @Override
171
    protected void doBegin() throws InterruptedException {
214
    protected void doBegin() throws InterruptedException {
172
      transaction.begin();
215
      transaction.begin();
173
    }
216
    }
174
    @Override
217
    @Override
175
    protected void doPut(Event event) throws InterruptedException {
218
    protected void doPut(Event event) throws InterruptedException {
176
      RecoverableMemoryChannelEvent sequencedEvent = new RecoverableMemoryChannelEvent(event, fileChannel.nextSequenceID());
219
      if(!channel.open) {

    
   
220
        throw new ChannelException("Channel not open");

    
   
221
      }

    
   
222
      if(!channel.queueRemaining.tryAcquire(channel.keepAlive, TimeUnit.SECONDS)) {

    
   
223
        throw new ChannelException("Cannot acquire capacity");

    
   
224
      }

    
   
225
      RecoverableMemoryChannelEvent sequencedEvent =

    
   
226
          new RecoverableMemoryChannelEvent(event, channel.nextSequenceID());
177
      memoryChannel.put(sequencedEvent);
227
      memoryChannel.put(sequencedEvent);
178
      events.add(sequencedEvent);
228
      events.add(sequencedEvent);
179
    }
229
    }
180

    
   
230

   
181
    @Override
231
    @Override
182
    protected Event doTake() throws InterruptedException {
232
    protected Event doTake() throws InterruptedException {

    
   
233
      if(!channel.open) {

    
   
234
        throw new ChannelException("Channel not open");

    
   
235
      }
183
      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
236
      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
184
      if(event != null) {
237
      if(event != null) {
185
        sequenceIds.add(event.sequenceId);
238
        sequenceIds.add(event.sequenceId);

    
   
239
        takes++;
186
        return event.event;
240
        return event.event;
187
      }
241
      }
188
      return null;
242
      return null;
189
    }
243
    }
190

    
   
244

   
191
    @Override
245
    @Override
192
    protected void doCommit() throws InterruptedException {
246
    protected void doCommit() throws InterruptedException {

    
   
247
      if(!channel.open) {

    
   
248
        throw new ChannelException("Channel not open");

    
   
249
      }
193
      if(sequenceIds.size() > 0) {
250
      if(sequenceIds.size() > 0) {
194
        try {
251
        try {
195
          fileChannel.commitSequenceID(sequenceIds);
252
          channel.commitSequenceID(sequenceIds);
196
        } catch (IOException e) {
253
        } catch (IOException e) {
197
          throw new ChannelException("Unable to commit", e);
254
          throw new ChannelException("Unable to commit", e);
198
        }
255
        }
199
      }
256
      }
200
      if(!events.isEmpty()) {
257
      if(!events.isEmpty()) {
201
        try {
258
        try {
202
          fileChannel.commitEvents(events);
259
          channel.commitEvents(events);
203
        } catch (IOException e) {
260
        } catch (IOException e) {
204
          throw new ChannelException("Unable to commit", e);
261
          throw new ChannelException("Unable to commit", e);
205
        }
262
        }
206
      }
263
      }
207
      transaction.commit();
264
      transaction.commit();

    
   
265
      channel.queueRemaining.release(takes);
208
    }
266
    }
209

    
   
267

   
210
    @Override
268
    @Override
211
    protected void doRollback() throws InterruptedException {
269
    protected void doRollback() throws InterruptedException {
212
      sequenceIds.clear();
270
      sequenceIds.clear();
213
      events.clear();
271
      events.clear();

    
   
272
      channel.queueRemaining.release(events.size());
214
      transaction.rollback();
273
      transaction.rollback();
215
    }
274
    }
216

    
   
275

   
217
    @Override
276
    @Override
218
    protected void doClose() {
277
    protected void doClose() {
219
      sequenceIds.clear();
278
      sequenceIds.clear();
220
      events.clear();
279
      events.clear();
221
      transaction.close();
280
      transaction.close();
222
    }
281
    }
223
  }
282
  }
224
}
283
}
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
Revision fa63b73 New Change
 
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
Revision 97ef796 New Change
 
flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
Revision 9d4a1fd New Change
 
flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java
Revision be1891b New Change
 
flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
Revision 84492e5 New Change
 
flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Revision 4722819 New Change
 
flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
Revision f179de0 New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
Revision 2334059 New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
Revision ca5212e New Change
 
flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
Revision 6b08c09 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
Revision b0485b1 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
Revision 732cce5 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
Revision 6e71e46 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
Revision 80020fc New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
Revision 1421449 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
Revision 8dad0b2 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
Revision bc81f26 New Change
 
flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
Revision d66f6d1 New Change
 
flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
Revision 1f0e8c6 New Change
 
flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Revision 07c3d0b New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
Revision 225cd34 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Revision 0b8a2c0 New Change
 
  1. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java: Loading...
  2. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java: Loading...
  3. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java: Loading...
  4. flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java: Loading...
  5. flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java: Loading...
  6. flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java: Loading...
  7. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  8. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java: Loading...
  9. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java: Loading...
  10. flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java: Loading...
  11. flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java: Loading...
  12. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  13. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  14. flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java: Loading...
  15. flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java: Loading...
  16. flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java: Loading...
  17. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java: Loading...
  18. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java: Loading...
  19. flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java: Loading...
  20. flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java: Loading...
  21. flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java: Loading...
  22. flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java: Loading...
  23. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...