1818package com .alipay .oceanbase .hbase .util ;
1919
2020import com .alipay .oceanbase .hbase .OHTable ;
21- import com .google . common . annotations . VisibleForTesting ;
21+ import com .alipay . oceanbase . rpc . protocol . payload . impl . execute . ObTableBatchOperation ;
2222import org .apache .hadoop .classification .InterfaceAudience ;
2323import org .apache .hadoop .conf .Configuration ;
24+ import org .apache .hadoop .hbase .KeyValue ;
2425import org .apache .hadoop .hbase .TableName ;
2526import org .apache .hadoop .hbase .client .*;
2627import org .slf4j .Logger ;
3132import java .util .concurrent .ExecutorService ;
3233import java .util .concurrent .TimeUnit ;
3334import java .util .concurrent .atomic .AtomicLong ;
34- import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .LCD ;
35+
36+ import static com .alipay .oceanbase .hbase .util .TableHBaseLoggerFactory .LCD ;
37+ import static com .alipay .oceanbase .rpc .ObGlobal .*;
3538
3639@ InterfaceAudience .Private
3740public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -40,13 +43,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
4043
4144 private final ExceptionListener listener ;
4245
43- private final OHTable ohTable ;
4446 private final TableName tableName ;
4547 private volatile Configuration conf ;
4648
47- @ VisibleForTesting
49+ private OHTable ohTable ;
4850 final ConcurrentLinkedQueue <Mutation > asyncWriteBuffer = new ConcurrentLinkedQueue <Mutation >();
49- @ VisibleForTesting
5051 AtomicLong currentAsyncBufferSize = new AtomicLong (0 );
5152
5253 private long writeBufferSize ;
@@ -55,9 +56,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
5556 private final ExecutorService pool ;
5657 private int rpcTimeout ;
5758 private int operationTimeout ;
59+ private static final long OB_VERSION_4_2_5_1 = calcVersion (4 , (short ) 2 ,
60+ (byte ) 5 , (byte ) 1 );
5861
59- public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params )
60- throws IOException {
62+ public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params ,
63+ OHTable ohTable ) throws IOException {
6164 if (ohConnection == null || ohConnection .isClosed ()) {
6265 throw new IllegalArgumentException ("Connection is null or closed." );
6366 }
@@ -77,7 +80,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
7780 .getMaxKeyValueSize () : connectionConfig .getMaxKeyValueSize ();
7881
7982 // create an OHTable object to do batch work
80- this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
83+ if (ohTable != null ) {
84+ this .ohTable = ohTable ;
85+ } else {
86+ this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
87+ }
8188 }
8289
8390 @ Override
@@ -119,14 +126,12 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
119126 validateOperation (m );
120127 toAddSize += m .heapSize ();
121128 }
122-
123129 currentAsyncBufferSize .addAndGet (toAddSize );
124130 asyncWriteBuffer .addAll (mutations );
125131
126132 if (currentAsyncBufferSize .get () > writeBufferSize ) {
127- execute (false );
133+ batchExecute (false );
128134 }
129-
130135 }
131136
132137 /**
@@ -142,10 +147,18 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
142147 }
143148 if (mt instanceof Put ) {
144149 // family empty check is in validatePut
145- HTable .validatePut ((Put ) mt , maxKeyValueSize );
146- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
150+ OHTable .validatePut ((Put ) mt , maxKeyValueSize );
151+ if (isMultiFamilySupport ()) {
152+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
153+ } else {
154+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
155+ }
147156 } else {
148- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
157+ if (isMultiFamilySupport ()) {
158+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
159+ } else {
160+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
161+ }
149162 }
150163 }
151164
@@ -156,7 +169,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
156169 * @param flushAll - if true, sends all the writes and wait for all of them to finish before
157170 * returning.
158171 */
159- private void execute (boolean flushAll ) throws IOException {
172+ private void batchExecute (boolean flushAll ) throws IOException {
160173 LinkedList <Mutation > execBuffer = new LinkedList <>();
161174 long dequeuedSize = 0L ;
162175 try {
@@ -172,19 +185,16 @@ private void execute(boolean flushAll) throws IOException {
172185 if (execBuffer .isEmpty ()) {
173186 return ;
174187 }
175- ohTable .batch (execBuffer );
188+ Object [] results = new Object [execBuffer .size ()];
189+ ohTable .batch (execBuffer , results );
176190 // if commit all successfully, clean execBuffer
177191 execBuffer .clear ();
178192 } catch (Exception ex ) {
179- LOGGER .error (LCD .convert ("01-00026" ), ex );
193+ // do not recollect error operations, notify outside
194+ LOGGER .error ("error happens: table name = " , tableName .getNameAsString (), ex );
180195 if (ex .getCause () instanceof RetriesExhaustedWithDetailsException ) {
181- LOGGER .error (tableName + ": One or more of the operations have failed after retries." );
196+ LOGGER .error (tableName . getNameAsString () + ": One or more of the operations have failed after retries." , ex );
182197 RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException ) ex .getCause ();
183- // recollect mutations
184- execBuffer .clear ();
185- for (int i = 0 ; i < retryException .getNumExceptions (); ++i ) {
186- execBuffer .add ((Mutation ) retryException .getRow (i ));
187- }
188198 if (listener != null ) {
189199 listener .onException (retryException , this );
190200 } else {
@@ -194,12 +204,6 @@ private void execute(boolean flushAll) throws IOException {
194204 LOGGER .error ("Errors unrelated to operations occur during mutation operation" , ex );
195205 throw ex ;
196206 }
197- } finally {
198- for (Mutation mutation : execBuffer ) {
199- long size = mutation .heapSize ();
200- currentAsyncBufferSize .addAndGet (size );
201- asyncWriteBuffer .add (mutation );
202- }
203207 }
204208 }
205209
@@ -209,7 +213,7 @@ public void close() throws IOException {
209213 return ;
210214 }
211215 try {
212- execute (true );
216+ batchExecute (true );
213217 } finally {
214218 // the pool in ObTableClient will be shut down too
215219 this .pool .shutdown ();
@@ -234,13 +238,21 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
234238 }
235239 }
236240
241+ /**
242+ * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
243+ * */
244+ boolean isMultiFamilySupport () {
245+ return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0 )
246+ || (OB_VERSION >= OB_VERSION_4_3_4_0 );
247+ }
248+
237249 /**
238250 * Force to commit all operations
239251 * do not care whether the pool is shut down or this BufferedMutator is closed
240252 */
241253 @ Override
242254 public void flush () throws IOException {
243- execute (true );
255+ batchExecute (true );
244256 }
245257
246258 @ Override
@@ -258,6 +270,10 @@ public void setOperationTimeout(int operationTimeout) {
258270 this .ohTable .setOperationTimeout (operationTimeout );
259271 }
260272
273+ public long getCurrentBufferSize () {
274+ return currentAsyncBufferSize .get ();
275+ }
276+
261277 @ Deprecated
262278 public List <Row > getWriteBuffer () {
263279 return Arrays .asList (asyncWriteBuffer .toArray (new Row [0 ]));
0 commit comments