Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public DeletedBlockLogStateManager build() throws IOException {
final DeletedBlockLogStateManager impl = new DeletedBlockLogStateManagerImpl(
deletedBlocksTransactionTable, statefulServiceConfigTable, containerManager, transactionBuffer);

return scmRatisServer.getProxyHandler(new DeletedBlockLogStateManagerInvoker(impl));
return scmRatisServer.getProxyHandler(new DeletedBlockLogStateManagerInvoker(impl, scmRatisServer));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,10 +88,8 @@ private Object invokeLocal(Method method, Object[] args)
LOG.trace("Invoking method {} on target {} with arguments {}",
method, localHandler, args);
}
Preconditions.assertNull(invoker, "invoker");
try {
if (invoker != null) {
return invoker.invokeLocal(method.getName(), args);
}
return method.invoke(localHandler, args);
} catch (Exception e) {
throw translateException(e);
Expand All @@ -104,6 +104,7 @@ private Object invokeRatis(Method method, Object[] args)
if (LOG.isTraceEnabled()) {
LOG.trace("Invoking method {} on target {}", method, ratisHandler);
}

try {
switch (method.getAnnotation(Replicate.class).invocationType()) {
case CLIENT:
Expand Down Expand Up @@ -143,7 +144,7 @@ private Object invokeRatisClient(Method method, Object[] args)
throw response.getException();
}

private static SCMException translateException(Throwable t) {
public static SCMException translateException(Throwable t) {
if (t instanceof SCMException) {
return (SCMException) t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
Expand Down Expand Up @@ -82,6 +83,9 @@ default <T> T getProxyHandler(ScmInvoker<T> invoker) {
default <T> T getProxyHandler(RequestType type, Class<T> intf, T impl, ScmInvoker<T> invoker) {
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(type, impl, invoker, this);
if (invoker != null) {
return invoker.getProxy();
}
return intf.cast(Proxy.newProxyInstance(getClass().getClassLoader(),
new Class<?>[] {intf}, invocationHandler));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

package org.apache.hadoop.hdds.scm.ha.invoker;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.ha.ScmInvoker;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.utils.db.Table;

/**
* Invoker for DeletedBlockLogStateManager local (non-@Replicate) methods.
*/
public class DeletedBlockLogStateManagerInvoker implements ScmInvoker<DeletedBlockLogStateManager> {
public class DeletedBlockLogStateManagerInvoker extends ScmInvoker<DeletedBlockLogStateManager> {
private final DeletedBlockLogStateManager impl;

public DeletedBlockLogStateManagerInvoker(DeletedBlockLogStateManager impl) {
public DeletedBlockLogStateManagerInvoker(DeletedBlockLogStateManager impl, SCMRatisServer scmRatisServer) {
super(scmRatisServer);
this.impl = impl;
}

Expand All @@ -49,6 +53,76 @@ public DeletedBlockLogStateManager getImpl() {
return impl;
}

@Override
protected Class<?>[] getParameterTypes(String methodName) {
switch (methodName) {
case "addTransactionsToDB":
return new Class<?>[] {ArrayList.class, DeletedBlocksTransactionSummary.class};

case "removeTransactionsFromDB":
return new Class<?>[] {ArrayList.class, DeletedBlocksTransactionSummary.class};

default:
throw new IllegalArgumentException("Unknown method: " + methodName);
}
}

@Override
public DeletedBlockLogStateManager getProxy() {
return new DeletedBlockLogStateManager() {
@Override
public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
throws IOException {
final Object[] args = {txs};
invokeRatisServer("addTransactionsToDB",
new Class<?>[] {ArrayList.class}, args);
}

@Override
public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs,
DeletedBlocksTransactionSummary summary) throws IOException {
final Object[] args = {txs, summary};
invokeRatisServer("addTransactionsToDB",
new Class<?>[] {ArrayList.class,
DeletedBlocksTransactionSummary.class}, args);
}

@Override
public void removeTransactionsFromDB(ArrayList<Long> txIDs)
throws IOException {
final Object[] args = {txIDs};
invokeRatisServer("removeTransactionsFromDB",
new Class<?>[] {ArrayList.class}, args);
}

@Override
public void removeTransactionsFromDB(ArrayList<Long> txIDs,
DeletedBlocksTransactionSummary summary) throws IOException {
final Object[] args = {txIDs, summary};
invokeRatisServer("removeTransactionsFromDB",
new Class<?>[] {ArrayList.class,
DeletedBlocksTransactionSummary.class}, args);
}

@Override
public Table.KeyValueIterator<Long, DeletedBlocksTransaction> getReadOnlyIterator() throws IOException {
return impl.getReadOnlyIterator();
}

@Override
public void onFlush() {
impl.onFlush();
}

@Override
public void reinitialize(
Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable,
Table<String, ByteString> statefulConfigTable) {
impl.reinitialize(deletedBlocksTXTable, statefulConfigTable);
}
};
}

// Code generated for DeletedBlockLogStateManager. Do not modify.
@SuppressWarnings("unchecked")
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ha.invoker;

import static org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.translateException;

import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;

/**
* Invokes methods without using reflection.
*/
public abstract class ScmInvoker<T> {
static final Object[] NO_ARGS = {};

private final SCMRatisServer ratisHandler;

ScmInvoker(SCMRatisServer ratisHandler) {
this.ratisHandler = ratisHandler;
}

public abstract RequestType getType();

public abstract Class<T> getApi();

public abstract T getImpl();

public abstract T getProxy();

abstract Class<?>[] getParameterTypes(String methodName);

abstract Object invokeLocal(String methodName, Object[] args) throws Exception;

Object invokeRatisServer(String methodName, Class<?>[] paramTypes,
Object[] args) throws SCMException {
try {
final SCMRatisRequest request = SCMRatisRequest.of(
getType(), methodName, paramTypes, args);
final SCMRatisResponse response = ratisHandler.submitRequest(request);
if (response.isSuccess()) {
return response.getResult();
}
throw response.getException();
} catch (Exception e) {
throw translateException(e);
}
}
}