diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java index 88a596a14577..7f45f5cb2d19 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java @@ -240,7 +240,7 @@ public DeletedBlockLogStateManager build() throws IOException { final DeletedBlockLogStateManager impl = new DeletedBlockLogStateManagerImpl( deletedBlocksTransactionTable, statefulServiceConfigTable, containerManager, transactionBuffer); - return scmRatisServer.getProxyHandler(new DeletedBlockLogStateManagerInvoker(impl)); + return scmRatisServer.getProxyHandler(new DeletedBlockLogStateManagerInvoker(impl, scmRatisServer)); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java index 382f18819205..5fe2cd733b01 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java @@ -26,9 +26,11 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker; import org.apache.hadoop.hdds.scm.metadata.Replicate; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.exceptions.NotLeaderException; +import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,10 +88,8 @@ private Object invokeLocal(Method method, Object[] args) LOG.trace("Invoking method {} on target {} with arguments {}", method, localHandler, args); } + Preconditions.assertNull(invoker, "invoker"); try { - if (invoker != null) { - return invoker.invokeLocal(method.getName(), args); - } return method.invoke(localHandler, args); } catch (Exception e) { throw translateException(e); @@ -104,6 +104,7 @@ private Object invokeRatis(Method method, Object[] args) if (LOG.isTraceEnabled()) { LOG.trace("Invoking method {} on target {}", method, ratisHandler); } + try { switch (method.getAnnotation(Replicate.class).invocationType()) { case CLIENT: @@ -143,7 +144,7 @@ private Object invokeRatisClient(Method method, Object[] args) throw response.getException(); } - private static SCMException translateException(Throwable t) { + public static SCMException translateException(Throwable t) { if (t instanceof SCMException) { return (SCMException) t; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java index 689c3aff8ef9..596fe37c4d88 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.scm.AddSCMRequest; import org.apache.hadoop.hdds.scm.RemoveSCMRequest; +import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.NotLeaderException; @@ -82,6 +83,9 @@ default T getProxyHandler(ScmInvoker invoker) { default T getProxyHandler(RequestType type, Class intf, T impl, ScmInvoker invoker) { final SCMHAInvocationHandler invocationHandler = new SCMHAInvocationHandler(type, impl, invoker, this); + if (invoker != null) { + return invoker.getProxy(); + } return intf.cast(Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {intf}, invocationHandler)); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ScmInvoker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ScmInvoker.java deleted file mode 100644 index 2bc1202fcd9f..000000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ScmInvoker.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.ha; - -import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; - -/** - * Invokes methods without using reflection. - */ -public interface ScmInvoker { - RequestType getType(); - - Class getApi(); - - T getImpl(); - - Object invokeLocal(String methodName, Object[] args) throws Exception; -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/DeletedBlockLogStateManagerInvoker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/DeletedBlockLogStateManagerInvoker.java index 9574ac1c0d38..11a1768f3a9c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/DeletedBlockLogStateManagerInvoker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/DeletedBlockLogStateManagerInvoker.java @@ -17,20 +17,24 @@ package org.apache.hadoop.hdds.scm.ha.invoker; +import com.google.protobuf.ByteString; +import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager; -import org.apache.hadoop.hdds.scm.ha.ScmInvoker; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; import org.apache.hadoop.hdds.utils.db.Table; /** * Invoker for DeletedBlockLogStateManager local (non-@Replicate) methods. */ -public class DeletedBlockLogStateManagerInvoker implements ScmInvoker { +public class DeletedBlockLogStateManagerInvoker extends ScmInvoker { private final DeletedBlockLogStateManager impl; - public DeletedBlockLogStateManagerInvoker(DeletedBlockLogStateManager impl) { + public DeletedBlockLogStateManagerInvoker(DeletedBlockLogStateManager impl, SCMRatisServer scmRatisServer) { + super(scmRatisServer); this.impl = impl; } @@ -49,6 +53,76 @@ public DeletedBlockLogStateManager getImpl() { return impl; } + @Override + protected Class[] getParameterTypes(String methodName) { + switch (methodName) { + case "addTransactionsToDB": + return new Class[] {ArrayList.class, DeletedBlocksTransactionSummary.class}; + + case "removeTransactionsFromDB": + return new Class[] {ArrayList.class, DeletedBlocksTransactionSummary.class}; + + default: + throw new IllegalArgumentException("Unknown method: " + methodName); + } + } + + @Override + public DeletedBlockLogStateManager getProxy() { + return new DeletedBlockLogStateManager() { + @Override + public void addTransactionsToDB(ArrayList txs) + throws IOException { + final Object[] args = {txs}; + invokeRatisServer("addTransactionsToDB", + new Class[] {ArrayList.class}, args); + } + + @Override + public void addTransactionsToDB(ArrayList txs, + DeletedBlocksTransactionSummary summary) throws IOException { + final Object[] args = {txs, summary}; + invokeRatisServer("addTransactionsToDB", + new Class[] {ArrayList.class, + DeletedBlocksTransactionSummary.class}, args); + } + + @Override + public void removeTransactionsFromDB(ArrayList txIDs) + throws IOException { + final Object[] args = {txIDs}; + invokeRatisServer("removeTransactionsFromDB", + new Class[] {ArrayList.class}, args); + } + + @Override + public void removeTransactionsFromDB(ArrayList txIDs, + DeletedBlocksTransactionSummary summary) throws IOException { + final Object[] args = {txIDs, summary}; + invokeRatisServer("removeTransactionsFromDB", + new Class[] {ArrayList.class, + DeletedBlocksTransactionSummary.class}, args); + } + + @Override + public Table.KeyValueIterator getReadOnlyIterator() throws IOException { + return impl.getReadOnlyIterator(); + } + + @Override + public void onFlush() { + impl.onFlush(); + } + + @Override + public void reinitialize( + Table deletedBlocksTXTable, + Table statefulConfigTable) { + impl.reinitialize(deletedBlocksTXTable, statefulConfigTable); + } + }; + } + // Code generated for DeletedBlockLogStateManager. Do not modify. @SuppressWarnings("unchecked") @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/ScmInvoker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/ScmInvoker.java new file mode 100644 index 000000000000..dcff65b09629 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/ScmInvoker.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ha.invoker; + +import static org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.translateException; + +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest; +import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; + +/** + * Invokes methods without using reflection. + */ +public abstract class ScmInvoker { + static final Object[] NO_ARGS = {}; + + private final SCMRatisServer ratisHandler; + + ScmInvoker(SCMRatisServer ratisHandler) { + this.ratisHandler = ratisHandler; + } + + public abstract RequestType getType(); + + public abstract Class getApi(); + + public abstract T getImpl(); + + public abstract T getProxy(); + + abstract Class[] getParameterTypes(String methodName); + + abstract Object invokeLocal(String methodName, Object[] args) throws Exception; + + Object invokeRatisServer(String methodName, Class[] paramTypes, + Object[] args) throws SCMException { + try { + final SCMRatisRequest request = SCMRatisRequest.of( + getType(), methodName, paramTypes, args); + final SCMRatisResponse response = ratisHandler.submitRequest(request); + if (response.isSuccess()) { + return response.getResult(); + } + throw response.getException(); + } catch (Exception e) { + throw translateException(e); + } + } +}