Review Board 1.7.22


Enhance NettyAvroRpcClient and the use of NettyServer to optionally use compression

Review Request #9427 - Created Feb. 13, 2013 and updated

Ted Malaska
truck
FLUME-1915
Reviewers
Flume
flume-git
Enhance NettyAvroRpcClient and the use of NettyServer to optionally use compression

Description: This JIRA will update AvroSource and NettyAvroRpcClient to optionally use ZlibEncoder and ZlibDecoder to compress communications between themselves.

Patch Comment: Added compression options for both client and server.
Note compress has be turn on both client and server for it to work.
Client can have compression with out compression if compression level is set to 0
Yes
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
Revision 52bd49b New Change
[20] 32 lines
[+20]
33
import org.apache.flume.instrumentation.SinkCounter;
33
import org.apache.flume.instrumentation.SinkCounter;
34
import org.slf4j.Logger;
34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
35
import org.slf4j.LoggerFactory;
36

    
   
36

   
37
import java.util.List;
37
import java.util.List;

    
   
38
import java.util.Map.Entry;
38
import java.util.Properties;
39
import java.util.Properties;
39

    
   
40

   
40
/**
41
/**
41
 * This sink provides the basic RPC functionality for Flume. This sink takes
42
 * This sink provides the basic RPC functionality for Flume. This sink takes
42
 * several arguments which are used in RPC.
43
 * several arguments which are used in RPC.
[+20] [20] 58 lines
[+20]
101
 * <td><tt>request-timeout</tt></td>
102
 * <td><tt>request-timeout</tt></td>
102
 * <td>Maximum time to wait RPC requests after the first</td>
103
 * <td>Maximum time to wait RPC requests after the first</td>
103
 * <td>milliseconds (long)</td>
104
 * <td>milliseconds (long)</td>
104
 * <td>20000</td>
105
 * <td>20000</td>
105
 * </tr>
106
 * </tr>

    
   
107
 * <tr>

    
   
108
 * <td><tt>compression-type</tt></td>

    
   
109
 * <td>Select compression type.  Default is "none" and the only compression type available is "deflate"</td>

    
   
110
 * <td>compression type</td>

    
   
111
 * <td>none</td>

    
   
112
 * </tr>

    
   
113
 * <tr>

    
   
114
 * <td><tt>compression-level</tt></td>

    
   
115
 * <td>In the case compression type is "deflate" this value can be between 0-9.  0 being no compression and

    
   
116
 * 1-9 is compression.  The higher the number the better the compression.  6 is the default.</td>

    
   
117
 * <td>compression level</td>

    
   
118
 * <td>6</td>

    
   
119
 * </tr>
106
 * </table>
120
 * </table>
107
 * <p>
121
 * <p>
108
 * <b>Metrics</b>
122
 * <b>Metrics</b>
109
 * </p>
123
 * </p>
110
 * <p>
124
 * <p>
[+20] [20] 28 lines
[+20] [+] public void configure(Context context) {
139

    
   
153

   
140
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
154
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
141
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
155
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
142
        "h1", hostname + ":" + port);
156
        "h1", hostname + ":" + port);
143

    
   
157

   
144
    Integer batchSize = context.getInteger("batch-size");
158
    for (Entry<String, String> entry: context.getParameters().entrySet()) {
145
    if (batchSize != null) {
159
      clientProps.setProperty(entry.getKey(), entry.getValue());
146
      clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,

   
147
          String.valueOf(batchSize));

   
148
    }

   
149

    
   

   
150
    Long connectTimeout = context.getLong("connect-timeout");

   
151
    if (connectTimeout != null) {

   
152
      clientProps.setProperty(

   
153
          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,

   
154
          String.valueOf(connectTimeout));

   
155
    }

   
156

    
   

   
157
    Long requestTimeout = context.getLong("request-timeout");

   
158
    if (requestTimeout != null) {

   
159
      clientProps.setProperty(

   
160
          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,

   
161
          String.valueOf(requestTimeout));

   
162
    }
160
    }
163

    
   
161

   
164
    if (sinkCounter == null) {
162
    if (sinkCounter == null) {
165
      sinkCounter = new SinkCounter(getName());
163
      sinkCounter = new SinkCounter(getName());
166
    }
164
    }
[+20] [20] 177 lines
flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
Revision dc18c5d New Change
 
flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Revision b9e59ef New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
Revision 4bf36e6 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 8eb3734 New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Revision cf9724c New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
Revision 1e642d8 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
Revision 5042d11 New Change
 
flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Revision 99ae010 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java: Loading...
  5. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  6. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...
  7. flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java: Loading...
  8. flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java: Loading...
  9. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...