Review Board 1.7.22


FLUME-2233. MemoryChannel lock contention on every put due to bytesRemaining Semaphore

Review Request #15239 - Created Nov. 5, 2013 and submitted

Hari Shreedharan
FLUME-2233
Reviewers
Flume
flume-git
Added a fix for this issue using an additional parameter, which can be used to disable memory checks.
All current tests pass.
flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Revision 688323d New Change
[20] 76 lines
[+20] [+] private class MemoryTransaction extends BasicTransactionSemantics {
77
    @Override
77
    @Override
78
    protected void doPut(Event event) throws InterruptedException {
78
    protected void doPut(Event event) throws InterruptedException {
79
      channelCounter.incrementEventPutAttemptCount();
79
      channelCounter.incrementEventPutAttemptCount();
80
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
80
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
81

    
   
81

   
82
      if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) {
82
      if (!putList.offer(event)) {
83
        if(!putList.offer(event)) {
83
        throw new ChannelException(
84
          throw new ChannelException("Put queue for MemoryTransaction of capacity " +
84
          "Put queue for MemoryTransaction of capacity " +
85
              putList.size() + " full, consider committing more frequently, " +
85
            putList.size() + " full, consider committing more frequently, " +
86
              "increasing capacity or increasing thread count");
86
            "increasing capacity or increasing thread count");
87
        }
87
      }
88
      } else {

   
89
        throw new ChannelException("Put queue for MemoryTransaction of byteCapacity " +

   
90
            (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot add an " +

   
91
            " event of size " + estimateEventSize(event) + " bytes because " +

   
92
             (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + " bytes are already used." +

   
93
            " Try consider comitting more frequently, increasing byteCapacity or increasing thread count");

   
94
      }

   
95
      putByteCounter += eventByteSize;
88
      putByteCounter += eventByteSize;
96
    }
89
    }
97

    
   
90

   
98
    @Override
91
    @Override
99
    protected Event doTake() throws InterruptedException {
92
    protected Event doTake() throws InterruptedException {
[+20] [20] 22 lines
[+20] protected void doPut(Event event) throws InterruptedException {
122

    
   
115

   
123
    @Override
116
    @Override
124
    protected void doCommit() throws InterruptedException {
117
    protected void doCommit() throws InterruptedException {
125
      int remainingChange = takeList.size() - putList.size();
118
      int remainingChange = takeList.size() - putList.size();
126
      if(remainingChange < 0) {
119
      if(remainingChange < 0) {

    
   
120
        if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,

    
   
121
          TimeUnit.SECONDS)) {

    
   
122
          throw new ChannelException("Cannot commit transaction. Heap space " +

    
   
123
            "limit of " + byteCapacity + "reached. Please increase heap space" +

    
   
124
            " allocated to the channel as the sinks may not be keeping up " +

    
   
125
            "with the sources");

    
   
126
        }
127
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
127
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {

    
   
128
          bytesRemaining.release(putByteCounter);
128
          throw new ChannelException("Space for commit to queue couldn't be acquired" +
129
          throw new ChannelException("Space for commit to queue couldn't be acquired" +
129
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
130
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
130
        }
131
        }
131
      }
132
      }
132
      int puts = putList.size();
133
      int puts = putList.size();
[+20] [20] 241 lines
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
Revision a78581a New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java: Loading...