Review Board 1.7.22


FLUME-2015 ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing

Review Request #10835 - Created April 29, 2013 and updated

Tim Bacon
flume-1.4
FLUME-2015
Reviewers
Flume
iekpo
flume-git
This change adds an ElasticSearchIndexRequestBuilderFactory interface to allow users of an ElasticSearchSink to have greater control over the actual indexing of events that arrive at the sink. It is intended to meet the needs of:
- my own non-Kibana/non-logging-event use case
- the specific "allow for indexing an id-field" needs of FLUME-1972
- the (UTC) determination of the index to write to per FLUME-1782

This patch is backwards-compatible and imposes no changes on existing users of the sink.
Unit tests added and run.
Patch applied and run successfully in my own Flume test environment.
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 * http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing,

    
   
13
 * software distributed under the License is distributed on an

    
   
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

    
   
15
 * KIND, either express or implied.  See the License for the

    
   
16
 * specific language governing permissions and limitations

    
   
17
 * under the License.

    
   
18
 */

    
   
19
package org.apache.flume.sink.elasticsearch;

    
   
20

   

    
   
21
import java.io.IOException;

    
   
22
import java.util.Map;

    
   
23

   

    
   
24
import org.apache.commons.lang.StringUtils;

    
   
25
import org.apache.commons.lang.time.FastDateFormat;

    
   
26
import org.apache.flume.Context;

    
   
27
import org.apache.flume.Event;

    
   
28
import org.apache.flume.conf.ComponentConfiguration;

    
   
29
import org.apache.flume.conf.Configurable;

    
   
30
import org.apache.flume.conf.ConfigurableComponent;

    
   
31
import org.apache.flume.event.SimpleEvent;

    
   
32
import org.elasticsearch.action.index.IndexRequestBuilder;

    
   
33
import org.elasticsearch.client.Client;

    
   
34
import org.joda.time.DateTimeUtils;

    
   
35

   

    
   
36
import com.google.common.annotations.VisibleForTesting;

    
   
37
import com.google.common.collect.Maps;

    
   
38

   

    
   
39
/**

    
   
40
 * Abstract base class for custom implementations of

    
   
41
 * {@link ElasticSearchIndexRequestBuilderFactory}.

    
   
42
 */

    
   
43
public abstract class AbstractElasticSearchIndexRequestBuilderFactory

    
   
44
    implements ElasticSearchIndexRequestBuilderFactory {

    
   
45

   

    
   
46
  /**

    
   
47
   * {@link FastDateFormat} to use for index names

    
   
48
   *   in {@link #getIndexName(String, long)}

    
   
49
   */

    
   
50
  protected final FastDateFormat fastDateFormat;

    
   
51

   

    
   
52
  /**

    
   
53
   * Constructor for subclasses

    
   
54
   * @param fastDateFormat {@link FastDateFormat} to use for index names

    
   
55
   */

    
   
56
  protected AbstractElasticSearchIndexRequestBuilderFactory(

    
   
57
    FastDateFormat fastDateFormat) {

    
   
58
    this.fastDateFormat = fastDateFormat;

    
   
59
  }

    
   
60

   

    
   
61
  /**

    
   
62
   * @see Configurable

    
   
63
   */

    
   
64
  @Override

    
   
65
  public abstract void configure(Context arg0);

    
   
66

   

    
   
67
  /**

    
   
68
   * @see ConfigurableComponent

    
   
69
   */

    
   
70
  @Override

    
   
71
  public abstract void configure(ComponentConfiguration arg0);

    
   
72

   

    
   
73
  /**

    
   
74
   * Creates and prepares an {@link IndexRequestBuilder} from the supplied

    
   
75
   * {@link Client} via delegation to the subclass-hook template methods

    
   
76
   * {@link #getIndexName(String, long)} and

    
   
77
   * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)}

    
   
78
   */

    
   
79
  @Override

    
   
80
  public IndexRequestBuilder createIndexRequest(Client client,

    
   
81
        String indexPrefix, String indexType, Event event) throws IOException {

    
   
82
    IndexRequestBuilder request = prepareIndex(client);

    
   
83
    TimestampedEvent timestampedEvent = new TimestampedEvent(event);

    
   
84
    long timestamp = timestampedEvent.getTimestamp();

    
   
85
    String indexName = getIndexName(indexPrefix, timestamp);

    
   
86
    prepareIndexRequest(request, indexName, indexType, timestampedEvent);

    
   
87
    return request;

    
   
88
  }

    
   
89

   

    
   
90
  @VisibleForTesting

    
   
91
  IndexRequestBuilder prepareIndex(Client client) {

    
   
92
    return client.prepareIndex();

    
   
93
  }

    
   
94

   

    
   
95
  /**

    
   
96
   * Gets the name of the index to use for an index request

    
   
97
   * @return index name of the form 'indexPrefix-formattedTimestamp'

    
   
98
   * @param indexPrefix

    
   
99
   *          Prefix of index name to use -- as configured on the sink

    
   
100
   * @param timestamp

    
   
101
   *          timestamp (millis) to format / use

    
   
102
   */

    
   
103
  protected String getIndexName(String indexPrefix, long timestamp) {

    
   
104
    return new StringBuilder(indexPrefix).append('-')

    
   
105
      .append(fastDateFormat.format(timestamp)).toString();

    
   
106
  }

    
   
107

   

    
   
108
  /**

    
   
109
   * Prepares an ElasticSearch {@link IndexRequestBuilder} instance

    
   
110
   * @param indexRequest

    
   
111
   *          The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare

    
   
112
   * @param indexName

    
   
113
   *          Index name to use -- as per {@link #getIndexName(String, long)}

    
   
114
   * @param indexType

    
   
115
   *          Index type to use -- as configured on the sink

    
   
116
   * @param event

    
   
117
   *          Flume event to serialize and add to index request

    
   
118
   * @throws IOException

    
   
119
   *           If an error occurs e.g. during serialization

    
   
120
  */

    
   
121
  protected abstract void prepareIndexRequest(

    
   
122
      IndexRequestBuilder indexRequest, String indexName,

    
   
123
      String indexType, Event event) throws IOException;

    
   
124

   

    
   
125
}

    
   
126

   

    
   
127
/**

    
   
128
 * {@link Event} implementation that has a timestamp.

    
   
129
 * The timestamp is taken from (in order of precedence):<ol>

    
   
130
 * <li>The "timestamp" header of the base event, if present</li>

    
   
131
 * <li>The "@timestamp" header of the base event, if present</li>

    
   
132
 * <li>The current time in millis, otherwise</li>

    
   
133
 * </ol>

    
   
134
 */

    
   
135
final class TimestampedEvent extends SimpleEvent {

    
   
136

   

    
   
137
    private final long timestamp;

    
   
138

   

    
   
139
    TimestampedEvent(Event base) {

    
   
140
      setBody(base.getBody());

    
   
141
      Map<String, String> headers = Maps.newHashMap(base.getHeaders());

    
   
142
      String timestampString = headers.get("timestamp");

    
   
143
      if (StringUtils.isBlank(timestampString)) {

    
   
144
        timestampString = headers.get("@timestamp");

    
   
145
      }

    
   
146
      if (StringUtils.isBlank(timestampString)) {

    
   
147
        this.timestamp = DateTimeUtils.currentTimeMillis();

    
   
148
        headers.put("timestamp", String.valueOf(timestamp ));

    
   
149
      } else {

    
   
150
        this.timestamp = Long.valueOf(timestampString);

    
   
151
      }

    
   
152
      setHeaders(headers);

    
   
153
    }

    
   
154

   

    
   
155
    long getTimestamp() {

    
   
156
        return timestamp;

    
   
157
    }

    
   
158
}
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
Revision dc6a093 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
Revision 1b3db14 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
Revision 2edacdc New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
Revision 94b95b1 New Change
 
  1. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java: Loading...
  2. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java: Loading...
  3. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java: Loading...
  4. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: Loading...
  5. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java: Loading...
  6. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java: Loading...
  7. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java: Loading...
  8. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java: Loading...