Review Board 1.7.22


FLUME-1782 Elastic Search sink does not use UTC to determine the correct index to write to

Review Request #10379 - Created April 9, 2013 and discarded

Edward Sargisson
flume-1.4
FLUME-1782
Reviewers
Flume
hshreedharan
flume-git
This change gets the timestamp from the event and uses it (in UTC) to determine the name of the index to write to. This is required to match the behaviour of Logstash so that Kibana can find the log events.
The previous code would use the current time and would do it in the timezone of the Flume agent's host.
All unit tests and integration tests pass. A snapshot using commit 5b9d31f1ad228 and the patch for flume-1972 has passed our internal integration tests using customisations.

Diff revision 5 (Latest)

1 2 3 4 5
1 2 3 4 5

  1. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  2. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java: Loading...
  3. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java: Loading...
  4. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer2.java: Loading...
  5. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchExtractIdFromHeaderIdProvider.java: Loading...
  6. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIdProvider.java: Loading...
  7. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java: Loading...
  8. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchNullIdProvider.java: Loading...
  9. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: Loading...
  10. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java: Loading...
  11. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java: Loading...
  12. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java: Loading...
  13. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchExtractIdFromHeaderIdProvider.java: Loading...
  14. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java: Loading...
  15. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java: Loading...
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 38f2205 New Change
[20] 1745 lines
[+20]
1746
  a1.sinks.k1.columnFamily = bar_cf
1746
  a1.sinks.k1.columnFamily = bar_cf
1747
  a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
1747
  a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
1748
  a1.sinks.k1.channel = c1
1748
  a1.sinks.k1.channel = c1
1749

    
   
1749

   
1750
ElasticSearchSink
1750
ElasticSearchSink
1751
'''''''''''''''''
1751
~~~~~~~~~~~~~~~~~
1752

    
   
1752

   
1753
This sink writes data to ElasticSearch. A class implementing
1753
This sink writes data to elasticsearch. A class implementing
1754
ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into
1754
ElasticSearchEventSerializer2 which is specified by the configuration is used to convert the events into
1755
XContentBuilder which detail the fields and mappings which will be indexed. These are then then written
1755
XContentBuilder which detail the fields and mappings which will be indexed. These are then then written
1756
to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with
1756
to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with
1757
a single large index
1757
a single large index. Note that the day is the UTC day. There is no provision for rolling the index in different timezones.
1758
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
1758
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
1759
Required properties are in **bold**.
1759
Required properties are in **bold**.
1760

    
   
1760

   
1761
================  ==================================================================  =======================================================================================================
1761
================  ======================================================================== =======================================================================================================
1762
Property Name     Default                                                             Description
1762
Property Name     Default                                                                  Description
1763
================  ==================================================================  =======================================================================================================
1763
================  ======================================================================== =======================================================================================================
1764
**channel**       --
1764
**channel**       --
1765
**type**          --                                                                  The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
1765
**type**          --                                                                       The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
1766
**hostNames**     --                                                                  Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
1766
**hostNames**     --                                                                       Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
1767
indexName         flume                                                               The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'
1767
indexName         flume                                                                    The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'
1768
indexType         logs                                                                The type to index the document to, defaults to 'log'
1768
indexType         logs                                                                     The type to index the document to, defaults to 'log'
1769
clusterName       elasticsearch                                                       Name of the ElasticSearch cluster to connect to
1769
clusterName       elasticsearch                                                            Name of the ElasticSearch cluster to connect to
1770
batchSize         100                                                                 Number of events to be written per txn.
1770
batchSize         100                                                                      Number of events to be written per txn.
1771
ttl               --                                                                  TTL in days, when set will cause the expired documents to be deleted automatically,
1771
ttl               --                                                                       TTL in days, when set will cause the expired documents to be deleted automatically,
1772
                                                                                      if not set documents will never be automatically deleted
1772
                                                                                           if not set documents will never be automatically deleted
1773
serializer        org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
1773
serializer        org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The class to use to serialize the Flume event to be stored in elasticsearch. This class must implement 
1774
serializer.*      --                                                                  Properties to be passed to the serializer.
1774
                                                                                           either org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer2. Implementing 
1775
================  ==================================================================  =======================================================================================================
1775
                                                                                           org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer is supported but deprecated as that 

    
   
1776
                                                                                           interface is not passed the timestamp this sink will provide if there is no timestamp header.

    
   
1777
serializer.*      --                                                                       Properties to be passed to the serializer.

    
   
1778
idProvider        org.apache.flume.sink.elasticsearch.ElasticSearchNullIdProvider          The class to use to calculate the id to give the document.

    
   
1779
idProvider.*      --                                                                       Properties to be passed to the idProvider.
1776

    
   
1780

   
1777
Example for agent named a1:
1781
Example for agent named a1:
1778

    
   
1782

   
1779
.. code-block:: properties
1783
.. code-block:: properties
1780

    
   
1784

   
[+20] [20] 6 lines
[+20]
1787
  a1.sinks.k1.clusterName = foobar_cluster
1791
  a1.sinks.k1.clusterName = foobar_cluster
1788
  a1.sinks.k1.batchSize = 500
1792
  a1.sinks.k1.batchSize = 500
1789
  a1.sinks.k1.ttl = 5
1793
  a1.sinks.k1.ttl = 5
1790
  a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
1794
  a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
1791
  a1.sinks.k1.channel = c1
1795
  a1.sinks.k1.channel = c1

    
   
1796
  

    
   
1797
Example for agent named a2 - which uses the provided ElasticSearchExtractIdFromHeaderIdProvider class to read a header called id and use that as the document id:

    
   
1798

   

    
   
1799
.. code-block:: properties

    
   
1800

   

    
   
1801
  a2.channels = c1

    
   
1802
  a2.sinks = k1

    
   
1803
  a2.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink

    
   
1804
  a2.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300

    
   
1805
  a2.sinks.k1.indexName = foo_index

    
   
1806
  a2.sinks.k1.indexType = bar_type

    
   
1807
  a2.sinks.k1.clusterName = foobar_cluster

    
   
1808
  a2.sinks.k1.batchSize = 500

    
   
1809
  a2.sinks.k1.ttl = 5

    
   
1810
  a2.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

    
   
1811
  a2.sinks.k1.idProvider = com.globalrelay.castellan.flume.sink.ElasticSearchExtractIdFromHeaderIdProvider

    
   
1812
  a2.sinks.k1.channel = c1
1792

    
   
1813

   
1793
Custom Sink
1814
Custom Sink
1794
~~~~~~~~~~~
1815
~~~~~~~~~~~
1795

    
   
1816

   
1796
A custom sink is your own implementation of the Sink interface. A custom
1817
A custom sink is your own implementation of the Sink interface. A custom
[+20] [20] 1229 lines
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java
Revision aa7ad39 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
Revision dc6a093 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer2.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchExtractIdFromHeaderIdProvider.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIdProvider.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
Revision 3638368 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchNullIdProvider.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
Revision 1b3db14 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
Revision 7f75e22 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
Revision 2edacdc New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
Revision 43a4b12 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchExtractIdFromHeaderIdProvider.java
New File
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
Revision 9dff4b0 New Change
 
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
Revision 94b95b1 New Change
 
  1. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  2. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java: Loading...
  3. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java: Loading...
  4. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer2.java: Loading...
  5. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchExtractIdFromHeaderIdProvider.java: Loading...
  6. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIdProvider.java: Loading...
  7. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java: Loading...
  8. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchNullIdProvider.java: Loading...
  9. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: Loading...
  10. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java: Loading...
  11. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java: Loading...
  12. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java: Loading...
  13. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchExtractIdFromHeaderIdProvider.java: Loading...
  14. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java: Loading...
  15. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java: Loading...