Review Board 1.7.22


Coprocessors: Support aggregate functions

Review Request #585 - Created April 12, 2011 and updated

Ted Yu
trunk
HBASE-1512
Reviewers
hbase
ghelmling
hbase
This patch provides a reference implementation of aggregate function support through the Coprocessor framework.
The ColumnInterpreter interface allows the client to specify how a value's byte array is interpreted.
Some of the thoughts are summarized at http://zhihongyu.blogspot.com/2011/03/genericizing-endpointcoprocessor.html

Himanshu Vashishtha started the work. I provided some review comments and some of the code.
TestAggFunctions passes.
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
New File

    
   
1
/*

    
   
2
 * Copyright 2010 The Apache Software Foundation

    
   
3
 *

    
   
4
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
5
 * or more contributor license agreements.  See the NOTICE file

    
   
6
 * distributed with this work for additional information

    
   
7
 * regarding copyright ownership.  The ASF licenses this file

    
   
8
 * to you under the Apache License, Version 2.0 (the

    
   
9
 * "License"); you may not use this file except in compliance

    
   
10
 * with the License.  You may obtain a copy of the License at

    
   
11
 *

    
   
12
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
13
 *

    
   
14
 * Unless required by applicable law or agreed to in writing, software

    
   
15
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
17
 * See the License for the specific language governing permissions and

    
   
18
 * limitations under the License.

    
   
19
 */

    
   
20

   

    
   
21
package org.apache.hadoop.hbase.client.coprocessor;

    
   
22

   

    
   
23
import java.io.IOException;

    
   
24
import java.util.ArrayList;

    
   
25
import java.util.List;

    
   
26

   

    
   
27
import org.apache.commons.logging.Log;

    
   
28
import org.apache.commons.logging.LogFactory;

    
   
29
import org.apache.hadoop.conf.Configuration;

    
   
30
import org.apache.hadoop.hbase.HConstants;

    
   
31
import org.apache.hadoop.hbase.client.HTable;

    
   
32
import org.apache.hadoop.hbase.coprocessor.AggregateCpProtocol;

    
   
33
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;

    
   
34
import org.apache.hadoop.hbase.filter.Filter;

    
   
35
import org.apache.hadoop.hbase.util.Bytes;

    
   
36

   

    
   
37
/**

    
   
38
 * 

    
   
39
 * This client class is for invoking the agg functions deployed on the RS side

    
   
40
 * via the cp impls. This class will implement the supporting functionality for

    
   
41
 * summing/processing the individual results obtained from the cp impl for each

    
   
42
 * region server.

    
   
43
 * <p>

    
   
44
 * This will serve as the client side handle for invoking the agg functions,

    
   
45
 * like AggregationClient#sum(table, start, end, colFamily, colQualifier);

    
   
46
 * 

    
   
47
 * <ul>

    
   
48
 * For all aggregate functions,

    
   
49
 * <li>start row < end row is an essential condition (if they are not

    
   
50
 * {@link HConstants#EMPTY_BYTE_ARRAY})

    
   
51
 * <li>Column family can't be null.

    
   
52
 * <li>For all mathematical computations like avg, std etc, a long cell value (8

    
   
53
 * bytes) is assumed; otherwise that value is skipped.

    
   
54
 * 

    
   
55
 */

    
   
56
public class AggregationClient {

    
   
57

   

    
   
58
  private static final Log log = LogFactory.getLog(AggregationClient.class);

  /** Configuration handed in at construction time; may be {@code null}. */
  Configuration conf;

  /**
   * Creates a client without an explicit configuration; {@link #conf} is left
   * {@code null}.
   */
  public AggregationClient() {
    this(null);
  }

  /**
   * Creates a client that remembers the given configuration.
   *
   * @param cfg
   *          configuration to store in {@link #conf}
   */
  public AggregationClient(Configuration cfg) {
    conf = cfg;
  }

    
   
76

   

    
   
77
  /**

    
   
78
   * It gives the maximum value of a column for a given column family for the

    
   
79
   * given range. In case qualifier is null, a max of all values for the given

    
   
80
   * family is returned.

    
   
81
   * 

    
   
82
   * @param <R>

    
   
83
   * 

    
   
84
   * @param tableName

    
   
85
   * @param startKey

    
   
86
   * @param endKey

    
   
87
   * @param colFamily

    
   
88
   * @param colQualifier

    
   
89
   * @param ci

    
   
90
   * @param f

    
   
91
   * @return the maximum value as a long.

    
   
92
   * @throws Throwable

    
   
93
   *           The caller is supposed to handle the exception as they are thrown

    
   
94
   *           & propagated to it.

    
   
95
   */

    
   
96
  public <R> R getMaximum(final byte[] tableName, final byte[] startKey,

    
   
97
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
98
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
99
    validateStartEndRows(startKey, endKey);

    
   
100
    HTable table = new HTable(tableName);

    
   
101

   

    
   
102
    class MaxCallBack implements Batch.Callback<R> {

    
   
103
      R max = ci.getMinValue();

    
   
104

   

    
   
105
      R getMax() {

    
   
106
        return max;

    
   
107
      }

    
   
108

   

    
   
109
      @Override

    
   
110
      public void update(byte[] region, byte[] row, R result) {

    
   
111
        if (ci.compare(max, result) < 0) {

    
   
112
          max = result;

    
   
113
        }

    
   
114
      }

    
   
115
    }

    
   
116
    MaxCallBack aMaxCallBack = new MaxCallBack();

    
   
117
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
118
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
119
          @Override

    
   
120
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
121
            return instance.getMax(colFamily, colQualifier, startKey, endKey,

    
   
122
                ci, f);

    
   
123
          }

    
   
124
        }, aMaxCallBack);

    
   
125
    return aMaxCallBack.getMax();

    
   
126
  }

    
   
127

   

    
   
128
  private void validateStartEndRows(final byte[] startKey, final byte[] endKey)

    
   
129
      throws IOException {

    
   
130
    if ((Bytes.equals(startKey, endKey) && (!Bytes.equals(startKey,

    
   
131
        HConstants.EMPTY_START_ROW)))

    
   
132
        || Bytes.compareTo(startKey, endKey) > 0) {

    
   
133
      throw new IOException(

    
   
134
          "Agg client Exception: Startrow can't be equal or greater than Stoprow");

    
   
135
    }

    
   
136
  }

    
   
137

   

    
   
138
  /**

    
   
139
   * It gives the minimum value of a column for a given column family for the

    
   
140
   * given range. In case qualifier is null, a min of all values for the given

    
   
141
   * family is returned.

    
   
142
   * 

    
   
143
   * @param <R>

    
   
144
   * 

    
   
145
   * @param tableName

    
   
146
   * @param startKey

    
   
147
   * @param endKey

    
   
148
   * @param colFamily

    
   
149
   * @param colQualifier

    
   
150
   * @param ci

    
   
151
   * @param f

    
   
152
   * @return

    
   
153
   * @throws Throwable

    
   
154
   */

    
   
155
  public <R> R getMinimum(final byte[] tableName, final byte[] startKey,

    
   
156
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
157
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
158
    validateStartEndRows(startKey, endKey);

    
   
159
    class MinCallBack implements Batch.Callback<R> {

    
   
160

   

    
   
161
      private R min = ci.getMaxValue();

    
   
162

   

    
   
163
      public R getMinimum() {

    
   
164
        return min;

    
   
165
      }

    
   
166

   

    
   
167
      @Override

    
   
168
      public void update(byte[] region, byte[] row, R result) {

    
   
169
        if (ci.compare(min, result) > 0) {

    
   
170
          min = result;

    
   
171
        }

    
   
172
      }

    
   
173
    }

    
   
174
    HTable table = new HTable(tableName);

    
   
175
    MinCallBack minCallBack = new MinCallBack();

    
   
176
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
177
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
178

   

    
   
179
          @Override

    
   
180
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
181
            return instance.getMin(colFamily, colQualifier, startKey, endKey,

    
   
182
                ci, f);

    
   
183
          }

    
   
184
        }, minCallBack);

    
   
185
    log.debug("Min fom all regions is: " + minCallBack.getMinimum());

    
   
186
    return minCallBack.getMinimum();

    
   
187
  }

    
   
188

   

    
   
189
  /**

    
   
190
   * It gives the row num, by summing up the individual results obtained from

    
   
191
   * regions. In case the qualifier is null, FirstKEyValueFilter is used to

    
   
192
   * optimised the operation. In case qualifier is provided, I can't use the

    
   
193
   * filter as it may set the flag to skip to next row, but the value read is

    
   
194
   * not of the given filter: in this case, this particular row will not be

    
   
195
   * counted ==> an error.

    
   
196
   * 

    
   
197
   * @param <R>

    
   
198
   * 

    
   
199
   * @param tableName

    
   
200
   * @param startKey

    
   
201
   * @param endKey

    
   
202
   * @param colFamily

    
   
203
   * @param colQualifier

    
   
204
   * @param ci

    
   
205
   * @param f

    
   
206
   * @return

    
   
207
   * @throws Throwable

    
   
208
   */

    
   
209
  public <R> R getRowNum(final byte[] tableName, final byte[] startKey,

    
   
210
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
211
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
212
    validateStartEndRows(startKey, endKey);

    
   
213
    class RowNumCallback implements Batch.Callback<R> {

    
   
214
      private R rowCount = ci.getInitialValue();

    
   
215

   

    
   
216
      public R getRowNumCount() {

    
   
217
        return rowCount;

    
   
218
      }

    
   
219

   

    
   
220
      @Override

    
   
221
      public void update(byte[] region, byte[] row, R result) {

    
   
222
        rowCount = ci.add(rowCount, result);

    
   
223
      }

    
   
224
    }

    
   
225
    RowNumCallback rowNum = new RowNumCallback();

    
   
226
    HTable table = new HTable(tableName);

    
   
227
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
228
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
229
          @Override

    
   
230
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
231
            return instance.getRowNum(colFamily, colQualifier, startKey,

    
   
232
                endKey, ci, f);

    
   
233
          }

    
   
234
        }, rowNum);

    
   
235

   

    
   
236
    return rowNum.getRowNumCount();

    
   
237
  }

    
   
238

   

    
   
239
  /**

    
   
240
   * It sums up the value returned from various regions. In case qualifier is

    
   
241
   * null, summation of all the column qualifiers in the given family is done.

    
   
242
   * 

    
   
243
   * @param <R>

    
   
244
   * 

    
   
245
   * @param tableName

    
   
246
   * @param startKey

    
   
247
   * @param endKey

    
   
248
   * @param colFamily

    
   
249
   * @param colQualifier

    
   
250
   * @param ci

    
   
251
   * @param f

    
   
252
   * @return

    
   
253
   * @throws Throwable

    
   
254
   */

    
   
255
  public <R> R getSum(final byte[] tableName, final byte[] startKey,

    
   
256
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
257
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
258
    validateStartEndRows(startKey, endKey);

    
   
259
    class SumCallBack implements Batch.Callback<R> {

    
   
260
      R sumVal = ci.getInitialValue();

    
   
261

   

    
   
262
      public R getSumResult() {

    
   
263
        return sumVal;

    
   
264
      }

    
   
265

   

    
   
266
      @Override

    
   
267
      public void update(byte[] region, byte[] row, R result) {

    
   
268
        sumVal = ci.add(sumVal, result);

    
   
269
      }

    
   
270
    }

    
   
271
    SumCallBack sumCallBack = new SumCallBack();

    
   
272
    HTable table = new HTable(tableName);

    
   
273
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
274
        new Batch.Call<AggregateCpProtocol, R>() {

    
   
275
          @Override

    
   
276
          public R call(AggregateCpProtocol instance) throws IOException {

    
   
277
            return instance.getSum(colFamily, colQualifier, startKey, endKey,

    
   
278
                ci, f);

    
   
279
          }

    
   
280
        }, sumCallBack);

    
   
281
    return sumCallBack.getSumResult();

    
   
282
  }

    
   
283

   

    
   
284
  /**

    
   
285
   * It computes average while fetching sum and row count from all the

    
   
286
   * corresponding regions. Approach is to compute a global sum of region level

    
   
287
   * sum and rowcount and then compute the average.

    
   
288
   * 

    
   
289
   * @param tableName

    
   
290
   * @param startKey

    
   
291
   * @param endKey

    
   
292
   * @param colFamily

    
   
293
   * @param colQualifier

    
   
294
   * @throws Throwable

    
   
295
   */

    
   
296
  private <R> List<R> getAvgArgs(final byte[] tableName, final byte[] startKey,

    
   
297
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
298
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
299
    validateStartEndRows(startKey, endKey);

    
   
300
    class AvgCallBack implements Batch.Callback<List<R>> {

    
   
301
      R sumVal = ci.getInitialValue();

    
   
302
      R rowVal = ci.getInitialValue();

    
   
303

   

    
   
304
      public List<R> getAvgArgs() {

    
   
305
        List<R> l = new ArrayList<R>();

    
   
306
        l.add(sumVal);

    
   
307
        l.add(rowVal);

    
   
308
        return l;

    
   
309
      }

    
   
310

   

    
   
311
      @Override

    
   
312
      public void update(byte[] region, byte[] row, List<R> result) {

    
   
313
        sumVal = ci.add(sumVal, result.get(0));

    
   
314
        rowVal = ci.add(rowVal, result.get(1));

    
   
315
      }

    
   
316
    }

    
   
317
    AvgCallBack avgCallBack = new AvgCallBack();

    
   
318
    HTable table = new HTable(tableName);

    
   
319
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
320
        new Batch.Call<AggregateCpProtocol, List<R>>() {

    
   
321
          @Override

    
   
322
          public List<R> call(AggregateCpProtocol instance) throws IOException {

    
   
323
            return instance.getAvg(colFamily, colQualifier, startKey, endKey,

    
   
324
                ci, f);

    
   
325
          }

    
   
326
        }, avgCallBack);

    
   
327
    return avgCallBack.getAvgArgs();

    
   
328
  }

    
   
329

   

    
   
330
  /**

    
   
331
   * This is the client side interface/handle for calling the average method for

    
   
332
   * a given cf-cq combination. It was necessary to add one more call stack as

    
   
333
   * its return type should be a decimal value, irrespective of what

    
   
334
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
335
   * to compute the average and returs the double value.

    
   
336
   * 

    
   
337
   * @param tableName

    
   
338
   * @param startKey

    
   
339
   * @param endKey

    
   
340
   * @param colFamily

    
   
341
   * @param colQualifier

    
   
342
   * @param ci

    
   
343
   * @param <R>

    
   
344
   * @param f

    
   
345
   * @return

    
   
346
   * @throws Throwable

    
   
347
   * 

    
   
348
   */

    
   
349
  public <R> double getAvg(final byte[] tableName, final byte[] startKey,

    
   
350
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
351
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
352
    List<R> l = getAvgArgs(tableName, startKey, endKey, colFamily,

    
   
353
        colQualifier, ci, f);

    
   
354
    R q = l.get(0), d = l.get(1);

    
   
355
    return ci.divide(q, d);

    
   
356
  }

    
   
357

   

    
   
358
  /**

    
   
359
   * It computes a global standard deviation for a given column and its value.

    
   
360
   * Standard deviation is square root of (average of squares -

    
   
361
   * average*average). From individual regions, it obtains sum, square sum and

    
   
362
   * number of rows. With these, the above values are computed to get the global

    
   
363
   * std.

    
   
364
   * 

    
   
365
   * @param tableName

    
   
366
   * @param startKey

    
   
367
   * @param endKey

    
   
368
   * @param colFamily

    
   
369
   * @param colQualifier

    
   
370
   * @return TODO: number formatting

    
   
371
   * @throws Throwable

    
   
372
   */

    
   
373
  private <R> List<R> getStdArgs(final byte[] tableName, final byte[] startKey,

    
   
374
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
375
      final ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
376
    validateStartEndRows(startKey, endKey);

    
   
377
    class StdCallback implements Batch.Callback<List<R>> {

    
   
378
      R rowCountVal = ci.getInitialValue();

    
   
379
      R sumVal = ci.getInitialValue(), sumSqVal = ci.getInitialValue();

    
   
380

   

    
   
381
      public List<R> getStdParams() {

    
   
382
        List<R> l = new ArrayList<R>();

    
   
383
        l.add(sumVal);

    
   
384
        l.add(sumSqVal);

    
   
385
        l.add(rowCountVal);

    
   
386
        return l;

    
   
387
      }

    
   
388

   

    
   
389
      @Override

    
   
390
      public void update(byte[] region, byte[] row, List<R> result) {

    
   
391
        sumVal = ci.add(sumVal, result.get(0));

    
   
392
        sumSqVal = ci.add(sumSqVal, result.get(1));

    
   
393
        rowCountVal = ci.add(rowCountVal, result.get(2));

    
   
394
      }

    
   
395
    }

    
   
396
    StdCallback stdCallback = new StdCallback();

    
   
397
    HTable table = new HTable(tableName);

    
   
398
    table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,

    
   
399
        new Batch.Call<AggregateCpProtocol, List<R>>() {

    
   
400
          @Override

    
   
401
          public List<R> call(AggregateCpProtocol instance) throws IOException {

    
   
402
            return instance.getStd(colFamily, colQualifier, startKey, endKey,

    
   
403
                ci, f);

    
   
404
          }

    
   
405

   

    
   
406
        }, stdCallback);

    
   
407
    return stdCallback.getStdParams();

    
   
408
  }

    
   
409

   

    
   
410
  /**

    
   
411
   * This is the client side interface/handle for calling the std method for a

    
   
412
   * given cf-cq combination. It was necessary to add one more call stack as its

    
   
413
   * return type should be a decimal value, irrespective of what

    
   
414
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
415
   * to compute the std and returns the double value.

    
   
416
   * 

    
   
417
   * @param <R>

    
   
418
   * 

    
   
419
   * @param tableName

    
   
420
   * @param startKey

    
   
421
   * @param endKey

    
   
422
   * @param colFamily

    
   
423
   * @param colQualifier

    
   
424
   * @param ci

    
   
425
   * @param f

    
   
426
   * @return

    
   
427
   * @throws Throwable

    
   
428
   */

    
   
429
  public <R> double getStd(final byte[] tableName, final byte[] startKey,

    
   
430
      final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,

    
   
431
      ColumnInterpreter<R> ci, final Filter f) throws Throwable {

    
   
432
    List<R> l = getStdArgs(tableName, startKey, endKey, colFamily,

    
   
433
        colQualifier, ci, f);

    
   
434

   

    
   
435
    double res = 0d;

    
   
436
    double avg = ci.divide(l.get(0), l.get(2));

    
   
437

   

    
   
438
    double avgOfSumSq = ci.divide(l.get(1), l.get(2));

    
   
439
    res = avgOfSumSq - (avg) * (avg); // variance

    
   
440
    res = Math.pow(res, 0.5);

    
   
441

   

    
   
442
    return res;

    
   
443
  }

    
   
444

   

    
   
445
}
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
New File
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java
New File
 
  1. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java: Loading...
  2. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java: Loading...
  3. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java: Loading...
  4. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java: Loading...
  5. /src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java: Loading...
  6. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java: Loading...