Skip to content

Commit c793b32

Browse files
committed
Added inner transaction impl
1 parent 0a484a8 commit c793b32

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed

src/driver/inner_transaction.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use deadpool_postgres::Transaction as dp_Transaction;
2+
use postgres_types::ToSql;
3+
use tokio_postgres::{Portal, Row, ToStatement, Transaction as tp_Transaction};
4+
5+
use crate::exceptions::rust_errors::PSQLPyResult;
6+
7+
pub enum PsqlpyTransaction {
8+
PoolTrans(dp_Transaction<'static>),
9+
SingleConnTrans(tp_Transaction<'static>)
10+
}
11+
12+
impl PsqlpyTransaction {
13+
async fn commit(self) -> PSQLPyResult<()> {
14+
match self {
15+
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.commit().await?),
16+
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.commit().await?)
17+
}
18+
}
19+
20+
async fn rollback(self) -> PSQLPyResult<()> {
21+
match self {
22+
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.rollback().await?),
23+
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.rollback().await?)
24+
}
25+
}
26+
27+
async fn savepoint(&mut self, sp_name: &str) -> PSQLPyResult<()> {
28+
match self {
29+
PsqlpyTransaction::PoolTrans(p_txid) => {
30+
p_txid.savepoint(sp_name).await?;
31+
Ok(())
32+
},
33+
PsqlpyTransaction::SingleConnTrans(s_txid) => {
34+
s_txid.savepoint(sp_name).await?;
35+
Ok(())
36+
}
37+
}
38+
}
39+
40+
async fn release_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> {
41+
match self {
42+
PsqlpyTransaction::PoolTrans(p_txid) => {
43+
p_txid.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()).await?;
44+
Ok(())
45+
},
46+
PsqlpyTransaction::SingleConnTrans(s_txid) => {
47+
s_txid.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()).await?;
48+
Ok(())
49+
}
50+
}
51+
}
52+
53+
async fn rollback_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> {
54+
match self {
55+
PsqlpyTransaction::PoolTrans(p_txid) => {
56+
p_txid.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()).await?;
57+
Ok(())
58+
},
59+
PsqlpyTransaction::SingleConnTrans(s_txid) => {
60+
s_txid.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()).await?;
61+
Ok(())
62+
}
63+
}
64+
}
65+
66+
async fn bind<T>(&self, statement: &T, params: &[&(dyn ToSql + Sync)]) -> PSQLPyResult<Portal>
67+
where
68+
T: ?Sized + ToStatement {
69+
match self {
70+
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.bind(statement, params).await?),
71+
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.bind(statement, params).await?)
72+
}
73+
}
74+
75+
pub async fn query_portal(&self, portal: &Portal, size: i32) -> PSQLPyResult<Vec<Row>> {
76+
match self {
77+
PsqlpyTransaction::PoolTrans(p_txid)
78+
=> Ok(p_txid.query_portal(portal, size).await?),
79+
PsqlpyTransaction::SingleConnTrans(s_txid)
80+
=> Ok(s_txid.query_portal(portal, size).await?)
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)