Review Board 1.7.22


Coprocessors: Support aggregate functions

Review Request #585 - Created April 12, 2011 and updated

Ted Yu
trunk
HBASE-1512
Reviewers
hbase
ghelmling
hbase
This patch provides reference implementation for aggregate function support through Coprocessor framework.
ColumnInterpreter interface allows client to specify how the value's byte array is interpreted.
Some of the thoughts are summarized at http://zhihongyu.blogspot.com/2011/03/genericizing-endpointcoprocessor.html

Himanshu Vashishtha started the work. I provided some review comments and some of the code.
TestAggFunctions passes.
/src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java
New File

    
   
1
package org.apache.hadoop.hbase.client;

    
   
2

   

    
   
3
import java.io.IOException;

    
   
4

   

    
   
5
import org.apache.commons.logging.Log;

    
   
6
import org.apache.commons.logging.LogFactory;

    
   
7
import org.apache.hadoop.hbase.NotServingRegionException;

    
   
8

   

    
   
9
/**

    
   
10
 * This captures the call of the cursor behavior from client to the server.

    
   
11
 * It has the location of the regionserver, row, and the regioninfo for that row.

    
   
12
 * 

    
   
13
 * it is almost similar to the scannercallable, so it should have a corresponding ID, 

    
   
14
 * that is mapped to a fat object (CursorResultSetCp) on the server side. this object is created by the 

    
   
15
 * coprocessor endpoint. That object provides support for navigating/pulling rows from a resultset.

    
   
16
 * There can be two types of resultset: InMemoryResultSet and IncrementalResultSet.

    
   
17
 */

    
   
18
public class CursorCallable extends ServerCallable<Result[]> {

    
   
19

   

    
   
20
  private long cursorId = -1;

    
   
21
  private boolean instantiated = false;

    
   
22
  private boolean closed = false;

    
   
23
  private int cache = 1;

    
   
24
  

    
   
25
  private static final Log LOG = LogFactory.getLog(CursorCallable.class);

    
   
26
 

    
   
27
  public void setCache(int cache) {

    
   
28
    this.cache = cache;

    
   
29
  }

    
   
30

   

    
   
31
  @Override

    
   
32
  public void instantiateServer(boolean reload) throws IOException {

    
   
33
      if(!instantiated){

    
   
34
        super.instantiateServer(reload);

    
   
35
        instantiated = true;

    
   
36
      }

    
   
37
  }

    
   
38
  

    
   
39
  public CursorCallable(HConnection connection, byte[] tableName, byte[] row) {

    
   
40
    super(connection, tableName, row);

    
   
41
  }

    
   
42
  

    
   
43
  public void setCursorId(long cursorId) {

    
   
44
    this.cursorId = cursorId;

    
   
45
  }

    
   
46
  

    
   
47
  @Override

    
   
48
  public Result[] call() throws IOException {

    
   
49
    Result [] res;

    
   
50
    if(cursorId == -1)

    
   
51
        return null;// cursor is not set/registered.

    
   
52
    else if (closed && cursorId != -1){ // go and close the cursor on the HRS

    
   
53
      this.close();

    
   
54
    }else{

    
   
55
    try {

    
   
56
      res = this.server.nextCp(cursorId, cache);

    
   
57
      if(res ==null || res.length !=this.cache){ //results are all fetch, set the close flag, so that on next call, the cursor is closed.

    
   
58
        closed = true;

    
   
59
      }

    
   
60
      return res;

    
   
61
    } catch (Exception e) {

    
   
62
      if (e instanceof NotServingRegionException){

    
   
63
        LOG.error("got a NSRE from region server with location"+ this.location.toString());

    
   
64
        // at this point, it will abort the process! (sad but true). not supporting the logic of resending the request.

    
   
65
        throw new IOException("Aborting the process due to NSRE");

    
   
66
      }

    
   
67
      throw new IOException(e.getCause());

    
   
68
    }

    
   
69
    }

    
   
70
    return null;

    
   
71
  }

    
   
72
  

    
   
73
  

    
   
74
  public void setClosed(boolean closed) {

    
   
75
    this.closed = closed;

    
   
76
  }

    
   
77
  

    
   
78
  public void close(){

    
   
79
    if(this.cursorId ==-1)

    
   
80
      return;

    
   
81
    try{

    
   
82
      this.server.closeCp(cursorId);

    
   
83
    }catch(IOException ioe){

    
   
84
      LOG.error("Got an exception while closing the cursor resource");

    
   
85
    }

    
   
86
    this.cursorId = -1; 

    
   
87
  }

    
   
88
}
/src/main/java/org/apache/hadoop/hbase/client/CursorCp.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/Scan.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java
Diff Revision 1 Diff Revision 7 - File Reverted
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
Diff Revision 1 Diff Revision 7
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java
Diff Revision 1 Diff Revision 7 - File Reverted
 
/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
Diff Revision 1 Diff Revision 7
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java
Diff Revision 1 Diff Revision 7 - File Reverted
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
Diff Revision 1 Diff Revision 7
 
  1. /src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java: Loading...
  2. /src/main/java/org/apache/hadoop/hbase/client/CursorCp.java: Loading...
  3. /src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java: Loading...
  4. /src/main/java/org/apache/hadoop/hbase/client/HTable.java: Loading...
  5. /src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java: Loading...
  6. /src/main/java/org/apache/hadoop/hbase/client/Scan.java: Loading...
  7. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java: Loading...
  8. /src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java: Loading...
  9. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java: Loading...
  10. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java: Loading...
  11. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java: Loading...
  12. /src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java: Loading...
  13. /src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java: Loading...
  14. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java: Loading...
  15. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java: Loading...