forked from anubia/py_pg_tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalterer.py
180 lines (145 loc) · 6.8 KB
/
alterer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
from casting.casting import Casting
from connecter import Connecter
from const.const import Messenger as Msg
from const.const import Queries
from date_tools.date_tools import DateTools
from logger.logger import Logger
class Alterer:
    '''
    Target:
        - change the owner of one or more PostgreSQL databases and reassign
          the objects owned by an old role to a new role.
    '''

    in_dbs = []  # List of databases to be included in the process
    old_role = ''  # Current owner of the database's tables
    new_role = ''  # New owner for the database and its tables
    # An object with connection parameters to connect to PostgreSQL
    connecter = None
    logger = None  # Logger to show and log some messages

    def __init__(self, connecter=None, in_dbs=None, old_role='', new_role='',
                 logger=None):
        '''
        Target:
            - store and validate the parameters needed by the process.
        Parameters:
            - connecter: object with an open connection to PostgreSQL (its
              "cursor", "server", "user" and "port" attributes are used).
            - in_dbs: list of databases to include, or a value which is
              converted to a list through Casting.str_to_list. If omitted,
              a new empty list is used.
            - old_role: current owner of the database's tables.
            - new_role: new owner for the database and its tables. It must
              exist in PostgreSQL.
            - logger: logger to show and log some messages. If omitted, a
              new Logger is created.
        '''
        self.logger = logger if logger else Logger()

        # NOTE(review): stop_exe presumably aborts the execution; confirm in
        # Logger, because the code below assumes a valid connecter
        if connecter:
            self.connecter = connecter
        else:
            self.logger.stop_exe(Msg.NO_CONNECTION_PARAMS)

        # A new list is created on every call so that instances never share
        # (and accidentally mutate) the same default list
        if in_dbs is None:
            self.in_dbs = []
        elif isinstance(in_dbs, list):
            self.in_dbs = in_dbs
        else:
            self.in_dbs = Casting.str_to_list(in_dbs)

        if old_role:
            self.old_role = old_role
        else:
            self.logger.stop_exe(Msg.NO_OLD_ROLE)

        if not new_role:
            self.logger.stop_exe(Msg.NO_NEW_ROLE)

        # First check whether the user exists in PostgreSQL or not
        self.connecter.cursor.execute(Queries.PG_USER_EXISTS, (new_role, ))
        # Do not alter database if the user does not exist
        if self.connecter.cursor.fetchone():
            self.new_role = new_role
        else:
            msg = Msg.USER_DOES_NOT_EXIST.format(user=new_role)
            self.logger.stop_exe(msg)

        msg = Msg.ALTERER_VARS.format(
            server=self.connecter.server, user=self.connecter.user,
            port=self.connecter.port, in_dbs=self.in_dbs,
            old_role=self.old_role, new_role=self.new_role)
        self.logger.debug(Msg.ALTERER_VARS_INTRO)
        self.logger.debug(msg)

    def alter_db_owner(self, db):
        '''
        Target:
            - change the owner of a database and its tables.
        Parameters:
            - db: database which is going to be altered. Its "datname",
              "owner" and "datallowconn" keys are read.
        Return:
            - a boolean which indicates the success of the process.
        '''
        msg = Msg.ALTERER_FEEDBACK.format(old_role=self.old_role,
                                          new_role=self.new_role)
        self.logger.info(msg)

        dbname = db['datname']

        # Do not allow switch an owner postgres
        if db['owner'] == 'postgres':
            msg = Msg.DB_OWNED_BY_POSTGRES_NOT_ALLOWED
            self.logger.highlight('warning', msg, 'yellow')
            return False

        # Check if the db allows connections
        if not db['datallowconn'] == 1:
            msg = Msg.DB_DOES_NOT_ALLOW_CONN.format(dbname=dbname)
            self.logger.highlight('warning', msg, 'yellow')
            return False

        success = True

        # NOTE(review): the queries below are built with str.format, so
        # dbname and the role names are presumably trusted values; confirm
        # against the callers
        try:
            # Change the owner of the database
            self.connecter.cursor.execute(
                Queries.CHANGE_PG_DB_OWNER.format(
                    dbname=dbname, new_role=self.new_role))
        except Exception as e:
            # Best effort: the failure is reported and the process goes on
            # with the tables of the database
            success = False
            self.logger.debug('Error en la función "alter_db_owner": '
                              '{}'.format(str(e)))
            msg = Msg.CHANGE_PG_DB_OWNER_FAIL
            self.logger.highlight('warning', msg, 'yellow')

        # Start another connection to the target database to be able to
        # apply the next query. It must be opened before connections to the
        # database are disallowed
        own_connecter = Connecter(server=self.connecter.server,
                                  user=self.connecter.user,
                                  port=self.connecter.port,
                                  database=dbname, logger=self.logger)
        try:
            # Disallow connections to the database during the process
            result = self.connecter.disallow_db_conn(dbname)
            if not result:
                msg = Msg.DISALLOW_CONN_TO_PG_DB_FAIL.format(dbname=dbname)
                self.logger.highlight('warning', msg, 'yellow')

            try:
                # Change the owner of the database's tables
                own_connecter.cursor.execute(
                    Queries.REASSIGN_PG_DB_TBLS_OWNER.format(
                        old_role=self.old_role, new_role=self.new_role))
            except Exception as e:
                success = False
                self.logger.debug('Error en la función "alter_db_owner": '
                                  '{}'.format(str(e)))
                msg = Msg.REASSIGN_PG_DB_TBLS_OWNER_FAIL
                self.logger.highlight('warning', msg, 'yellow')
            finally:
                # Allow connections to the database at the end of the
                # process, even if it was interrupted, so that the database
                # is never left locked
                result = self.connecter.allow_db_conn(dbname)
                if not result:
                    msg = Msg.ALLOW_CONN_TO_PG_DB_FAIL.format(dbname=dbname)
                    self.logger.highlight('warning', msg, 'yellow')
        finally:
            # Close cursor and connection to the target database
            own_connecter.pg_disconnect()

        return success

    def alter_dbs_owner(self, alt_list):
        '''
        Target:
            - change the owner of a group of databases and their tables.
        Parameters:
            - alt_list: databases which are going to be altered. Each item
              is passed to alter_db_owner and its "datname" key is read.
        '''
        self.logger.highlight('info', Msg.PROCESSING_ALTERER, 'white')

        if alt_list:

            for db in alt_list:

                dbname = db['datname']

                msg = Msg.PROCESSING_DB.format(dbname=dbname)
                self.logger.highlight('info', msg, 'cyan')

                start_time = DateTools.get_current_datetime()
                # Change the owner of the database
                success = self.alter_db_owner(db)
                end_time = DateTools.get_current_datetime()
                # Get and show the process' duration
                diff = DateTools.get_diff_datetimes(start_time, end_time)

                if success:
                    msg = Msg.DB_ALTERER_DONE.format(dbname=dbname, diff=diff)
                    self.logger.highlight('info', msg, 'green')
                else:
                    msg = Msg.DB_ALTERER_FAIL.format(dbname=dbname)
                    self.logger.highlight('warning', msg, 'yellow',
                                          effect='bold')
        else:
            self.logger.highlight('warning', Msg.ALTERER_HAS_NOTHING_TO_DO,
                                  'yellow', effect='bold')

        self.logger.highlight('info', Msg.ALTERER_DONE, 'green', effect='bold')