Review Board 1.7.22


Ability to specify the capacity of MemoryChannel in bytes

Review Request #6982 - Created Sept. 10, 2012 and submitted

Ted Malaska
trunk
FLUME-1535
Reviewers
Flume
flume-git
1. The user will be able to define a byteCapacity and a byteCapacityBufferPercentage.
2. Events byte size will be estimated from there body contents
3. On put bytes are added to current total
4. On commit any uncommitted takes are removed from the current byte total
5. On rollover any uncommitted puts are removed from the current byte total

 
flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Revision c72e97c New Change
[20] 34 lines
[+20]
35

    
   
35

   
36
public class MemoryChannel extends BasicChannelSemantics {
36
public class MemoryChannel extends BasicChannelSemantics {
37
  private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
37
  private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
38
  private static final Integer defaultCapacity = 100;
38
  private static final Integer defaultCapacity = 100;
39
  private static final Integer defaultTransCapacity = 100;
39
  private static final Integer defaultTransCapacity = 100;

    
   
40
  private static final double byteCapacitySlotSize = 100;

    
   
41
  private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);

    
   
42
  private static final Integer defaultByteCapacityBufferPercentage = 20;

    
   
43

   
40
  private static final Integer defaultKeepAlive = 3;
44
  private static final Integer defaultKeepAlive = 3;
41

    
   
45

   
42
  public class MemoryTransaction extends BasicTransactionSemantics {
46
  public class MemoryTransaction extends BasicTransactionSemantics {
43
    private LinkedBlockingDeque<Event> takeList;
47
    private LinkedBlockingDeque<Event> takeList;
44
    private LinkedBlockingDeque<Event> putList;
48
    private LinkedBlockingDeque<Event> putList;
45
    private final ChannelCounter channelCounter;
49
    private final ChannelCounter channelCounter;

    
   
50
    private int putByteCounter = 0;

    
   
51
    private int takeByteCounter = 0;
46

    
   
52

   
47
    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
53
    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
48
      putList = new LinkedBlockingDeque<Event>(transCapacity);
54
      putList = new LinkedBlockingDeque<Event>(transCapacity);
49
      takeList = new LinkedBlockingDeque<Event>(transCapacity);
55
      takeList = new LinkedBlockingDeque<Event>(transCapacity);
50

    
   
56

   
51
      channelCounter = counter;
57
      channelCounter = counter;
52
    }
58
    }
53

    
   
59

   
54
    @Override
60
    @Override
55
    protected void doPut(Event event) {
61
    protected void doPut(Event event) throws InterruptedException {
56
      channelCounter.incrementEventPutAttemptCount();
62
      channelCounter.incrementEventPutAttemptCount();

    
   
63
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);

    
   
64

   

    
   
65
      if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) {
57
      if(!putList.offer(event)) {
66
        if(!putList.offer(event)) {
58
        throw new ChannelException("Put queue for MemoryTransaction of capacity " +
67
          throw new ChannelException("Put queue for MemoryTransaction of capacity " +
59
            putList.size() + " full, consider committing more frequently, " +
68
              putList.size() + " full, consider committing more frequently, " +
60
            "increasing capacity or increasing thread count");
69
              "increasing capacity or increasing thread count");
61
      }
70
        }

    
   
71
      } else {

    
   
72
        throw new ChannelException("Put queue for MemoryTransaction of byteCapacity " +

    
   
73
            (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot add an " +

    
   
74
            " event of size " + estimateEventSize(event) + " bytes because " +

    
   
75
             (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + " bytes are already used." +

    
   
76
            " Try consider comitting more frequently, increasing byteCapacity or increasing thread count");

    
   
77
      }

    
   
78
      putByteCounter += eventByteSize;
62
    }
79
    }
63

    
   
80

   
64
    @Override
81
    @Override
65
    protected Event doTake() throws InterruptedException {
82
    protected Event doTake() throws InterruptedException {
66
      channelCounter.incrementEventTakeAttemptCount();
83
      channelCounter.incrementEventTakeAttemptCount();
[+20] [20] 11 lines
[+20] protected Event doTake() throws InterruptedException {
78
      }
95
      }
79
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
96
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
80
          "signalling existence of entry");
97
          "signalling existence of entry");
81
      takeList.put(event);
98
      takeList.put(event);
82

    
   
99

   

    
   
100
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);

    
   
101
      takeByteCounter += eventByteSize;

    
   
102

   
83
      return event;
103
      return event;
84
    }
104
    }
85

    
   
105

   
86
    @Override
106
    @Override
87
    protected void doCommit() throws InterruptedException {
107
    protected void doCommit() throws InterruptedException {
[+20] [20] 15 lines
[+20] protected Event doTake() throws InterruptedException {
103
          }
123
          }
104
        }
124
        }
105
        putList.clear();
125
        putList.clear();
106
        takeList.clear();
126
        takeList.clear();
107
      }
127
      }

    
   
128
      bytesRemaining.release(takeByteCounter);

    
   
129
      takeByteCounter = 0;

    
   
130
      putByteCounter = 0;

    
   
131

   
108
      queueStored.release(puts);
132
      queueStored.release(puts);
109
      if(remainingChange > 0) {
133
      if(remainingChange > 0) {
110
        queueRemaining.release(remainingChange);
134
        queueRemaining.release(remainingChange);
111
      }
135
      }
112
      if (puts > 0) {
136
      if (puts > 0) {
[+20] [20] 15 lines
[+20] [+] protected void doRollback() {
128
        while(!takeList.isEmpty()) {
152
        while(!takeList.isEmpty()) {
129
          queue.addFirst(takeList.removeLast());
153
          queue.addFirst(takeList.removeLast());
130
        }
154
        }
131
        putList.clear();
155
        putList.clear();
132
      }
156
      }

    
   
157
      bytesRemaining.release(putByteCounter);

    
   
158
      putByteCounter = 0;

    
   
159
      takeByteCounter = 0;

    
   
160

   
133
      queueStored.release(takes);
161
      queueStored.release(takes);
134
      channelCounter.setChannelSize(queue.size());
162
      channelCounter.setChannelSize(queue.size());
135
    }
163
    }
136

    
   
164

   
137
  }
165
  }
[+20] [20] 15 lines
[+20] protected void doRollback() {
153
  // like we would if we tried to use a blocking call on queue
181
  // like we would if we tried to use a blocking call on queue
154
  private Semaphore queueStored;
182
  private Semaphore queueStored;
155
  // maximum items in a transaction queue
183
  // maximum items in a transaction queue
156
  private volatile Integer transCapacity;
184
  private volatile Integer transCapacity;
157
  private volatile int keepAlive;
185
  private volatile int keepAlive;

    
   
186
  private volatile int byteCapacity;

    
   
187
  private volatile int lastByteCapacity;

    
   
188
  private volatile int byteCapacityBufferPercentage;

    
   
189
  private Semaphore bytesRemaining;
158
  private ChannelCounter channelCounter;
190
  private ChannelCounter channelCounter;
159

    
   
191

   
160

    
   
192

   
161
  public MemoryChannel() {
193
  public MemoryChannel() {
162
    super();
194
    super();
163
    queueLock = 0;
195
    queueLock = 0;
164
  }
196
  }
165

    
   
197

   

    
   
198
  /**

    
   
199
   * Read parameters from context

    
   
200
   * <li>capacity = type long that defines the total number of events allowed at one time in the queue.

    
   
201
   * <li>transactionCapacity = type long that defines the total number of events allowed in one transaction.

    
   
202
   * <li>byteCapacity = type long that defines the max number of bytes used for events in the queue.

    
   
203
   * <li>byteCapacityBufferPercentage = type int that defines the percent of buffer between byteCapacity and the estimated event size.

    
   
204
   * <li>keep-alive = type int that defines the number of second to wait for a queue permit

    
   
205
   */
166
  @Override
206
  @Override
167
  public void configure(Context context) {
207
  public void configure(Context context) {
168
    String strCapacity = context.getString("capacity");

   
169
    Integer capacity = null;
208
    Integer capacity = null;
170
    if(strCapacity == null) {

   
171
      capacity = defaultCapacity;

   
172
    } else {

   
173
      try {
209
    try {
174
        capacity = Integer.parseInt(strCapacity);
210
      capacity = context.getInteger("capacity", defaultCapacity);
175
      } catch(NumberFormatException e) {
211
    } catch(NumberFormatException e) {
176
        capacity = defaultCapacity;
212
      capacity = defaultCapacity;
177
      }
213
    }
178
    }
214

   
179
    String strTransCapacity = context.getString("transactionCapacity");

   
180
    if(strTransCapacity == null) {

   
181
      transCapacity = defaultTransCapacity;

   
182
    } else {

   
183
      try {
215
    try {
184
        transCapacity = Integer.parseInt(strTransCapacity);
216
      transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
185
      } catch(NumberFormatException e) {
217
    } catch(NumberFormatException e) {
186
        transCapacity = defaultTransCapacity;
218
      transCapacity = defaultTransCapacity;
187
      }
219
    }
188
    }
220

   
189
    Preconditions.checkState(transCapacity <= capacity);
221
    Preconditions.checkState(transCapacity <= capacity);
190

    
   
222

   
191
    String strKeepAlive = context.getString("keep-alive");
223
    try {

    
   
224
      byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);

    
   
225
    } catch(NumberFormatException e) {

    
   
226
      byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;

    
   
227
    }

    
   
228

   

    
   
229
    try {

    
   
230
      byteCapacity = (int)(context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 ) /byteCapacitySlotSize);

    
   
231
      if (byteCapacity < 1) {

    
   
232
        byteCapacity = Integer.MAX_VALUE;

    
   
233
      }

    
   
234
    } catch(NumberFormatException e) {

    
   
235
      byteCapacity = (int)(defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 ) /byteCapacitySlotSize);

    
   
236
    }
192

    
   
237

   
193
    if (strKeepAlive == null) {
238
    try {

    
   
239
      keepAlive = context.getInteger("keep-alive", defaultKeepAlive);

    
   
240
    } catch(NumberFormatException e) {
194
      keepAlive = defaultKeepAlive;
241
      keepAlive = defaultKeepAlive;
195
    } else {

   
196
      keepAlive = Integer.parseInt(strKeepAlive);

   
197
    }
242
    }
198

    
   
243

   
199
    if(queue != null) {
244
    if(queue != null) {
200
      try {
245
      try {
201
        resizeQueue(capacity);
246
        resizeQueue(capacity);
[+20] [20] 6 lines
[+20] protected void doRollback() {
208
        queueRemaining = new Semaphore(capacity);
253
        queueRemaining = new Semaphore(capacity);
209
        queueStored = new Semaphore(0);
254
        queueStored = new Semaphore(0);
210
      }
255
      }
211
    }
256
    }
212

    
   
257

   

    
   
258
    if (bytesRemaining == null) {

    
   
259
      bytesRemaining = new Semaphore(byteCapacity);

    
   
260
      lastByteCapacity = byteCapacity;

    
   
261
    } else {

    
   
262
      if (byteCapacity > lastByteCapacity) {

    
   
263
        bytesRemaining.release(byteCapacity - lastByteCapacity);

    
   
264
        lastByteCapacity = byteCapacity;

    
   
265
      } else {

    
   
266
        try {

    
   
267
          if(!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, keepAlive, TimeUnit.SECONDS)) {

    
   
268
            LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");

    
   
269
          } else {

    
   
270
            lastByteCapacity = byteCapacity;

    
   
271
          }

    
   
272
        } catch (InterruptedException e) {

    
   
273
          Thread.currentThread().interrupt();

    
   
274
        }

    
   
275
      }

    
   
276
    }

    
   
277

   
213
    if (channelCounter == null) {
278
    if (channelCounter == null) {
214
      channelCounter = new ChannelCounter(getName());
279
      channelCounter = new ChannelCounter(getName());
215
    }
280
    }
216
  }
281
  }
217

    
   
282

   
[+20] [20] 43 lines
[+20] [+] public synchronized void stop() {
261

    
   
326

   
262
  @Override
327
  @Override
263
  protected BasicTransactionSemantics createTransaction() {
328
  protected BasicTransactionSemantics createTransaction() {
264
    return new MemoryTransaction(transCapacity, channelCounter);
329
    return new MemoryTransaction(transCapacity, channelCounter);
265
  }
330
  }

    
   
331

   

    
   
332
  private long estimateEventSize(Event event)

    
   
333
  {

    
   
334
    return event.getBody().length;

    
   
335
  }
266
}
336
}
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
Revision e070864 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java: Loading...