Skip to content

Commit 8f02724

Browse files
Merge pull request #54 from oracle/savepoints
Support Savepoints
2 parents d61d8c4 + f803307 commit 8f02724

File tree

2 files changed

+22
-25
lines changed

2 files changed

+22
-25
lines changed

src/main/java/oracle/r2dbc/impl/OracleConnectionImpl.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import io.r2dbc.spi.IsolationLevel;
2828
import io.r2dbc.spi.Lifecycle;
2929
import io.r2dbc.spi.R2dbcException;
30+
import io.r2dbc.spi.Result;
3031
import io.r2dbc.spi.Statement;
3132
import io.r2dbc.spi.TransactionDefinition;
3233
import io.r2dbc.spi.ValidationDepth;
3334
import org.reactivestreams.Publisher;
35+
import reactor.core.publisher.Flux;
3436
import reactor.core.publisher.Mono;
3537

3638
import java.sql.SQLException;
39+
import java.sql.Savepoint;
3740
import java.time.Duration;
3841

3942
import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
@@ -474,21 +477,23 @@ public ConnectionMetadata getMetadata() {
474477
/**
475478
* {@inheritDoc}
476479
* <p>
477-
* This SPI method is not yet implemented.
480+
* This SPI method is implemented to execute a "SAVEPOINT ..." command.
481+
* This is the same as how Oracle JDBC implements
482+
* {@link java.sql.Connection#setSavepoint(String)}, except that JDBC uses a
483+
* blocking call to {@code java.sql.Statement.executeUpdate(String)}. This
484+
* method uses a non-blocking call.
478485
* </p>
479-
* @throws UnsupportedOperationException In this release of Oracle
480-
* R2DBC
481486
* @throws IllegalStateException If this {@code Connection} is closed
482487
*/
483488
@Override
484489
public Publisher<Void> createSavepoint(String name) {
485490
requireNonNull(name, "name is null");
486491
requireOpenConnection(jdbcConnection);
487-
// TODO: Execute SQL to create a savepoint. Examine and understand the
488-
// Oracle JDBC driver's implementation of
489-
// OracleConnection.oracleSetSavepoint(), and replicate it without
490-
// blocking a thread. Consider adding a ReactiveJDBCAdapter API to do this.
491-
throw new UnsupportedOperationException("createSavepoint not supported");
492+
return Mono.from(setAutoCommit(false))
493+
.then(Flux.from(createStatement("SAVEPOINT " + name)
494+
.execute())
495+
.flatMap(Result::getRowsUpdated)
496+
.then());
492497
}
493498

494499
/**
@@ -534,20 +539,22 @@ public Publisher<Void> rollbackTransaction() {
534539
/**
535540
* {@inheritDoc}
536541
* <p>
537-
* This SPI method is not yet implemented.
542+
* This SPI method is implemented to execute a "ROLLBACK TO " command. This
543+
* is the same as how Oracle JDBC implements
544+
* {@link java.sql.Connection#rollback(Savepoint)}, except that JDBC uses a
545+
* blocking call to {@code java.sql.Statement.executeUpdate(String)}. This
546+
* method uses a non-blocking call.
538547
* </p>
539548
* @throws IllegalStateException If this {@code Connection} is closed
540-
* @throws UnsupportedOperationException In version this release of Oracle
541-
* R2DBC
542549
*/
543550
@Override
544551
public Publisher<Void> rollbackTransactionToSavepoint(String name) {
545552
requireNonNull(name, "name is null");
546553
requireOpenConnection(jdbcConnection);
547-
// TODO: Use the JDBC connection to rollback to a savepoint without blocking
548-
// a thread.
549-
throw new UnsupportedOperationException(
550-
"rollbackTransactionToSavepoint not supported");
554+
return Flux.from(createStatement("ROLLBACK TO " + name)
555+
.execute())
556+
.flatMap(Result::getRowsUpdated)
557+
.then();
551558
}
552559

553560
/**

src/test/java/oracle/r2dbc/test/OracleTestKit.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,6 @@ public void prepareStatement() {
297297
@Override
298298
public void compoundStatement() {}
299299

300-
@Disabled("Disabled until savepoint is implemented")
301-
@Test
302-
@Override
303-
public void savePoint() {}
304-
305-
@Disabled("Disabled until savepoint is implemented")
306-
@Test
307-
@Override
308-
public void savePointStartsTransaction() {}
309-
310300
static <T> Mono<T> close(Connection connection) {
311301
return Mono.from(connection.close())
312302
.then(Mono.empty());

0 commit comments

Comments
 (0)