Review Board 1.7.22


Dump RollingFileSink's counter status when agent stops

Review Request #7022 - Created Sept. 11, 2012 and submitted

Ted Malaska
trunk
FLUME-1537
Reviewers
Flume
flume-git
Added counting for:
1. events
2. transaction success
3. transaction failure
Also added logging of name and counters at start and stop. Just like NullSink.

 

Diff revision 4 (Latest)

1 2 3 4
1 2 3 4

  1. flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
Revision a94eea1 New Change
[20] 25 lines
[+20]
26
import java.util.concurrent.ScheduledExecutorService;
26
import java.util.concurrent.ScheduledExecutorService;
27
import java.util.concurrent.TimeUnit;
27
import java.util.concurrent.TimeUnit;
28

    
   
28

   
29
import org.apache.flume.Channel;
29
import org.apache.flume.Channel;
30
import org.apache.flume.Context;
30
import org.apache.flume.Context;
31
import org.apache.flume.CounterGroup;

   
32
import org.apache.flume.Event;
31
import org.apache.flume.Event;
33
import org.apache.flume.EventDeliveryException;
32
import org.apache.flume.EventDeliveryException;
34
import org.apache.flume.Transaction;
33
import org.apache.flume.Transaction;
35
import org.apache.flume.conf.Configurable;
34
import org.apache.flume.conf.Configurable;
36
import org.apache.flume.formatter.output.PathManager;
35
import org.apache.flume.formatter.output.PathManager;

    
   
36
import org.apache.flume.instrumentation.SinkCounter;
37
import org.slf4j.Logger;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
38
import org.slf4j.LoggerFactory;
39

    
   
39

   
40
import com.google.common.base.Preconditions;
40
import com.google.common.base.Preconditions;
41
import com.google.common.util.concurrent.ThreadFactoryBuilder;
41
import com.google.common.util.concurrent.ThreadFactoryBuilder;
[+20] [20] 12 lines
[+20] [+] public class RollingFileSink extends AbstractSink implements Configurable {
54
  private File directory;
54
  private File directory;
55
  private long rollInterval;
55
  private long rollInterval;
56
  private OutputStream outputStream;
56
  private OutputStream outputStream;
57
  private ScheduledExecutorService rollService;
57
  private ScheduledExecutorService rollService;
58

    
   
58

   
59
  private Context context;

   
60

    
   

   
61
  private String serializerType;
59
  private String serializerType;
62
  private Context serializerContext;
60
  private Context serializerContext;
63
  private EventSerializer serializer;
61
  private EventSerializer serializer;
64

    
   
62

   
65
  private CounterGroup counterGroup;
63
  private SinkCounter sinkCounter;
66

    
   
64

   
67
  private PathManager pathController;
65
  private PathManager pathController;
68
  private volatile boolean shouldRotate;
66
  private volatile boolean shouldRotate;
69

    
   
67

   
70
  public RollingFileSink() {
68
  public RollingFileSink() {
71
    counterGroup = new CounterGroup();

   
72
    pathController = new PathManager();
69
    pathController = new PathManager();
73
    shouldRotate = false;
70
    shouldRotate = false;
74
  }
71
  }
75

    
   
72

   
76
  @Override
73
  @Override
77
  public void configure(Context context) {
74
  public void configure(Context context) {
78
    this.context = context;

   
79

    
   
75

   
80
    String directory = context.getString("sink.directory");
76
    String directory = context.getString("sink.directory");
81
    String rollInterval = context.getString("sink.rollInterval");
77
    String rollInterval = context.getString("sink.rollInterval");
82

    
   
78

   
83
    serializerType = context.getString("sink.serializer", "TEXT");
79
    serializerType = context.getString("sink.serializer", "TEXT");
[+20] [20] 10 lines
[+20] public class RollingFileSink extends AbstractSink implements Configurable {
94
    }
90
    }
95

    
   
91

   
96
    batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
92
    batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
97

    
   
93

   
98
    this.directory = new File(directory);
94
    this.directory = new File(directory);

    
   
95

   

    
   
96
    if (sinkCounter == null) {

    
   
97
      sinkCounter = new SinkCounter(getName());

    
   
98
    }
99
  }
99
  }
100

    
   
100

   
101
  @Override
101
  @Override
102
  public void start() {
102
  public void start() {
103

    
   
103
    logger.info("Starting {}...", this);

    
   
104
    sinkCounter.start();
104
    super.start();
105
    super.start();
105

    
   
106

   
106
    pathController.setBaseDirectory(directory);
107
    pathController.setBaseDirectory(directory);
107
    if(rollInterval > 0){
108
    if(rollInterval > 0){
108

    
   
109

   
[+20] [20] 20 lines
[+20] [+] public void run() {
129

    
   
130

   
130
      }, rollInterval, rollInterval, TimeUnit.SECONDS);
131
      }, rollInterval, rollInterval, TimeUnit.SECONDS);
131
    } else{
132
    } else{
132
      logger.info("RollInterval is not valid, file rolling will not happen.");
133
      logger.info("RollInterval is not valid, file rolling will not happen.");
133
    }
134
    }

    
   
135
    logger.info("RollingFileSink {} started.", getName());
134
  }
136
  }
135

    
   
137

   
136
  @Override
138
  @Override
137
  public Status process() throws EventDeliveryException {
139
  public Status process() throws EventDeliveryException {
138
    if (shouldRotate) {
140
    if (shouldRotate) {
[+20] [20] 5 lines
[+20] public Status process() throws EventDeliveryException {
144
        try {
146
        try {
145
          serializer.flush();
147
          serializer.flush();
146
          serializer.beforeClose();
148
          serializer.beforeClose();
147
          outputStream.flush();
149
          outputStream.flush();
148
          outputStream.close();
150
          outputStream.close();

    
   
151
          sinkCounter.incrementConnectionClosedCount();
149
          shouldRotate = false;
152
          shouldRotate = false;
150
        } catch (IOException e) {
153
        } catch (IOException e) {

    
   
154
          sinkCounter.incrementConnectionFailedCount();
151
          throw new EventDeliveryException("Unable to rotate file "
155
          throw new EventDeliveryException("Unable to rotate file "
152
              + pathController.getCurrentFile() + " while delivering event", e);
156
              + pathController.getCurrentFile() + " while delivering event", e);
153
        }
157
        }
154

    
   
158

   
155
        serializer = null;
159
        serializer = null;
[+20] [20] 9 lines
[+20] public Status process() throws EventDeliveryException {
165
        outputStream = new BufferedOutputStream(
169
        outputStream = new BufferedOutputStream(
166
            new FileOutputStream(currentFile));
170
            new FileOutputStream(currentFile));
167
        serializer = EventSerializerFactory.getInstance(
171
        serializer = EventSerializerFactory.getInstance(
168
            serializerType, serializerContext, outputStream);
172
            serializerType, serializerContext, outputStream);
169
        serializer.afterCreate();
173
        serializer.afterCreate();

    
   
174
        sinkCounter.incrementConnectionCreatedCount();
170
      } catch (IOException e) {
175
      } catch (IOException e) {

    
   
176
        sinkCounter.incrementConnectionFailedCount();
171
        throw new EventDeliveryException("Failed to open file "
177
        throw new EventDeliveryException("Failed to open file "
172
            + pathController.getCurrentFile() + " while delivering event", e);
178
            + pathController.getCurrentFile() + " while delivering event", e);
173
      }
179
      }
174
    }
180
    }
175

    
   
181

   
176
    Channel channel = getChannel();
182
    Channel channel = getChannel();
177
    Transaction transaction = channel.getTransaction();
183
    Transaction transaction = channel.getTransaction();
178
    Event event = null;
184
    Event event = null;
179
    Status result = Status.READY;
185
    Status result = Status.READY;
180

    
   
186

   
181
    try {
187
    try {
182
      transaction.begin();
188
      transaction.begin();

    
   
189
      int eventAttemptCounter = 0;
183
      for (int i = 0; i < batchSize; i++) {
190
      for (int i = 0; i < batchSize; i++) {
184
        event = channel.take();
191
        event = channel.take();
185
        if (event != null) {
192
        if (event != null) {

    
   
193
          sinkCounter.incrementEventDrainAttemptCount();

    
   
194
          eventAttemptCounter++;
186
          serializer.write(event);
195
          serializer.write(event);
187

    
   
196

   
188
          /*
197
          /*
189
           * FIXME: Feature: Rotate on size and time by checking bytes written and
198
           * FIXME: Feature: Rotate on size and time by checking bytes written and
190
           * setting shouldRotate = true if we're past a threshold.
199
           * setting shouldRotate = true if we're past a threshold.
[+20] [20] 10 lines
[+20] public Status process() throws EventDeliveryException {
201
        }
210
        }
202
      }
211
      }
203
      serializer.flush();
212
      serializer.flush();
204
      outputStream.flush();
213
      outputStream.flush();
205
      transaction.commit();
214
      transaction.commit();

    
   
215
      sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
206
    } catch (Exception ex) {
216
    } catch (Exception ex) {
207
      transaction.rollback();
217
      transaction.rollback();
208
      throw new EventDeliveryException("Failed to process transaction", ex);
218
      throw new EventDeliveryException("Failed to process transaction", ex);
209
    } finally {
219
    } finally {
210
      transaction.close();
220
      transaction.close();
211
    }
221
    }
212

    
   
222

   
213
    return result;
223
    return result;
214
  }
224
  }
215

    
   
225

   
216
  @Override
226
  @Override
217
  public void stop() {
227
  public void stop() {
218

    
   
228
    logger.info("RollingFile sink {} stopping...", getName());

    
   
229
    sinkCounter.stop();
219
    super.stop();
230
    super.stop();
220

    
   
231

   
221
    if (outputStream != null) {
232
    if (outputStream != null) {
222
      logger.debug("Closing file {}", pathController.getCurrentFile());
233
      logger.debug("Closing file {}", pathController.getCurrentFile());
223

    
   
234

   
224
      try {
235
      try {
225
        serializer.flush();
236
        serializer.flush();
226
        serializer.beforeClose();
237
        serializer.beforeClose();
227
        outputStream.flush();
238
        outputStream.flush();
228
        outputStream.close();
239
        outputStream.close();

    
   
240
        sinkCounter.incrementConnectionClosedCount();
229
      } catch (IOException e) {
241
      } catch (IOException e) {

    
   
242
        sinkCounter.incrementConnectionFailedCount();
230
        logger.error("Unable to close output stream. Exception follows.", e);
243
        logger.error("Unable to close output stream. Exception follows.", e);
231
      }
244
      }
232
    }
245
    }
233
    if(rollInterval > 0){
246
    if(rollInterval > 0){
234
      rollService.shutdown();
247
      rollService.shutdown();
[+20] [20] 7 lines
[+20] public Status process() throws EventDeliveryException {
242
              "Interrupted while waiting for roll service to stop. " +
255
              "Interrupted while waiting for roll service to stop. " +
243
              "Please report this.", e);
256
              "Please report this.", e);
244
        }
257
        }
245
      }
258
      }
246
    }
259
    }

    
   
260
    logger.info("RollingFile sink {} stopped. Event metrics: {}",

    
   
261
        getName(), sinkCounter);
247
  }
262
  }
248

    
   
263

   
249
  public File getDirectory() {
264
  public File getDirectory() {
250
    return directory;
265
    return directory;
251
  }
266
  }
[+20] [20] 14 lines
  1. flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java: Loading...