Review Board 1.7.22


Coprocessors: Support aggregate functions

Review Request #585 - Created April 12, 2011 and last updated

Ted Yu
trunk
HBASE-1512
Reviewers
hbase
ghelmling
hbase
This patch provides reference implementation for aggregate function support through Coprocessor framework.
ColumnInterpreter interface allows client to specify how the value's byte array is interpreted.
Some of the thoughts are summarized at http://zhihongyu.blogspot.com/2011/03/genericizing-endpointcoprocessor.html

Himanshu Vashishtha started the work. I provided some review comments and some of the code.
TestAggFunctions passes.
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
New File

    
   
1
/*

    
   
2
 * Copyright 2011 The Apache Software Foundation

    
   
3
 *

    
   
4
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
5
 * or more contributor license agreements.  See the NOTICE file

    
   
6
 * distributed with this work for additional information

    
   
7
 * regarding copyright ownership.  The ASF licenses this file

    
   
8
 * to you under the Apache License, Version 2.0 (the

    
   
9
 * "License"); you may not use this file except in compliance

    
   
10
 * with the License.  You may obtain a copy of the License at

    
   
11
 *

    
   
12
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
13
 *

    
   
14
 * Unless required by applicable law or agreed to in writing, software

    
   
15
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
17
 * See the License for the specific language governing permissions and

    
   
18
 * limitations under the License.

    
   
19
 */

    
   
20

   

    
   
21
package org.apache.hadoop.hbase.client.coprocessor;

    
   
22

   

    
   
23
import java.io.IOException;

    
   
24
import java.util.ArrayList;

    
   
25
import java.util.List;

    
   
26

   

    
   
27
import org.apache.commons.logging.Log;

    
   
28
import org.apache.commons.logging.LogFactory;

    
   
29
import org.apache.hadoop.conf.Configuration;

    
   
30
import org.apache.hadoop.hbase.HConstants;

    
   
31
import org.apache.hadoop.hbase.client.HTable;

    
   
32
import org.apache.hadoop.hbase.client.Scan;

    
   
33
import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;

    
   
34
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;

    
   
35
import org.apache.hadoop.hbase.util.Bytes;

    
   
36
import org.apache.hadoop.hbase.util.Pair;

    
   
37

   

    
   
38
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateProtocol. This class will implement the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateProtocol for each region.
 * <p>
 * This will serve as the client side handler for invoking the agg functions.
 * <ul>
 * For all aggregate functions,
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})
 * <li>Column family can't be null. In case if multiple families are provided,
 * only first one will be picked. An optional column qualifier can also be
 * defined.
 * <li>For methods to find maximum, minimum, sum, rowcount, it returns the
 * parameter type. For average and std, it returns a double value. To achieve
 * this, the public methods avg and std call the corresponding private methods
 * getAvgArgs and getStdArgs and compute the result. For row count, it returns
 * a long value.
 * </ul>
 */
public class AggregationClient {

  // Shared logger for this client class.
  private static final Log log = LogFactory.getLog(AggregationClient.class);

  // Configuration supplied by the caller; used to construct an HTable for
  // each aggregate invocation.
  Configuration conf;

  /**
   * Constructor with Conf object
   * @param cfg configuration used when connecting to the cluster's tables
   */
  public AggregationClient(Configuration cfg) {
    this.conf = cfg;
  }

    
   
70

   

    
   
71
  /**

    
   
72
   * It gives the maximum value of a column for a given column family for the

    
   
73
   * given range. In case qualifier is null, a max of all values for the given

    
   
74
   * family is returned.

    
   
75
   * @param tableName

    
   
76
   * @param ci

    
   
77
   * @param scan

    
   
78
   * @return max val <R>

    
   
79
   * @throws Throwable

    
   
80
   *           The caller is supposed to handle the exception as they are thrown

    
   
81
   *           & propagated to it.

    
   
82
   */

    
   
83
  public <R> R max(final byte[] tableName, final ColumnInterpreter<R, R> ci,

    
   
84
      final Scan scan) throws Throwable {

    
   
85
    validateParameters(scan);

    
   
86
    HTable table = new HTable(conf, tableName);

    
   
87

   

    
   
88
    class MaxCallBack implements Batch.Callback<R> {

    
   
89
      R max = null;

    
   
90

   

    
   
91
      R getMax() {

    
   
92
        return max;

    
   
93
      }

    
   
94

   

    
   
95
      @Override

    
   
96
      public void update(byte[] region, byte[] row, R result) {

    
   
97
        max = ci.compare(max, result) < 0 ? result : max;

    
   
98
      }

    
   
99
    }

    
   
100
    MaxCallBack aMaxCallBack = new MaxCallBack();

    
   
101
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
102
        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {

    
   
103
      @Override

    
   
104
      public R call(AggregateProtocol instance) throws IOException {

    
   
105
        return instance.getMax(ci, scan);

    
   
106
      }

    
   
107
    }, aMaxCallBack);

    
   
108
    return aMaxCallBack.getMax();

    
   
109
  }

    
   
110

   

    
   
111
  private void validateParameters(Scan scan) throws IOException {

    
   
112
    if (scan == null

    
   
113
        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes

    
   
114
            .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))

    
   
115
        || Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) {

    
   
116
      throw new IOException(

    
   
117
          "Agg client Exception: Startrow can't be equal or greater than Stoprow");

    
   
118
    }

    
   
119
    if (scan.getFamilyMap().isEmpty()) {

    
   
120
      throw new IOException("Family can't be null");

    
   
121
    }

    
   
122
  }

    
   
123

   

    
   
124
  /**

    
   
125
   * It gives the minimum value of a column for a given column family for the

    
   
126
   * given range. In case qualifier is null, a min of all values for the given

    
   
127
   * family is returned.

    
   
128
   * @param tableName

    
   
129
   * @param ci

    
   
130
   * @param scan

    
   
131
   * @return min val <R>

    
   
132
   * @throws Throwable

    
   
133
   */

    
   
134
  public <R> R min(final byte[] tableName, final ColumnInterpreter<R, R> ci,

    
   
135
      final Scan scan) throws Throwable {

    
   
136
    validateParameters(scan);

    
   
137
    class MinCallBack implements Batch.Callback<R> {

    
   
138

   

    
   
139
      private R min = null;

    
   
140

   

    
   
141
      public R getMinimum() {

    
   
142
        return min;

    
   
143
      }

    
   
144

   

    
   
145
      @Override

    
   
146
      public void update(byte[] region, byte[] row, R result) {

    
   
147
        min = (min == null || ci.compare(result, min) < 0) ? result : min;

    
   
148
      }

    
   
149
    }

    
   
150
    HTable table = new HTable(conf, tableName);

    
   
151
    MinCallBack minCallBack = new MinCallBack();

    
   
152
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
153
        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {

    
   
154

   

    
   
155
      @Override

    
   
156
      public R call(AggregateProtocol instance) throws IOException {

    
   
157
        return instance.getMin(ci, scan);

    
   
158
      }

    
   
159
    }, minCallBack);

    
   
160
    log.debug("Min fom all regions is: " + minCallBack.getMinimum());

    
   
161
    return minCallBack.getMinimum();

    
   
162
  }

    
   
163

   

    
   
164
  /**

    
   
165
   * It gives the row count, by summing up the individual results obtained from

    
   
166
   * regions. In case the qualifier is null, FirstKEyValueFilter is used to

    
   
167
   * optimised the operation. In case qualifier is provided, I can't use the

    
   
168
   * filter as it may set the flag to skip to next row, but the value read is

    
   
169
   * not of the given filter: in this case, this particular row will not be

    
   
170
   * counted ==> an error.

    
   
171
   * @param tableName

    
   
172
   * @param ci

    
   
173
   * @param scan

    
   
174
   * @return

    
   
175
   * @throws Throwable

    
   
176
   */

    
   
177
  public <R> long rowCount(final byte[] tableName,

    
   
178
      final ColumnInterpreter<R, R> ci, final Scan scan) throws Throwable {

    
   
179
    validateParameters(scan);

    
   
180
    class RowNumCallback implements Batch.Callback<Long> {

    
   
181
      private long rowCountL = 0l;

    
   
182

   

    
   
183
      public long getRowNumCount() {

    
   
184
        return rowCountL;

    
   
185
      }

    
   
186

   

    
   
187
      @Override

    
   
188
      public void update(byte[] region, byte[] row, Long result) {

    
   
189
        rowCountL += result.longValue();

    
   
190
      }

    
   
191
    }

    
   
192
    RowNumCallback rowNum = new RowNumCallback();

    
   
193
    HTable table = new HTable(conf, tableName);

    
   
194
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
195
        .getStopRow(), new Batch.Call<AggregateProtocol, Long>() {

    
   
196
      @Override

    
   
197
      public Long call(AggregateProtocol instance) throws IOException {

    
   
198
        return instance.getRowNum(ci, scan);

    
   
199
      }

    
   
200
    }, rowNum);

    
   
201
    return rowNum.getRowNumCount();

    
   
202
  }

    
   
203

   

    
   
204
  /**

    
   
205
   * It sums up the value returned from various regions. In case qualifier is

    
   
206
   * null, summation of all the column qualifiers in the given family is done.

    
   
207
   * @param tableName

    
   
208
   * @param ci

    
   
209
   * @param scan

    
   
210
   * @return sum <S>

    
   
211
   * @throws Throwable

    
   
212
   */

    
   
213
  public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,

    
   
214
      final Scan scan) throws Throwable {

    
   
215
    validateParameters(scan);

    
   
216
    class SumCallBack implements Batch.Callback<S> {

    
   
217
      S sumVal = null;

    
   
218

   

    
   
219
      public S getSumResult() {

    
   
220
        return sumVal;

    
   
221
      }

    
   
222

   

    
   
223
      @Override

    
   
224
      public void update(byte[] region, byte[] row, S result) {

    
   
225
        sumVal = ci.add(sumVal, result);

    
   
226
      }

    
   
227
    }

    
   
228
    SumCallBack sumCallBack = new SumCallBack();

    
   
229
    HTable table = new HTable(conf, tableName);

    
   
230
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
231
        .getStopRow(), new Batch.Call<AggregateProtocol, S>() {

    
   
232
      @Override

    
   
233
      public S call(AggregateProtocol instance) throws IOException {

    
   
234
        return instance.getSum(ci, scan);

    
   
235
      }

    
   
236
    }, sumCallBack);

    
   
237
    return sumCallBack.getSumResult();

    
   
238
  }

    
   
239

   

    
   
240
  /**

    
   
241
   * It computes average while fetching sum and row count from all the

    
   
242
   * corresponding regions. Approach is to compute a global sum of region level

    
   
243
   * sum and rowcount and then compute the average.

    
   
244
   * @param tableName

    
   
245
   * @param scan

    
   
246
   * @throws Throwable

    
   
247
   */

    
   
248
  private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,

    
   
249
      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {

    
   
250
    validateParameters(scan);

    
   
251
    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {

    
   
252
      S sum = null;

    
   
253
      Long rowCount = 0l;

    
   
254

   

    
   
255
      public Pair<S, Long> getAvgArgs() {

    
   
256
        return new Pair<S, Long>(sum, rowCount);

    
   
257
      }

    
   
258

   

    
   
259
      @Override

    
   
260
      public void update(byte[] region, byte[] row, Pair<S, Long> result) {

    
   
261
        sum = ci.add(sum, result.getFirst());

    
   
262
        rowCount += result.getSecond();

    
   
263
      }

    
   
264
    }

    
   
265
    AvgCallBack avgCallBack = new AvgCallBack();

    
   
266
    HTable table = new HTable(conf, tableName);

    
   
267
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
268
        .getStopRow(), new Batch.Call<AggregateProtocol, Pair<S, Long>>() {

    
   
269
      @Override

    
   
270
      public Pair<S, Long> call(AggregateProtocol instance) throws IOException {

    
   
271
        return instance.getAvg(ci, scan);

    
   
272
      }

    
   
273
    }, avgCallBack);

    
   
274
    return avgCallBack.getAvgArgs();

    
   
275
  }

    
   
276

   

    
   
277
  /**

    
   
278
   * This is the client side interface/handle for calling the average method for

    
   
279
   * a given cf-cq combination. It was necessary to add one more call stack as

    
   
280
   * its return type should be a decimal value, irrespective of what

    
   
281
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
282
   * to compute the average and returs the double value.

    
   
283
   * @param tableName

    
   
284
   * @param ci

    
   
285
   * @param scan

    
   
286
   * @return

    
   
287
   * @throws Throwable

    
   
288
   */

    
   
289
  public <R, S> double avg(final byte[] tableName,

    
   
290
      final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {

    
   
291
    Pair<S, Long> p = getAvgArgs(tableName, ci, scan);

    
   
292
    return ci.divideForAvg(p.getFirst(), p.getSecond());

    
   
293
  }

    
   
294

   

    
   
295
  /**

    
   
296
   * It computes a global standard deviation for a given column and its value.

    
   
297
   * Standard deviation is square root of (average of squares -

    
   
298
   * average*average). From individual regions, it obtains sum, square sum and

    
   
299
   * number of rows. With these, the above values are computed to get the global

    
   
300
   * std.

    
   
301
   * @param tableName

    
   
302
   * @param scan

    
   
303
   * @return

    
   
304
   * @throws Throwable

    
   
305
   */

    
   
306
  private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,

    
   
307
      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {

    
   
308
    validateParameters(scan);

    
   
309
    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {

    
   
310
      long rowCountVal = 0l;

    
   
311
      S sumVal = null, sumSqVal = null;

    
   
312

   

    
   
313
      public Pair<List<S>, Long> getStdParams() {

    
   
314
        List<S> l = new ArrayList<S>();

    
   
315
        l.add(sumVal);

    
   
316
        l.add(sumSqVal);

    
   
317
        Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);

    
   
318
        return p;

    
   
319
      }

    
   
320

   

    
   
321
      @Override

    
   
322
      public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {

    
   
323
        sumVal = ci.add(sumVal, result.getFirst().get(0));

    
   
324
        sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));

    
   
325
        rowCountVal += result.getSecond();

    
   
326
      }

    
   
327
    }

    
   
328
    StdCallback stdCallback = new StdCallback();

    
   
329
    HTable table = new HTable(conf, tableName);

    
   
330
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan

    
   
331
        .getStopRow(),

    
   
332
        new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {

    
   
333
          @Override

    
   
334
          public Pair<List<S>, Long> call(AggregateProtocol instance)

    
   
335
              throws IOException {

    
   
336
            return instance.getStd(ci, scan);

    
   
337
          }

    
   
338

   

    
   
339
        }, stdCallback);

    
   
340
    return stdCallback.getStdParams();

    
   
341
  }

    
   
342

   

    
   
343
  /**

    
   
344
   * This is the client side interface/handle for calling the std method for a

    
   
345
   * given cf-cq combination. It was necessary to add one more call stack as its

    
   
346
   * return type should be a decimal value, irrespective of what

    
   
347
   * columninterpreter says. So, this methods collects the necessary parameters

    
   
348
   * to compute the std and returns the double value.

    
   
349
   * @param tableName

    
   
350
   * @param ci

    
   
351
   * @param scan

    
   
352
   * @return

    
   
353
   * @throws Throwable

    
   
354
   */

    
   
355
  public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,

    
   
356
      Scan scan) throws Throwable {

    
   
357
    Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);

    
   
358
    double res = 0d;

    
   
359
    double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());

    
   
360
    double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());

    
   
361
    res = avgOfSumSq - (avg) * (avg); // variance

    
   
362
    res = Math.pow(res, 0.5);

    
   
363
    return res;

    
   
364
  }

    
   
365

   

    
   
366
}
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
New File
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
New File
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
New File
 
  1. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java: Loading...
  2. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java: Loading...
  3. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java: Loading...
  4. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java: Loading...
  5. /src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java: Loading...
  6. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java: Loading...