Review Board 1.7.22


Coprocessors: Support aggregate functions

Review Request #585 - Created April 12, 2011 and updated

Ted Yu
trunk
HBASE-1512
Reviewers
hbase
ghelmling
hbase
This patch provides reference implementation for aggregate function support through Coprocessor framework.
ColumnInterpreter interface allows client to specify how the value's byte array is interpreted.
Some of the thoughts are summarized at http://zhihongyu.blogspot.com/2011/03/genericizing-endpointcoprocessor.html

Himanshu Vashishtha started the work. I provided some review comments and some of the code.
TestAggFunctions passes.
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
New File

    
   
1
/*

    
   
2
 * Copyright 2011 The Apache Software Foundation

    
   
3
 *

    
   
4
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
5
 * or more contributor license agreements.  See the NOTICE file

    
   
6
 * distributed with this work for additional information

    
   
7
 * regarding copyright ownership.  The ASF licenses this file

    
   
8
 * to you under the Apache License, Version 2.0 (the

    
   
9
 * "License"); you may not use this file except in compliance

    
   
10
 * with the License.  You may obtain a copy of the License at

    
   
11
 *

    
   
12
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
13
 *

    
   
14
 * Unless required by applicable law or agreed to in writing, software

    
   
15
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
17
 * See the License for the specific language governing permissions and

    
   
18
 * limitations under the License.

    
   
19
 */

    
   
20

   

    
   
21
package org.apache.hadoop.hbase.client.coprocessor;

    
   
22

   

    
   
23
import java.io.IOException;

    
   
24
import java.util.ArrayList;

    
   
25
import java.util.List;

    
   
26

   

    
   
27
import org.apache.commons.logging.Log;

    
   
28
import org.apache.commons.logging.LogFactory;

    
   
29
import org.apache.hadoop.conf.Configuration;

    
   
30
import org.apache.hadoop.hbase.HConstants;

    
   
31
import org.apache.hadoop.hbase.client.HTable;

    
   
32
import org.apache.hadoop.hbase.coprocessor.AggregateCpProtocol;

    
   
33
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;

    
   
34
import org.apache.hadoop.hbase.filter.Filter;

    
   
35
import org.apache.hadoop.hbase.util.Bytes;

    
   
36

   

    
   
37
/**

    
   
38
 * This client class is for invoking the agg functions deployed on the RS side

    
   
39
 * via the cp impls. This class will implement the supporting functionality for

    
   
40
 * summing/processing the individual results obtained from the cp impl for each

    
   
41
 * region server.

    
   
42
 * <p>

    
   
43
 * This will serve as the client side handler for invoking the agg functions.

    
   
44
 * <ul>

    
   
45
 * For all aggregate functions,

    
   
46
 * <li>start row < end row is an essential condition (if they are not

    
   
47
 * {@link HConstants#EMPTY_BYTE_ARRAY})

    
   
48
 * <li>Column family can't be null.

    
   
49
 * <li>For methods to find maximum, minimum, sum, rowcount, it returns the

    
   
50
 * parameter type. For average and std, it returns a double value. To achieve

    
   
51
 * this, public methods getAvg and getStd call corresponding private methods

    
   
52
 * getAvgArgs and getStdArgs and compute the result.

    
   
53
 */

    
   
54
public class AggregationClient {

    
   
55

   

    
   
56
  private static final Log log = LogFactory.getLog(AggregationClient.class);

    
   
57
  Configuration conf;

    
   
58

   

    
   
59
  /**

    
   
60
   * Constructor with Conf object

    
   
61
   * 

    
   
62
   * @param cfg

    
   
63
   */

    
   
64
  public AggregationClient(Configuration cfg) {

    
   
65
    this.conf = cfg;

    
   
66
  }

    
   
67

   

    
   
68
  /**

    
   
69
   * It gives the maximum value of a column for a given column family for the

    
   
70
   * given range. In case qualifier is null, a max of all values for the given

    
   
71
   * family is returned.

    
   
72
   * @param tableName

    
   
73
   * @param startKey

    
   
74
   * @param endKey

    
   
75
   * @param colFamily

    
   
76
   * @param colQualifier

    
   
77
   * @param ci

    
   
78
   * @param f

    
   
79
   * @return max val <R>

    
   
80
   * @throws Throwable

    
   
81
   *           The caller is supposed to handle the exception as they are thrown

    
   
82
   *           & propagated to it.

    
   
83
   */

    
   
84
  public <R> R getMaximum(final byte[] tableName, final byte[] startKey,

    
   
85
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
86
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
87
    validateStartEndRows(startKey, endKey);

    
   
88
    HTable table = new HTable(conf, tableName);

    
   
89

   

    
   
90
    class MaxCallBack implements Batch.Callback<R> {

    
   
91
      R max = ci.getMinValue();

    
   
92

   

    
   
93
      R getMax() {

    
   
94
        return max;

    
   
95
      }

    
   
96

   

    
   
97
      @Override

    
   
98
      public void update(byte[] region, byte[] row, R result) {

    
   
99
        if (ci.compare(max, result) < 0) {

    
   
100
          max = result;

    
   
101
        }

    
   
102
      }

    
   
103
    }

    
   
104
    MaxCallBack aMaxCallBack = new MaxCallBack();

    
   
105
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
106
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
107
          @Override

    
   
108
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
109
            return instance.getMax(colFamily, colQualifier, startKey, endKey,

    
   
110
                ci, f);

    
   
111
          }

    
   
112
        }, aMaxCallBack);

    
   
113
    return aMaxCallBack.getMax();

    
   
114
  }

    
   
115

   

    
   
116
  private void validateStartEndRows(final byte[] startKey, final byte[] endKey)

    
   
117
      throws IOException {

    
   
118
    if ((Bytes.equals(startKey, endKey) && (!Bytes.equals(startKey,

    
   
119
        HConstants.EMPTY_START_ROW)))

    
   
120
        || Bytes.compareTo(startKey, endKey) > 0) {

    
   
121
      throw new IOException(

    
   
122
          "Agg client Exception: Startrow can't be equal or greater than Stoprow");

    
   
123
    }

    
   
124
  }

    
   
125

   

    
   
126
  /**

    
   
127
   * It gives the minimum value of a column for a given column family for the

    
   
128
   * given range. In case qualifier is null, a min of all values for the given

    
   
129
   * family is returned.

    
   
130
   * @param tableName

    
   
131
   * @param startKey

    
   
132
   * @param endKey

    
   
133
   * @param colFamily

    
   
134
   * @param colQualifier

    
   
135
   * @param ci

    
   
136
   * @param f

    
   
137
   * @return

    
   
138
   * @throws Throwable

    
   
139
   */

    
   
140
  public <R> R getMinimum(final byte[] tableName, final byte[] startKey,

    
   
141
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
142
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
143
    validateStartEndRows(startKey, endKey);

    
   
144
    class MinCallBack implements Batch.Callback<R> {

    
   
145

   

    
   
146
      private R min = ci.getMaxValue();

    
   
147

   

    
   
148
      public R getMinimum() {

    
   
149
        return min;

    
   
150
      }

    
   
151

   

    
   
152
      @Override

    
   
153
      public void update(byte[] region, byte[] row, R result) {

    
   
154
        if (ci.compare(min, result) > 0) {

    
   
155
          min = result;

    
   
156
        }

    
   
157
      }

    
   
158
    }

    
   
159
    HTable table = new HTable(conf, tableName);

    
   
160
    MinCallBack minCallBack = new MinCallBack();

    
   
161
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
162
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
163

   

    
   
164
          @Override

    
   
165
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
166
            return instance.getMin(colFamily, colQualifier, startKey, endKey,

    
   
167
                ci, f);

    
   
168
          }

    
   
169
        }, minCallBack);

    
   
170
    log.debug("Min fom all regions is: " + minCallBack.getMinimum());

    
   
171
    return minCallBack.getMinimum();

    
   
172
  }

    
   
173

   

    
   
174
  /**

    
   
175
   * It gives the row num, by summing up the individual results obtained from

    
   
176
   * regions. In case the qualifier is null, FirstKEyValueFilter is used to

    
   
177
   * optimised the operation. In case qualifier is provided, I can't use the

    
   
178
   * filter as it may set the flag to skip to next row, but the value read is

    
   
179
   * not of the given filter: in this case, this particular row will not be

    
   
180
   * counted ==> an error.

    
   
181
   * @param tableName

    
   
182
   * @param startKey

    
   
183
   * @param endKey

    
   
184
   * @param colFamily

    
   
185
   * @param colQualifier

    
   
186
   * @param ci

    
   
187
   * @param f

    
   
188
   * @return

    
   
189
   * @throws Throwable

    
   
190
   */

    
   
191
  public <R> R getRowNum(final byte[] tableName, final byte[] startKey,

    
   
192
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
193
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
194
    validateStartEndRows(startKey, endKey);

    
   
195
    class RowNumCallback implements Batch.Callback<R> {

    
   
196
      private R rowCount = ci.getInitialValue();

    
   
197

   

    
   
198
      public R getRowNumCount() {

    
   
199
        return rowCount;

    
   
200
      }

    
   
201

   

    
   
202
      @Override

    
   
203
      public void update(byte[] region, byte[] row, R result) {

    
   
204
        rowCount = ci.add(rowCount, result);

    
   
205
      }

    
   
206
    }

    
   
207
    RowNumCallback rowNum = new RowNumCallback();

    
   
208
    HTable table = new HTable(conf, tableName);

    
   
209
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
210
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
211
          @Override

    
   
212
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
213
            return instance.getRowNum(colFamily, colQualifier, startKey,

    
   
214
                endKey, ci, f);

    
   
215
          }

    
   
216
        }, rowNum);

    
   
217

   

    
   
218
    return rowNum.getRowNumCount();

    
   
219
  }

    
   
220

   

    
   
221
  /**

    
   
222
   * It sums up the value returned from various regions. In case qualifier is

    
   
223
   * null, summation of all the column qualifiers in the given family is done.

    
   
224
   * @param tableName

    
   
225
   * @param startKey

    
   
226
   * @param endKey

    
   
227
   * @param colFamily

    
   
228
   * @param colQualifier

    
   
229
   * @param ci

    
   
230
   * @param f

    
   
231
   * @return

    
   
232
   * @throws Throwable

    
   
233
   */

    
   
234
  public <R> R getSum(final byte[] tableName, final byte[] startKey,

    
   
235
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
236
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
237
    validateStartEndRows(startKey, endKey);

    
   
238
    class SumCallBack implements Batch.Callback<R> {

    
   
239
      R sumVal = ci.getInitialValue();

    
   
240

   

    
   
241
      public R getSumResult() {

    
   
242
        return sumVal;

    
   
243
      }

    
   
244

   

    
   
245
      @Override

    
   
246
      public void update(byte[] region, byte[] row, R result) {

    
   
247
        sumVal = ci.add(sumVal, result);

    
   
248
      }

    
   
249
    }

    
   
250
    SumCallBack sumCallBack = new SumCallBack();

    
   
251
    HTable table = new HTable(conf, tableName);

    
   
252
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
253
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
254
          @Override

    
   
255
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
256
            return instance.getSum(colFamily, colQualifier, startKey, endKey,

    
   
257
                ci, f);

    
   
258
          }

    
   
259
        }, sumCallBack);

    
   
260
    return sumCallBack.getSumResult();

    
   
261
  }

    
   
262

   

    
   
263
  /**

    
   
264
   * It computes average while fetching sum and row count from all the

    
   
265
   * corresponding regions. Approach is to compute a global sum of region level

    
   
266
   * sum and rowcount and then compute the average.

    
   
267
   * @param tableName

    
   
268
   * @param startKey

    
   
269
   * @param endKey

    
   
270
   * @param colFamily

    
   
271
   * @param colQualifier

    
   
272
   * @throws Throwable

    
   
273
   */

    
   
274
  private <R> List<R> getAvgArgs(final byte[] tableName, final byte[] startKey,

    
   
275
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
276
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
277
    validateStartEndRows(startKey, endKey);

    
   
278
    class AvgCallBack implements Batch.Callback<List<R>> {

    
   
279
      R sumVal = ci.getInitialValue();

    
   
280
      R rowVal = ci.getInitialValue();

    
   
281

   

    
   
282
      public List<R> getAvgArgs() {

    
   
283
        List<R> l = new ArrayList<R>();

    
   
284
        l.add(sumVal);

    
   
285
        l.add(rowVal);

    
   
286
        return l;

    
   
287
      }

    
   
288

   

    
   
289
      @Override

    
   
290
      public void update(byte[] region, byte[] row, List<R> result) {

    
   
291
        sumVal = ci.add(sumVal, result.get(0));

    
   
292
        rowVal = ci.add(rowVal, result.get(1));

    
   
293
      }

    
   
294
    }

    
   
295
    AvgCallBack avgCallBack = new AvgCallBack();

    
   
296
    HTable table = new HTable(conf, tableName);

    
   
297
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
298
        new Batch.Call<AggregateCpProtocol, List<R>>() {

    
   
299
          @Override

    
   
300
          public List<R> call(AggregateCpProtocol instance) throws IOException {

    
   
301
            return instance.getAvg(colFamily, colQualifier, startKey, endKey,

    
   
302
                ci, f);

    
   
303
          }

    
   
304
        }, avgCallBack);

    
   
305
    return avgCallBack.getAvgArgs();

    
   
306
  }

    
   
307

   

    
   
308
  /**

    
   
309
   * This is the client side interface/handle for calling the average method for

    
   
310
   * a given cf-cq combination. It was necessary to add one more call stack as

    
   
311
   * its return type should be a decimal value, irrespective of what

    
   
312
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
313
   * to compute the average and returs the double value.

    
   
314
   * @param tableName

    
   
315
   * @param startKey

    
   
316
   * @param endKey

    
   
317
   * @param colFamily

    
   
318
   * @param colQualifier

    
   
319
   * @param ci

    
   
320
   * @param <R>

    
   
321
   * @param f

    
   
322
   * @return

    
   
323
   * @throws Throwable

    
   
324
   */

    
   
325
  public <R> double getAvg(final byte[] tableName, final byte[] startKey,

    
   
326
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
327
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
328
    List<R> l = getAvgArgs(tableName, startKey, endKey, colFamily,

    
   
329
        colQualifier, ci, f);

    
   
330
    R q = l.get(0), d = l.get(1);

    
   
331
    return ci.divide(q, d);

    
   
332
  }

    
   
333

   

    
   
334
  /**

    
   
335
   * It computes a global standard deviation for a given column and its value.

    
   
336
   * Standard deviation is square root of (average of squares -

    
   
337
   * average*average). From individual regions, it obtains sum, square sum and

    
   
338
   * number of rows. With these, the above values are computed to get the global

    
   
339
   * std.

    
   
340
   * @param tableName

    
   
341
   * @param startKey

    
   
342
   * @param endKey

    
   
343
   * @param colFamily

    
   
344
   * @param colQualifier

    
   
345
   * @return 

    
   
346
   * @throws Throwable

    
   
347
   */

    
   
348
  private <R> List<R> getStdArgs(final byte[] tableName, final byte[] startKey,

    
   
349
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
350
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
351
    validateStartEndRows(startKey, endKey);

    
   
352
    class StdCallback implements Batch.Callback<List<R>> {

    
   
353
      R rowCountVal = ci.getInitialValue();

    
   
354
      R sumVal = ci.getInitialValue(), sumSqVal = ci.getInitialValue();

    
   
355

   

    
   
356
      public List<R> getStdParams() {

    
   
357
        List<R> l = new ArrayList<R>();

    
   
358
        l.add(sumVal);

    
   
359
        l.add(sumSqVal);

    
   
360
        l.add(rowCountVal);

    
   
361
        return l;

    
   
362
      }

    
   
363

   

    
   
364
      @Override

    
   
365
      public void update(byte[] region, byte[] row, List<R> result) {

    
   
366
        sumVal = ci.add(sumVal, result.get(0));

    
   
367
        sumSqVal = ci.add(sumSqVal, result.get(1));

    
   
368
        rowCountVal = ci.add(rowCountVal, result.get(2));

    
   
369
      }

    
   
370
    }

    
   
371
    StdCallback stdCallback = new StdCallback();

    
   
372
    HTable table = new HTable(conf, tableName);

    
   
373
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
374
        new Batch.Call<AggregateCpProtocol, List<R>>() {

    
   
375
          @Override

    
   
376
          public List<R> call(AggregateCpProtocol instance) throws IOException {

    
   
377
            return instance.getStd(colFamily, colQualifier, startKey, endKey,

    
   
378
                ci, f);

    
   
379
          }

    
   
380

   

    
   
381
        }, stdCallback);

    
   
382
    return stdCallback.getStdParams();

    
   
383
  }

    
   
384

   

    
   
385
  /**

    
   
386
   * This is the client side interface/handle for calling the std method for a

    
   
387
   * given cf-cq combination. It was necessary to add one more call stack as its

    
   
388
   * return type should be a decimal value, irrespective of what

    
   
389
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
390
   * to compute the std and returns the double value.

    
   
391
   * @param tableName

    
   
392
   * @param startKey

    
   
393
   * @param endKey

    
   
394
   * @param colFamily

    
   
395
   * @param colQualifier

    
   
396
   * @param ci

    
   
397
   * @param f

    
   
398
   * @return

    
   
399
   * @throws Throwable

    
   
400
   */

    
   
401
  public <R> double getStd(final byte[] tableName, final byte[] startKey,

    
   
402
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
403
      ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
404
    List<R> l = getStdArgs(tableName, startKey, endKey, colFamily,

    
   
405
        colQualifier, ci, f);

    
   
406
    double res = 0d;

    
   
407
    double avg = ci.divide(l.get(0), l.get(2));

    
   
408
    double avgOfSumSq = ci.divide(l.get(1), l.get(2));

    
   
409
    res = avgOfSumSq - (avg) * (avg); // variance

    
   
410
    res = Math.pow(res, 0.5);

    
   
411
    return res;

    
   
412
  }

    
   
413

   

    
   
414
}
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
New File
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java
New File
 
  1. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java: Loading...
  2. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java: Loading...
  3. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java: Loading...
  4. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java: Loading...
  5. /src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java: Loading...
  6. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java: Loading...