Review Board 1.7.22


FLUME-1534: ComparableFlumeEventPointer#equals method does not work.

Review Request #6892 - Created Sept. 2, 2012 and submitted

Hari Shreedharan
FLUME-1534
Reviewers
Flume
flume-git
The equals() method in ComparableFlumeEventPointer was not working correctly, and hence remove() calls from sets were not removing anything.

 

Diff revision 2 (Latest)

1 2
1 2

  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java: Loading...
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
Revision 4db1b9c New Change
[20] 15 lines
[+20]
16
 * specific language governing permissions and limitations
16
 * specific language governing permissions and limitations
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.channel.file;
19
package org.apache.flume.channel.file;
20

    
   
20

   

    
   
21
import com.google.common.base.Preconditions;
21
import com.google.common.collect.HashMultimap;
22
import com.google.common.collect.HashMultimap;
22
import com.google.common.collect.Lists;
23
import com.google.common.collect.Lists;
23
import com.google.common.collect.SetMultimap;
24
import com.google.common.collect.SetMultimap;
24
import com.google.common.collect.Sets;
25
import com.google.common.collect.Sets;
25
import java.io.File;
26
import java.io.File;
[+20] [20] 118 lines
[+20] [+] public boolean rebuild() throws IOException, Exception {
144
        reader.close();
145
        reader.close();
145
      }
146
      }
146
    }
147
    }
147
    Set<ComparableFlumeEventPointer> sortedPuts =
148
    Set<ComparableFlumeEventPointer> sortedPuts =
148
            Sets.newTreeSet(committedPuts);
149
            Sets.newTreeSet(committedPuts);

    
   
150
    int count = 0;
149
    for (ComparableFlumeEventPointer put : sortedPuts) {
151
    for (ComparableFlumeEventPointer put : sortedPuts) {
150
      queue.addTail(put.pointer);
152
      queue.addTail(put.pointer);

    
   
153
      count++;
151
    }
154
    }

    
   
155
    LOG.info("Replayed {} events using fast replay logic.", count);
152
    return true;
156
    return true;
153
  }
157
  }
154

    
   
158

   
155
  private void writeCheckpoint() throws IOException {
159
  private void writeCheckpoint() throws IOException {
156
    long checkpointLogOrderID = 0;
160
    long checkpointLogOrderID = 0;
[+20] [20] 19 lines
[+20] private void writeCheckpoint() throws IOException {
176
        logWriter.close();
180
        logWriter.close();
177
      }
181
      }
178
    }
182
    }
179
  }
183
  }
180

    
   
184

   
181
  private class ComparableFlumeEventPointer
185
  private final class ComparableFlumeEventPointer
182
          implements Comparable<ComparableFlumeEventPointer> {
186
          implements Comparable<ComparableFlumeEventPointer> {
183

    
   
187

   
184
    private final FlumeEventPointer pointer;
188
    private final FlumeEventPointer pointer;
185
    private final long orderID;
189
    private final long orderID;
186

    
   
190

   
187
    public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){
191
    public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){

    
   
192
      Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be"

    
   
193
              + "null while creating a ComparableFlumeEventPointer");
188
      this.pointer = pointer;
194
      this.pointer = pointer;
189
      this.orderID = orderID;
195
      this.orderID = orderID;
190
    }
196
    }
191

    
   
197

   
192
    @Override
198
    @Override
[+20] [20] 11 lines
[+20] [+] public int compareTo(ComparableFlumeEventPointer o) {
204
      return pointer.hashCode();
210
      return pointer.hashCode();
205
    }
211
    }
206

    
   
212

   
207
    @Override
213
    @Override
208
    public boolean equals(Object o){
214
    public boolean equals(Object o){
209
      return pointer.equals(o);
215
      if(this == o){

    
   
216
        return true;

    
   
217
      }

    
   
218
      if(o == null){

    
   
219
        return false;

    
   
220
      }

    
   
221
      if(o.getClass() != this.getClass()){

    
   
222
        return false;

    
   
223
      }

    
   
224
      return pointer.equals(((ComparableFlumeEventPointer)o).pointer);
210
    }
225
    }
211
  }
226
  }
212

    
   
227

   
213
  public static void main(String[] args) throws Exception {
228
  public static void main(String[] args) throws Exception {
214
    Options options = new Options();
229
    Options options = new Options();
[+20] [20] 37 lines
  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java: Loading...