Review Board 1.7.22


HBASE-3584 We need to atomically put/delete/increment in one call

Review Request #3481 - Created Jan. 12, 2012 and submitted

Lars Hofhansl
trunk
HBASE-3584
Reviewers
hbase
hbase
Add API for atomic row mutations to HBase (currently Put and Delete).

Client API would look like this:
    Delete d = new Delete(ROW);
    Put p = new Put(ROW);
    ...
    AtomicRowMutation arm = new AtomicRowMutation(ROW);
    arm.add(p);
    arm.add(d);
    myHtable.mutateAtomically(arm);
* Simple functional test: TestFromClientSide.testAtomicRowMutation
* Multithreaded stress test: TestAtomicOperation.testAtomicMutationMultiThreads
* coprocessor test: TestRegionObserverInterface.testAtomicRowMutation
* manual testing
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AtomicRowMutation.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18
package org.apache.hadoop.hbase.client;

    
   
19

   

    
   
20
import java.io.DataInput;

    
   
21
import java.io.DataOutput;

    
   
22
import java.io.IOException;

    
   
23
import java.util.ArrayList;

    
   
24
import java.util.Arrays;

    
   
25
import java.util.Collections;

    
   
26
import java.util.List;

    
   
27

   

    
   
28
import org.apache.hadoop.hbase.HConstants;

    
   
29
import org.apache.hadoop.hbase.io.HbaseObjectWritable;

    
   
30
import org.apache.hadoop.hbase.util.Bytes;

    
   
31

   

    
   
32
/**

    
   
33
 * Performs multiple mutations atomically on a single row.

    
   
34
 * Currently {@link Put} and {@link Delete} are supported.

    
   
35
 *

    
   
36
 * The mutations are performed in the order in which they

    
   
37
 * were added.

    
   
38
 */

    
   
39
public class AtomicRowMutation implements Row {

    
   
40
  private List<Mutation> mutations = new ArrayList<Mutation>();

    
   
41
  protected byte [] row;

    
   
42
  private static final byte VERSION = (byte)1;

    
   
43

   

    
   
44
  /** Constructor for Writable. DO NOT USE */

    
   
45
  public AtomicRowMutation() {}

    
   
46

   

    
   
47
  /**

    
   
48
   * Create an atomic mutation for the specified row.

    
   
49
   * @param row row key

    
   
50
   */

    
   
51
  public AtomicRowMutation(byte [] row) {

    
   
52
    if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {

    
   
53
      throw new IllegalArgumentException("Row key is invalid");

    
   
54
    }

    
   
55
    this.row = Arrays.copyOf(row, row.length);

    
   
56
  }

    
   
57

   

    
   
58
  /**

    
   
59
   * Add a {@link Put} operation to the list of mutations

    
   
60
   * @param p The {@link Put} to add

    
   
61
   * @throws IOException

    
   
62
   */

    
   
63
  public void add(Put p) throws IOException {

    
   
64
    internalAdd(p);

    
   
65
  }

    
   
66

   

    
   
67
  /**

    
   
68
   * Add a {@link Delete} operation to the list of mutations

    
   
69
   * @param d The {@link Delete} to add

    
   
70
   * @throws IOException

    
   
71
   */

    
   
72
  public void add(Delete d) throws IOException {

    
   
73
    internalAdd(d);

    
   
74
  }

    
   
75

   

    
   
76
  private void internalAdd(Mutation m) throws IOException {

    
   
77
    int res = Bytes.compareTo(this.row, m.getRow());

    
   
78
    if(res != 0) {

    
   
79
      throw new IOException("The row in the recently added Put/Delete " +

    
   
80
          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +

    
   
81
          Bytes.toStringBinary(this.row));

    
   
82
    }

    
   
83
    mutations.add(m);

    
   
84
  }

    
   
85

   

    
   
86
  @Override

    
   
87
  public void readFields(final DataInput in) throws IOException {

    
   
88
    int version = in.readByte();

    
   
89
    if (version > VERSION) {

    
   
90
      throw new IOException("version not supported");

    
   
91
    }

    
   
92
    this.row = Bytes.readByteArray(in);

    
   
93
    int numMutations = in.readInt();

    
   
94
    mutations.clear();

    
   
95
    for(int i=0; i<numMutations; i++) {

    
   
96
      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));

    
   
97
    }

    
   
98
  }

    
   
99

   

    
   
100
  @Override

    
   
101
  public void write(final DataOutput out) throws IOException {

    
   
102
    out.writeByte(VERSION);

    
   
103
    Bytes.writeByteArray(out, this.row);

    
   
104
    out.writeInt(mutations.size());

    
   
105
    for (Mutation m : mutations) {

    
   
106
      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);

    
   
107
    }

    
   
108
  }

    
   
109

   

    
   
110
  @Override

    
   
111
  public int compareTo(Row i) {

    
   
112
    return Bytes.compareTo(this.getRow(), i.getRow());

    
   
113
  }

    
   
114

   

    
   
115
  @Override

    
   
116
  public byte[] getRow() {

    
   
117
    return row;

    
   
118
  }

    
   
119

   

    
   
120
  /**

    
   
121
   * @return An unmodifiable list of the current mutations.

    
   
122
   */

    
   
123
  public List<Mutation> getMutations() {

    
   
124
    return Collections.unmodifiableList(mutations);

    
   
125
  }

    
   
126
}
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
Revision 1231172 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Revision 1231172 New Change
 
  1. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AtomicRowMutation.java: Loading...
  2. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java: Loading...
  3. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java: Loading...
  4. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java: Loading...
  5. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java: Loading...
  6. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java: Loading...
  7. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java: Loading...
  8. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java: Loading...
  9. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java: Loading...
  10. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java: Loading...
  11. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java: Loading...
  12. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java: Loading...