Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
Expand Down Expand Up @@ -952,22 +952,12 @@ private class GridDummySpiContext implements IgniteSpiContext {

/** {@inheritDoc} */
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
Ignite ignite0 = ignite;

if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);

((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
}

/** {@inheritDoc} */
@Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
Ignite ignite0 = ignite;

if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);

((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1004,5 +994,15 @@ private class GridDummySpiContext implements IgniteSpiContext {
@Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
// No-op.
}

/** */
private GridKernalContext context() {
Ignite ignite0 = ignite;

if (ignite0 == null)
throw new IgniteSpiException(isStopping() ? "The node is stopping" : "The node is not yet started");

return ((IgniteEx)ignite0).context();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7460,10 +7460,13 @@
* @param clientMsgWrk Client message worker to start.
* @return Whether connection was successful.
* @throws IOException If IO failed.
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
*/
@SuppressWarnings({"IfMayBeConditional"})
private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
@Nullable ClientMessageWorker clientMsgWrk) throws IOException {
private boolean processJoinRequestMessage(

Check failure on line 7466 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 23 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZzX7FH8LqjZ_YRjUynH&open=AZzX7FH8LqjZ_YRjUynH&pullRequest=12869
TcpDiscoveryJoinRequestMessage msg,
@Nullable ClientMessageWorker clientMsgWrk
) throws IOException, IgniteCheckedException {
assert msg != null;
assert !msg.responded();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,9 +1573,12 @@
* @return Opened socket.
* @throws IOException If failed.
* @throws IgniteSpiOperationTimeoutException In case of timeout.
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
*/
protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
protected Socket openSocket(
InetSocketAddress sockAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteSpiOperationTimeoutException, IgniteCheckedException {

Check warning on line 1581 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'org.apache.ignite.spi.IgniteSpiOperationTimeoutException' which is a subclass of 'org.apache.ignite.IgniteCheckedException'.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZzX7FTLLqjZ_YRjUynI&open=AZzX7FTLLqjZ_YRjUynI&pullRequest=12869
return openSocket(createSocket(), sockAddr, timeoutHelper);
}

Expand All @@ -1588,10 +1591,13 @@
* @return Connected socket.
* @throws IOException If failed.
* @throws IgniteSpiOperationTimeoutException In case of timeout.
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
*/
protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {

protected Socket openSocket(
Socket sock,
InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteSpiOperationTimeoutException, IgniteCheckedException {

Check warning on line 1600 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'org.apache.ignite.spi.IgniteSpiOperationTimeoutException' which is a subclass of 'org.apache.ignite.IgniteCheckedException'.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZzX7FTLLqjZ_YRjUynJ&open=AZzX7FTLLqjZ_YRjUynJ&pullRequest=12869
assert remAddr != null;

try {
Expand All @@ -1608,7 +1614,7 @@

return sock;
}
catch (IOException | IgniteSpiOperationTimeoutException e) {
catch (IOException | IgniteCheckedException e) {
if (sock != null)
U.closeQuiet(sock);

Expand Down Expand Up @@ -1686,8 +1692,14 @@
* @param data Raw data to write.
* @param timeout Socket write timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
*/
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
protected void writeToSocket(
Socket sock,
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException, IgniteCheckedException {
assert sock != null;
assert data != null;

Expand Down Expand Up @@ -1771,9 +1783,14 @@
* @param res Integer response.
* @param timeout Socket timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
*/
protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
throws IOException {
protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
assert sock != null;

try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
Expand All @@ -1799,8 +1816,7 @@
* @throws IOException If IO failed or read timed out.
* @throws IgniteCheckedException If unmarshalling failed.
*/
protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException,
IgniteCheckedException {
protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException, IgniteCheckedException {
Socket sock = ses.socket();

assert sock != null;
Expand Down Expand Up @@ -2445,12 +2461,17 @@
}

/** Starts a timer for a socket operation. */
private SocketTimeoutObject startTimer(Socket sock, long timeout) {
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
private SocketTimeoutObject startTimer(Socket sock, long timeout) throws IgniteCheckedException {
try {
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);

addTimeoutObject(obj);
addTimeoutObject(obj);

return obj;
return obj;
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to perform socket operation", e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
Expand Down Expand Up @@ -354,35 +353,49 @@ private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tc
*/
private class DiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
long timeout) throws IOException {
@Override protected void writeToSocket(
Socket sock,
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException, IgniteCheckedException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");

super.writeToSocket(sock, msg, data, timeout);
}

/** {@inheritDoc} */
@Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
@Override protected void writeMessage(
TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
if (blockAll || block && ses.socket().getPort() == 47500)
throw new SocketException("Test discovery exception");

super.writeMessage(ses, msg, timeout);
}

/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
long timeout) throws IOException {
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");

super.writeToSocket(msg, sock, res, timeout);
}

/** {@inheritDoc} */
@Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
@Override protected Socket openSocket(
Socket sock,
InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteCheckedException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,12 @@ public void testRecoveryOnDisconnect() throws Exception {
*/
private class FailDiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
long timeout) throws IOException {
@Override protected void writeToSocket(
Socket sock,
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException, IgniteCheckedException {
assertNotFailedNode(sock);

if (isDrop(msg))
Expand Down Expand Up @@ -336,8 +340,12 @@ private boolean isDrop(TcpDiscoveryAbstractMessage msg) {
}

/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
long timeout) throws IOException {
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
assertNotFailedNode(sock);

if (isDrop(msg))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
Expand Down Expand Up @@ -208,8 +207,11 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S
}

/** {@inheritDoc} */
@Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
@Override protected Socket openSocket(
Socket sock,
InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteCheckedException {
checkSegmented(remAddr, timeoutHelper.nextTimeoutChunk(getSocketTimeout()));

return super.openSocket(sock, remAddr, timeoutHelper);
Expand All @@ -221,7 +223,7 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException {
) throws IOException, IgniteCheckedException {
checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);

super.writeToSocket(sock, msg, data, timeout);
Expand All @@ -237,8 +239,12 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S
}

/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
long timeout) throws IOException {
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);

super.writeToSocket(msg, sock, res, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ public void testNodeStopOnDiscoverySpiFailTest() throws Exception {

// Discovery spi that never allows connecting.
TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi() {
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) throws IOException {
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
try {
// Wait until request is added to rest processor.
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
Expand Down Expand Up @@ -183,7 +182,7 @@ private class TestDiscoverySpi extends TcpDiscoverySpi {
@Override protected Socket openSocket(
InetSocketAddress sockAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteSpiOperationTimeoutException {
) throws IOException, IgniteCheckedException {
awaitLatch();

return super.openSocket(sockAddr, timeoutHelper);
Expand All @@ -194,14 +193,19 @@ private class TestDiscoverySpi extends TcpDiscoverySpi {
Socket sock,
InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper
) throws IOException, IgniteSpiOperationTimeoutException {
) throws IOException, IgniteCheckedException {
awaitLatch();

return super.openSocket(sock, remAddr, timeoutHelper);
}

/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
@Override protected void writeToSocket(
Socket sock,
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException, IgniteCheckedException {
awaitLatch();

super.writeToSocket(sock, msg, data, timeout);
Expand All @@ -219,7 +223,12 @@ private class TestDiscoverySpi extends TcpDiscoverySpi {
}

/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) throws IOException {
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
Socket sock,
int res,
long timeout
) throws IOException, IgniteCheckedException {
awaitLatch();

super.writeToSocket(msg, sock, res, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage ms
TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException {
) throws IOException, IgniteCheckedException {
if (spiCtx != null)
apply(spiCtx.localNode(), msg);

Expand Down
Loading
Loading