Skip to content

Commit dbc4c0b

Browse files
committed
init commit
1 parent cef7795 commit dbc4c0b

File tree

4 files changed

+139
-0
lines changed

4 files changed

+139
-0
lines changed

__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from postgres_engine import PostgresEngine

postgres_engine.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import logging
2+
import pandas as pd
3+
import psycopg2
4+
import psycopg2.extras
5+
6+
from retry_decorator import retry
7+
8+
9+
class PostgresEngine:
10+
11+
def __init__(self, databaseName, user, password, host='localhost', port=5432):
12+
self.databaseName = databaseName
13+
self.user = user
14+
self.password = password
15+
self.host = host
16+
self.port = port
17+
self.connection = None
18+
self.cursor = None
19+
20+
def _get_connection(self):
21+
try:
22+
self.connection = psycopg2.connect(user=self.user, password=self.password, host=self.host, port=self.port, database=self.databaseName)
23+
except Exception as ex:
24+
logging.exception(f'Error connecting to PostgreSQL {ex}')
25+
raise ex
26+
27+
def _get_cursor(self, isInsertionQuery):
28+
if isInsertionQuery:
29+
self.cursor = self.connection.cursor()
30+
else:
31+
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
32+
33+
def _close_connection(self):
34+
self.connection.close()
35+
36+
def _close_cursor(self):
37+
self.cursor.close()
38+
39+
def close(self):
40+
if self.connection is not None:
41+
self._close_connection()
42+
if self.cursor is not None:
43+
self._close_cursor()
44+
45+
def create_table(self, schema):
46+
self._get_connection()
47+
self._get_cursor(isInsertionQuery=True)
48+
self.cursor.execute(schema)
49+
try:
50+
self.connection.commit()
51+
except Exception as ex:
52+
logging.exception(f'error: {ex} \nschemaQuery: {schema}')
53+
raise ex
54+
finally:
55+
self.close()
56+
57+
def create_index(self, tableName, column):
58+
self._get_connection()
59+
self._get_cursor(isInsertionQuery=True)
60+
indexQuery = f'CREATE INDEX IF NOT EXISTS {tableName}_{column} ON {tableName}({column});'
61+
self.cursor.execute(indexQuery)
62+
try:
63+
self.connection.commit()
64+
except Exception as ex:
65+
logging.exception(f'error: {ex} \nindexQuery: {indexQuery}')
66+
raise ex
67+
finally:
68+
self.close()
69+
70+
def create_new_foreign_key_constraint(self, tableName, constraintName, foreignKeySQL):
71+
FOREIGN_KEY_QUERY = """
72+
ALTER TABLE {tableName}
73+
ADD CONSTRAINT {constraintName} {foreignKeySQL};
74+
"""
75+
self.create_table(
76+
schema=FOREIGN_KEY_QUERY.format(
77+
tableName=tableName,
78+
constraintName=constraintName,
79+
foreignKeySQL=foreignKeySQL
80+
)
81+
)
82+
83+
@retry(numRetries=5, retryDelay=3, backoffScalingFactor=2)
84+
def run_select_query(self, query, parameters=None):
85+
self._get_connection()
86+
self._get_cursor(isInsertionQuery=False)
87+
self.cursor.execute(query, parameters)
88+
outputs = self.cursor.fetchall()
89+
self.close()
90+
outputDataframe = pd.DataFrame(outputs)
91+
return outputDataframe.where(outputDataframe.notnull(), None).dropna(axis=0, how='all')
92+
93+
@retry(numRetries=5, retryDelay=3, backoffScalingFactor=2)
94+
def run_update_query(self, query, parameters=None, returnId=True):
95+
self._get_connection()
96+
self._get_cursor(isInsertionQuery=True)
97+
if returnId:
98+
query = f'{query}\nRETURNING id'
99+
self.cursor.execute(query, parameters)
100+
if returnId:
101+
insertedId = self.cursor.fetchone()[0]
102+
else:
103+
insertedId = None
104+
try:
105+
self.connection.commit()
106+
except Exception as ex:
107+
logging.exception(f'error: {ex} \nquery: {query} \nparameters: {parameters}')
108+
raise ex
109+
finally:
110+
self.close()
111+
return insertedId

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pandas==0.25.3
2+
psycopg2-binary==2.8.4

retry_decorator.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from functools import wraps
2+
from time import sleep
3+
4+
5+
def retry(numRetries=5, retryDelay=3, backoffScalingFactor=2, logger=None):
6+
7+
def retry_decorator(func):
8+
@wraps(func)
9+
def retry_function(*args, **kwargs):
10+
numTries, currentDelay = numRetries, retryDelay
11+
while numTries > 1:
12+
try:
13+
return func(*args, **kwargs)
14+
except Exception as ex:
15+
exceptionMessage = f'{ex}, Retrying in {currentDelay} seconds...'
16+
if logger:
17+
logger.warning(exceptionMessage)
18+
else:
19+
print(exceptionMessage)
20+
sleep(currentDelay)
21+
numTries -= 1
22+
currentDelay *= backoffScalingFactor
23+
return func(*args, **kwargs)
24+
return retry_function
25+
return retry_decorator

0 commit comments

Comments
 (0)