Review Board 1.7.22


FLUME-1859. Fast replay can use slightly less memory.

Review Request #9015 - Created Jan. 18, 2013 and updated

Hari Shreedharan
FLUME-1859
Reviewers
Flume
flume-git
Changed the FlumeEventPointer to fileId,offset in ComparableFlumeEventPointer.
Ran full build, all unit tests pass.

Diff revision 2 (Latest)

1 2
1 2

  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java: Loading...
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
Revision 6e64003 New Change
[20] 15 lines
[+20]
16
 * specific language governing permissions and limitations
16
 * specific language governing permissions and limitations
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.channel.file;
19
package org.apache.flume.channel.file;
20

    
   
20

   
21
import com.google.common.base.Preconditions;

   
22
import com.google.common.collect.HashMultimap;
21
import com.google.common.collect.HashMultimap;
23
import com.google.common.collect.Lists;
22
import com.google.common.collect.Lists;
24
import com.google.common.collect.SetMultimap;
23
import com.google.common.collect.SetMultimap;
25
import com.google.common.collect.Sets;
24
import com.google.common.collect.Sets;
26

    
   

   
27
import java.io.EOFException;
Moved to 33

   
28
import java.io.File;
Moved to 34

   
29
import java.io.IOException;
Moved to 35

   
30
import java.util.Arrays;
Moved to 36

   
31
import java.util.List;
Moved to 37

   
32
import java.util.Set;
Moved to 38

   
33
import org.apache.commons.cli.CommandLine;
25
import org.apache.commons.cli.CommandLine;
34
import org.apache.commons.cli.CommandLineParser;
26
import org.apache.commons.cli.CommandLineParser;
35
import org.apache.commons.cli.GnuParser;
27
import org.apache.commons.cli.GnuParser;
36
import org.apache.commons.cli.Option;
28
import org.apache.commons.cli.Option;
37
import org.apache.commons.cli.Options;
29
import org.apache.commons.cli.Options;
38
import org.slf4j.Logger;
30
import org.slf4j.Logger;
39
import org.slf4j.LoggerFactory;
31
import org.slf4j.LoggerFactory;
40

    
   
32

   
Moved from 27

    
   
33
import java.io.EOFException;
Moved from 28

    
   
34
import java.io.File;
Moved from 29

    
   
35
import java.io.IOException;
Moved from 30

    
   
36
import java.util.Arrays;
Moved from 31

    
   
37
import java.util.List;
Moved from 32

    
   
38
import java.util.Set;

    
   
39

   
41
public class CheckpointRebuilder {
40
public class CheckpointRebuilder {
42

    
   
41

   
43
  private final List<File> logFiles;
42
  private final List<File> logFiles;
44
  private final FlumeEventQueue queue;
43
  private final FlumeEventQueue queue;
45
  private final Set<ComparableFlumeEventPointer> committedPuts =
44
  private final Set<ComparableFlumeEventPointer> committedPuts =
[+20] [20] 37 lines
[+20] [+] public boolean rebuild() throws IOException, Exception {
83
          long writeOrderID = record.getLogWriteOrderID();
82
          long writeOrderID = record.getLogWriteOrderID();
84
            transactionIDSeed = Math.max(trans, transactionIDSeed);
83
            transactionIDSeed = Math.max(trans, transactionIDSeed);
85
            writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
84
            writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
86
          if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) {
85
          if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) {
87
            uncommittedPuts.put(record.getTransactionID(),
86
            uncommittedPuts.put(record.getTransactionID(),
88
                    new ComparableFlumeEventPointer(
87
                    new ComparableFlumeEventPointer(fileID, offset,
89
                    new FlumeEventPointer(fileID, offset),

   
90
                    record.getLogWriteOrderID()));
88
                    record.getLogWriteOrderID()));
91
          } else if (record.getRecordType()
89
          } else if (record.getRecordType()
92
                  == TransactionEventRecord.Type.TAKE.get()) {
90
                  == TransactionEventRecord.Type.TAKE.get()) {
93
            Take take = (Take) record;
91
            Take take = (Take) record;
94
            uncommittedTakes.put(record.getTransactionID(),
92
            uncommittedTakes.put(record.getTransactionID(),
95
                    new ComparableFlumeEventPointer(
93
                    new ComparableFlumeEventPointer(take.getFileID(),
96
                    new FlumeEventPointer(take.getFileID(), take.getOffset()),
94
                      take.getOffset(), record.getLogWriteOrderID()));
97
                    record.getLogWriteOrderID()));

   
98
          } else if (record.getRecordType()
95
          } else if (record.getRecordType()
99
                  == TransactionEventRecord.Type.COMMIT.get()) {
96
                  == TransactionEventRecord.Type.COMMIT.get()) {
100
            Commit commit = (Commit) record;
97
            Commit commit = (Commit) record;
101
            if (commit.getType()
98
            if (commit.getType()
102
                    == TransactionEventRecord.Type.PUT.get()) {
99
                    == TransactionEventRecord.Type.PUT.get()) {
[+20] [20] 40 lines
[+20] public boolean rebuild() throws IOException, Exception {
143
    }
140
    }
144
    Set<ComparableFlumeEventPointer> sortedPuts =
141
    Set<ComparableFlumeEventPointer> sortedPuts =
145
            Sets.newTreeSet(committedPuts);
142
            Sets.newTreeSet(committedPuts);
146
    int count = 0;
143
    int count = 0;
147
    for (ComparableFlumeEventPointer put : sortedPuts) {
144
    for (ComparableFlumeEventPointer put : sortedPuts) {
148
      queue.addTail(put.pointer);
145
      queue.addTail(new FlumeEventPointer(put.fileID, put.offset));
149
      count++;
146
      count++;
150
    }
147
    }
151
    LOG.info("Replayed {} events using fast replay logic.", count);
148
    LOG.info("Replayed {} events using fast replay logic.", count);
152
    return true;
149
    return true;
153
  }
150
  }
[+20] [20] 24 lines
[+20] [+] private void writeCheckpoint() throws IOException {
178
  }
175
  }
179

    
   
176

   
180
  private final class ComparableFlumeEventPointer
177
  private final class ComparableFlumeEventPointer
181
          implements Comparable<ComparableFlumeEventPointer> {
178
          implements Comparable<ComparableFlumeEventPointer> {
182

    
   
179

   
183
    private final FlumeEventPointer pointer;
180
    private final int fileID;

    
   
181
    private final int offset;
184
    private final long orderID;
182
    private final long orderID;
185

    
   
183

   
186
    public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){
184
    public ComparableFlumeEventPointer(int fileID, int offset, long orderID){
187
      Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be"

   
188
              + "null while creating a ComparableFlumeEventPointer");

   
189
      this.pointer = pointer;

   
190
      this.orderID = orderID;
185
      this.orderID = orderID;

    
   
186
      this.fileID = fileID;

    
   
187
      this.offset = offset;
191
    }
188
    }
192

    
   
189

   
193
    @Override
190
    @Override
194
    public int compareTo(ComparableFlumeEventPointer o) {
191
    public int compareTo(ComparableFlumeEventPointer o) {
195
      if (orderID < o.orderID) {
192
      if (orderID < o.orderID) {
[+20] [20] 4 lines
[+20] public int compareTo(ComparableFlumeEventPointer o) {
200
      }
197
      }
201
    }
198
    }
202

    
   
199

   
203
    @Override
200
    @Override
204
    public int hashCode(){
201
    public int hashCode(){
205
      return pointer.hashCode();
202
      final int prime = 31;

    
   
203
      int result = 1;

    
   
204
      result = prime * result + fileID;

    
   
205
      result = prime * result + offset;

    
   
206
      return result;
206
    }
207
    }
207

    
   
208

   
208
    @Override
209
    @Override
209
    public boolean equals(Object o){
210
    public boolean equals(Object o){
210
      if(this == o){
211
      if(this == o){
211
        return true;
212
        return true;
212
      }
213
      }
213
      if(o == null){
214
      if(o == null){
214
        return false;
215
        return false;
215
      }
216
      }
216
      if(o.getClass() != this.getClass()){
217
      if(o.getClass() != this.getClass()){
217
        return false;
218
        return false;
218
      }
219
      }
219
      return pointer.equals(((ComparableFlumeEventPointer)o).pointer);
220
      ComparableFlumeEventPointer other = (ComparableFlumeEventPointer)o;

    
   
221
      if(other.fileID != this.fileID) {

    
   
222
        return false;

    
   
223
      }

    
   
224
      if(other.offset != this.offset) {

    
   
225
        return false;

    
   
226
      }

    
   
227
      return true;
220
    }
228
    }
221
  }
229
  }
222

    
   
230

   
223
  public static void main(String[] args) throws Exception {
231
  public static void main(String[] args) throws Exception {
224
    Options options = new Options();
232
    Options options = new Options();
[+20] [20] 40 lines
  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java: Loading...