Review Board 1.7.22


RollingFileSink need to be able to construct directory path based on escape sequence

Review Request #7131 - Created Sept. 17, 2012 and updated

Ted Malaska
trunk
FLUME-1295
Reviewers
Flume
flume-git
OK, first note this is still a work in progress, but I'm at the point where I need feedback.

Initial goal was to allow RollingFileSink to do Escape Sequence the same way as HDFSEventSink.

In order to reuse code, my change attempt to reuse bucketWriter and BucketWriterLinkedHashMap.

BucketWriter has been broken up into three files.

> AbstractBucketWriter : Contains all the rollover and escape sequence logic.
> BucketWriter: Contains the IO logic to HDFS
> FileBucketWriter: contains the IO logic to normal files

I also added a couple unit tests to RollingFileSink for new types of roll overs and escape sequences.


 
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractBucketWriter.java
New File

    
   
1
package org.apache.flume.sink;

    
   
2

   

    
   
3
import java.io.IOException;

    
   
4
import java.security.PrivilegedExceptionAction;

    
   
5
import java.util.concurrent.Callable;

    
   
6
import java.util.concurrent.ScheduledExecutorService;

    
   
7
import java.util.concurrent.ScheduledFuture;

    
   
8
import java.util.concurrent.TimeUnit;

    
   
9
import java.util.concurrent.atomic.AtomicLong;

    
   
10

   

    
   
11
import org.apache.flume.Context;

    
   
12
import org.apache.flume.Event;

    
   
13
import org.apache.flume.instrumentation.SinkCounter;

    
   
14
import org.slf4j.Logger;

    
   
15
import org.slf4j.LoggerFactory;

    
   
16

   

    
   
17
public abstract class AbstractBucketWriter {

    
   
18

   

    
   
19
  protected static final Logger LOG = LoggerFactory

    
   
20
      .getLogger(AbstractBucketWriter.class);

    
   
21

   

    
   
22
  /**

    
   
23
   * This lock ensures that only one thread can open a file at a time.

    
   
24
   */

    
   
25
  protected static final Integer staticLock = new Integer(1);

    
   
26

   

    
   
27
  protected static final String IN_USE_EXT = ".tmp";

    
   
28

   

    
   
29
  private final long rollInterval;

    
   
30
  private final long rollSize;

    
   
31
  private final long rollCount;

    
   
32
  private final long batchSize;

    
   
33
  private final ScheduledExecutorService timedRollerPool;

    
   
34

   

    
   
35
  private final AtomicLong fileExtensionCounter;

    
   
36
  private long eventCounter;

    
   
37
  private long processSize;

    
   
38

   

    
   
39
  private volatile long batchCounter;

    
   
40
  private volatile boolean isOpen;

    
   
41

   

    
   
42
  private volatile ScheduledFuture<Void> timedRollFuture;

    
   
43

   

    
   
44
  private SinkCounter sinkCounter;

    
   
45

   

    
   
46
  private volatile String filePath;

    
   
47
  private volatile String bucketPath;

    
   
48

   

    
   
49
  protected AbstractBucketWriter(long rollInterval, long rollSize,

    
   
50
      long rollCount, long batchSize, String filePath,

    
   
51
      ScheduledExecutorService timedRollerPool, SinkCounter sinkCounter) {

    
   
52
    this.rollInterval = rollInterval;

    
   
53
    this.rollSize = rollSize;

    
   
54
    this.rollCount = rollCount;

    
   
55
    this.batchSize = batchSize;

    
   
56
    this.filePath = filePath;

    
   
57
    this.timedRollerPool = timedRollerPool;

    
   
58
    this.sinkCounter = sinkCounter;

    
   
59
    fileExtensionCounter = new AtomicLong(System.currentTimeMillis());

    
   
60
    isOpen = false;

    
   
61
  }

    
   
62

   

    
   
63
  /**

    
   
64
   * Clear the class counters

    
   
65
   */

    
   
66
  private void resetCounters() {

    
   
67
    eventCounter = 0;

    
   
68
    processSize = 0;

    
   
69
    batchCounter = 0;

    
   
70
  }

    
   
71

   

    
   
72
  protected abstract <T> T runPrivileged(final PrivilegedExceptionAction<T> action)

    
   
73
      throws IOException, InterruptedException;

    
   
74

   

    
   
75
  /**

    
   
76
   * open() is called by append()

    
   
77
   * @throws IOException

    
   
78
   */

    
   
79
  private void open() throws IOException, InterruptedException {

    
   
80
    runPrivileged(new PrivilegedExceptionAction<Void>() {

    
   
81
      @Override

    
   
82
      public Void run() throws Exception {

    
   
83

   

    
   
84
        try

    
   
85
        {

    
   
86
          bucketPath = doOpen(filePath, fileExtensionCounter.incrementAndGet());

    
   
87
        } catch (IOException ex)

    
   
88
        {

    
   
89
          sinkCounter.incrementConnectionFailedCount();

    
   
90
          //TODO old one doesn't re-throw

    
   
91
          throw ex;

    
   
92
        }

    
   
93

   

    
   
94
        sinkCounter.incrementConnectionCreatedCount();

    
   
95
        resetCounters();

    
   
96

   

    
   
97
        // if time-based rolling is enabled, schedule the roll

    
   
98
        if (rollInterval > 0) {

    
   
99
          Callable<Void> action = new Callable<Void>() {

    
   
100
            @Override

    
   
101
            public Void call() throws Exception {

    
   
102
              LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",

    
   
103
                  bucketPath + IN_USE_EXT, rollInterval);

    
   
104
              try {

    
   
105
                close();

    
   
106
              } catch(IOException ex) {

    
   
107
                sinkCounter.incrementConnectionFailedCount();

    
   
108
                //TODO old one doesn't re-throw

    
   
109
                throw ex;

    
   
110
              } catch(Throwable t) {

    
   
111
                LOG.error("Unexpected error", t);

    
   
112
              }

    
   
113
              return null;

    
   
114
            }

    
   
115
          };

    
   
116
          timedRollFuture = timedRollerPool.schedule(action, rollInterval,

    
   
117
              TimeUnit.SECONDS);

    
   
118
        }

    
   
119

   

    
   
120
        isOpen = true;

    
   
121

   

    
   
122
        return null;

    
   
123
      }

    
   
124
    });

    
   
125
  }

    
   
126

   

    
   
127
  /**

    
   
128
   * Close the file handle and rename the temp file to the permanent filename.

    
   
129
   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.

    
   
130
   * @throws IOException On failure to rename if temp file exists.

    
   
131
   */

    
   
132
  public synchronized void close() throws IOException, InterruptedException {

    
   
133
    runPrivileged(new PrivilegedExceptionAction<Void>() {

    
   
134
      @Override

    
   
135
      public Void run() throws Exception {

    
   
136
        try

    
   
137
        {

    
   
138
          if (isOpen) {

    
   
139
            LOG.debug("Closing {}", bucketPath + IN_USE_EXT);

    
   
140
            doClose(bucketPath);

    
   
141
            isOpen = false;

    
   
142
          } else {

    
   
143
            LOG.info("Writer is already closed: {}", bucketPath + IN_USE_EXT);

    
   
144
          }

    
   
145
          sinkCounter.incrementConnectionClosedCount();

    
   
146
        } catch (Exception ex) {

    
   
147
          sinkCounter.incrementConnectionFailedCount();

    
   
148
          throw ex;

    
   
149
        }

    
   
150

   

    
   
151
        // NOTE: timed rolls go through this codepath as well as other roll types

    
   
152
        if (timedRollFuture != null && !timedRollFuture.isDone()) {

    
   
153
          timedRollFuture.cancel(false); // do not cancel myself if running!

    
   
154
          timedRollFuture = null;

    
   
155
        }

    
   
156

   

    
   
157
        if (bucketPath != null ) {

    
   
158
          doRenameBucket(bucketPath); // could block or throw IOException

    
   
159
        }

    
   
160

   

    
   
161
        return null;

    
   
162
      }

    
   
163
    });

    
   
164
  }

    
   
165

   

    
   
166
  /**

    
   
167
   * flush the data

    
   
168
   */

    
   
169
  public synchronized void flush() throws IOException, InterruptedException {

    
   
170
    runPrivileged(new PrivilegedExceptionAction<Void>() {

    
   
171
      @Override

    
   
172
      public Void run() throws Exception {

    
   
173
        doFlush();

    
   
174
        batchCounter = 0;

    
   
175
        return null;

    
   
176
      }

    
   
177
    });

    
   
178
  }

    
   
179

   

    
   
180
  /**

    
   
181
   * check if time to rotate the file

    
   
182
   */

    
   
183
  private boolean shouldRotate() {

    
   
184
    boolean doRotate = false;

    
   
185

   

    
   
186
    if ((rollCount > 0) && (rollCount <= eventCounter)) {

    
   
187
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);

    
   
188
      doRotate = true;

    
   
189
    }

    
   
190
    if ((rollSize > 0) && (rollSize <= processSize)) {

    
   
191
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);

    
   
192
      doRotate = true;

    
   
193
    }

    
   
194

   

    
   
195
    return doRotate;

    
   
196
  }

    
   
197

   

    
   
198
  /**

    
   
199
   * Open file handles, write data, update stats, handle file rolling and

    
   
200
   * batching / flushing. <br />

    
   
201
   * If the write fails, the file is implicitly closed and then the IOException

    
   
202
   * is rethrown. <br />

    
   
203
   * We rotate before append, and not after, so that the active file rolling

    
   
204
   * mechanism will never roll an empty file. This also ensures that the file

    
   
205
   * creation time reflects when the first event was written.

    
   
206
   */

    
   
207
  public synchronized void append(Event event) throws IOException, InterruptedException {

    
   
208
    if (!isOpen) {

    
   
209
      open();

    
   
210
    }

    
   
211

   

    
   
212
    // check if it's time to rotate the file

    
   
213
    if (shouldRotate()) {

    
   
214
      close();

    
   
215
      open();

    
   
216
    }

    
   
217

   

    
   
218
    // write the event

    
   
219
    try {

    
   
220
      sinkCounter.incrementEventDrainAttemptCount();

    
   
221
      doAppend(event);

    
   
222
    } catch (IOException e) {

    
   
223
      LOG.warn("Caught IOException writing to ({}). Closing file (" +

    
   
224
          bucketPath + IN_USE_EXT + ") and rethrowing exception.",

    
   
225
          e.getMessage());

    
   
226
      try {

    
   
227
        close();

    
   
228
      } catch (IOException e2) {

    
   
229
        LOG.warn("Caught IOException while closing file (" +

    
   
230
             bucketPath + IN_USE_EXT + "). Exception follows.", e2);

    
   
231
      }

    
   
232
      throw e;

    
   
233
    }

    
   
234

   

    
   
235
    // update statistics

    
   
236
    processSize += event.getBody().length;

    
   
237
    eventCounter++;

    
   
238
    batchCounter++;

    
   
239

   

    
   
240
    if (batchCounter == batchSize) {

    
   
241
      flush();

    
   
242
    }

    
   
243
  }

    
   
244

   

    
   
245
  public boolean isBatchComplete() {

    
   
246
    return (batchCounter == 0);

    
   
247
  }

    
   
248

   

    
   
249
  @Override

    
   
250
  public String toString() {

    
   
251
    return "[ " + this.getClass().getSimpleName() + " filePath = " + filePath +

    
   
252
        ", bucketPath = " + bucketPath + " ]";

    
   
253
  }

    
   
254

   

    
   
255
  /**

    
   
256
   * doOpen() must only be called by open()

    
   
257
   * @throws IOException

    
   
258
   * @returns bucketPath

    
   
259
   */

    
   
260
  protected abstract String doOpen(String filePath, long extensionCounter) throws IOException;

    
   
261

   

    
   
262
  /**

    
   
263
   * doClose() must only be called by close()

    
   
264
   * @throws IOException

    
   
265
   */

    
   
266
  protected abstract void doClose(String bucketPath) throws IOException;

    
   
267

   

    
   
268
  protected abstract void doRenameBucket(String bucketPath) throws IOException;

    
   
269

   

    
   
270
  protected abstract void doFlush() throws IOException;

    
   
271

   

    
   
272
  protected abstract void doAppend(Event event) throws IOException;

    
   
273

   

    
   
274

   

    
   
275
}
flume-ng-core/src/main/java/org/apache/flume/sink/BucketWriterLinkedHashMap.java
New File
 
flume-ng-core/src/main/java/org/apache/flume/sink/FileBucketWriter.java
New File
 
flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
Revision be640bb New Change
 
flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
Revision 07fa644 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Revision 6408eb9 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Revision 9a76ecb New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractBucketWriter.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/sink/BucketWriterLinkedHashMap.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/sink/FileBucketWriter.java: Loading...
  4. flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java: Loading...
  5. flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java: Loading...
  6. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java: Loading...
  7. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java: Loading...