Review Board 1.7.22


FLUME-2015 ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing

Review Request #10835 - Created April 29, 2013 and updated

Tim Bacon
flume-1.4
FLUME-2015
Reviewers
Flume
iekpo
flume-git
This change adds an ElasticSearchIndexRequestBuilderFactory interface to allow users of an ElasticSearchSink to have greater control over the actual indexing of events that arrive at the sink. It is intended to meet the needs of:
- my own non-Kibana/non-logging-event use case
- the specific "allow for indexing an id-field" needs of FLUME-1972
- the (UTC) determination of the index to write to per FLUME-1782

This patch is backwards-compatible and imposes no changes on existing users of the sink.
Unit tests added and run.
Patch applied and run successfully in my own flume test environment.
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 * http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing,

    
   
13
 * software distributed under the License is distributed on an

    
   
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

    
   
15
 * KIND, either express or implied.  See the License for the

    
   
16
 * specific language governing permissions and limitations

    
   
17
 * under the License.

    
   
18
 */

    
   
19
package org.apache.flume.sink.elasticsearch;

    
   
20

   

    
   
21
import java.io.IOException;

    
   
22
import java.util.Map;

    
   
23

   

    
   
24
import org.apache.commons.lang.StringUtils;

    
   
25
import org.apache.commons.lang.time.FastDateFormat;

    
   
26
import org.apache.flume.Event;

    
   
27
import org.apache.flume.event.SimpleEvent;

    
   
28
import org.elasticsearch.action.index.IndexRequestBuilder;

    
   
29
import org.elasticsearch.client.Client;

    
   
30
import org.joda.time.DateTimeUtils;

    
   
31

   

    
   
32
import com.google.common.annotations.VisibleForTesting;

    
   
33
import com.google.common.collect.Maps;

    
   
34

   

    
   
35
/**

    
   
36
 * Abstract base class for custom implementations of

    
   
37
 * {@link ElasticSearchIndexRequestBuilderFactory}.

    
   
38
 */

    
   
39
public abstract class AbstractElasticSearchIndexRequestBuilderFactory

    
   
40
    implements ElasticSearchIndexRequestBuilderFactory {

    
   
41

   

    
   
42
  /**

    
   
43
   * {@link FastDateFormat} to use for index names

    
   
44
   *   in {@link #getIndexName(String, long)}

    
   
45
   */

    
   
46
  protected final FastDateFormat fastDateFormat;

    
   
47

   

    
   
48
  /**

    
   
49
   * Constructor for subclasses

    
   
50
   * @param fastDateFormat {@link FastDateFormat} to use for index names

    
   
51
   */

    
   
52
  protected AbstractElasticSearchIndexRequestBuilderFactory(

    
   
53
    FastDateFormat fastDateFormat) {

    
   
54
    this.fastDateFormat = fastDateFormat;

    
   
55
  }

    
   
56

   

    
   
57
  /**

    
   
58
   * Creates and prepares an {@link IndexRequestBuilder} from the supplied

    
   
59
   * {@link Client} via delegation to the subclass-hook template methods

    
   
60
   * {@link #getIndexName(String, long)} and

    
   
61
   * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)}

    
   
62
   */

    
   
63
  @Override

    
   
64
  public IndexRequestBuilder createIndexRequest(Client client,

    
   
65
        String indexPrefix, String indexType, Event event) throws IOException {

    
   
66
    IndexRequestBuilder request = prepareIndex(client);

    
   
67
    TimestampedEvent timestampedEvent = new TimestampedEvent(event);

    
   
68
    long timestamp = timestampedEvent.getTimestamp();

    
   
69
    String indexName = getIndexName(indexPrefix, timestamp);

    
   
70
    prepareIndexRequest(request, indexName, indexType, timestampedEvent);

    
   
71
    return request;

    
   
72
  }

    
   
73

   

    
   
74
  @VisibleForTesting

    
   
75
  IndexRequestBuilder prepareIndex(Client client) {

    
   
76
    return client.prepareIndex();

    
   
77
  }

    
   
78

   

    
   
79
  /**

    
   
80
   * Gets the name of the index to use for an index request

    
   
81
   * @return index name of the form 'indexPrefix-formattedTimestamp'

    
   
82
   * @param indexPrefix

    
   
83
   *          Prefix of index name to use -- as configured on the sink

    
   
84
   * @param timestamp

    
   
85
   *          timestamp (millis) to format / use

    
   
86
   */

    
   
87
  protected String getIndexName(String indexPrefix, long timestamp) {

    
   
88
    return new StringBuilder(indexPrefix).append('-')

    
   
89
      .append(fastDateFormat.format(timestamp)).toString();

    
   
90
  }

    
   
91

   

    
   
92
  /**

    
   
93
   * Prepares an ElasticSearch {@link IndexRequestBuilder} instance

    
   
94
   * @param indexRequest

    
   
95
   *          The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare

    
   
96
   * @param indexName

    
   
97
   *          Index name to use -- as per {@link #getIndexName(String, long)}

    
   
98
   * @param indexType

    
   
99
   *          Index type to use -- as configured on the sink

    
   
100
   * @param event

    
   
101
   *          Flume event to serialize and add to index request

    
   
102
   * @throws IOException

    
   
103
   *           If an error occurs e.g. during serialization

    
   
104
  */

    
   
105
  protected abstract void prepareIndexRequest(

    
   
106
      IndexRequestBuilder indexRequest, String indexName,

    
   
107
      String indexType, Event event) throws IOException;

    
   
108

   

    
   
109
}

    
   
110

   

    
   
111
/**

    
   
112
 * {@link Event} implementation that has a timestamp.

    
   
113
 * The timestamp is taken from (in order of precedence):<ol>

    
   
114
 * <li>The "timestamp" header of the base event, if present</li>

    
   
115
 * <li>The "@timestamp" header of the base event, if present</li>

    
   
116
 * <li>The current time in millis, otherwise</li>

    
   
117
 * </ol>

    
   
118
 */

    
   
119
final class TimestampedEvent extends SimpleEvent {

    
   
120

   

    
   
121
    private final long timestamp;

    
   
122

   

    
   
123
    TimestampedEvent(Event base) {

    
   
124
      setBody(base.getBody());

    
   
125
      Map<String, String> headers = Maps.newHashMap(base.getHeaders());

    
   
126
      String timestampString = headers.get("timestamp");

    
   
127
      if (StringUtils.isBlank(timestampString)) {

    
   
128
        timestampString = headers.get("@timestamp");

    
   
129
      }

    
   
130
      if (StringUtils.isBlank(timestampString)) {

    
   
131
        this.timestamp = DateTimeUtils.currentTimeMillis();

    
   
132
        headers.put("timestamp", String.valueOf(timestamp ));

    
   
133
      } else {

    
   
134
        this.timestamp = Long.valueOf(timestampString);

    
   
135
      }

    
   
136
      setHeaders(headers);

    
   
137
    }

    
   
138

   

    
   
139
    long getTimestamp() {

    
   
140
        return timestamp;

    
   
141
    }

    
   
142
}
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
Revision dc6a093 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
Revision 1b3db14 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
Revision 2edacdc New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
Revision 94b95b1 New Change
 
  1. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java: Loading...
  2. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java: Loading...
  3. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java: Loading...
  4. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: Loading...
  5. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java: Loading...
  6. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java: Loading...
  7. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java: Loading...
  8. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java: Loading...