-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
242 lines (191 loc) · 7.86 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""
base.py :
Generated with Automatron
"""
__author__ = "Mohit Kumar"
__credits__ = ["Mohit Kumar"]
__version__ = "1.0.0"
__maintainer__ = "Mohit Kumar"
__email__ = "[email protected]"
__status__ = "Production"
import argparse
import functools
import logging
import os
import sys
import threading
from datetime import datetime

import pyfiglet
import yaml
# Number of worker threads started by _create_threads(); Initializer()
# overwrites this value from the --threads command line option.
PROCESSOR_THREADS = 8
# Path of the YAML configuration file; replaced by --config when given.
CONFIG_FILE_NAME = 'config.yaml'
# Parsed configuration, filled in by _load_configurations(); stays None
# when the configuration file is not found.
CONFIG_FILE_DATA = None
# Working directories created at start-up by _init_directories(); the log
# file built in _init_logger() is placed under LOGS_DIR.
LOGS_DIR = 'logs'
INPUT_DIR = 'input'
OUTPUT_DIR = 'output'
# Configure the root logger to emit records of level DEBUG and above.
logging.basicConfig(level=logging.DEBUG)
#################################################################################
# Private Methods ###############################################################
#################################################################################
def _init_command_line_arguments():
"""Initializing argument parser for command line arguments"""
global args
argument_parser = argparse.ArgumentParser()
argument_parser.add_argument('-i', '--input', help='Name of input file.')
argument_parser.add_argument('-o', '--output', help='Name of output file.')
argument_parser.add_argument('-c', '--config', help='Name of external configuration file')
argument_parser.add_argument('-t', '--threads', type=int, default=1, help='No. of threads for processor.')
args = argument_parser.parse_args()
################################################################################
def _load_configurations():
    """Load the YAML configuration file into ``CONFIG_FILE_DATA``.

    When ``args.config`` is set it replaces the global ``CONFIG_FILE_NAME``.
    If the resulting name is empty or does not point to a regular file, the
    fact is logged and the function returns without touching
    ``CONFIG_FILE_DATA``. Otherwise the file is parsed and the result is
    stored in ``CONFIG_FILE_DATA``.
    """
    global CONFIG_FILE_DATA
    global CONFIG_FILE_NAME
    if args.config:
        CONFIG_FILE_NAME = args.config
        logger.info(f"External configurations provided : {CONFIG_FILE_NAME}")
    # An empty name or a directory would previously slip past the guard and
    # make open() raise; both now take the "not found" path.
    if not CONFIG_FILE_NAME or not os.path.isfile(CONFIG_FILE_NAME):
        logger.info(f"No external configuration file found with name : {CONFIG_FILE_NAME}")
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # default locale encoding.
    with open(CONFIG_FILE_NAME, encoding="utf-8") as config_file:
        # NOTE(review): full_load can build more than plain data types. If
        # the --config file may come from an untrusted source, switch to
        # yaml.safe_load.
        CONFIG_FILE_DATA = yaml.full_load(config_file)
    logger.info(f"Loaded data from configuration file : {CONFIG_FILE_NAME}")
#################################################################################
def _print_header():
    """Print the start-up banner: ASCII-art title, script path and parsed arguments."""
    rule = "#" * 80
    print(pyfiglet.figlet_format("Automatron"))
    print(rule)
    print(f"Script : {__file__}")
    print(f"Command line params : {args}")
    print(rule)
    # Trailing empty line separates the banner from the log output.
    print()
#################################################################################
def _init_logger():
    """Create the module-level ``logger`` with a console and a file handler.

    The log file is written to ``LOGS_DIR`` and is named
    ``<script name>_<timestamp>.log``, where the timestamp has microsecond
    resolution. Both handlers emit DEBUG and above using the same
    ``[ time ] - thread - level - message`` layout.
    """
    global logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # The module also calls logging.basicConfig(), which installs a console
    # handler on the root logger. Without this, every record would reach the
    # console twice: once from our handler and once via propagation.
    logger.propagate = False

    # FileHandler does not create missing directories, so make sure the
    # target exists even if this function is called on its own.
    os.makedirs(LOGS_DIR, exist_ok=True)
    script_name = os.path.basename(__file__).split('.')[0]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    absolute_log_file_name = os.path.join(LOGS_DIR, f"{script_name}_{timestamp}.log")

    # Both handlers use the same layout, so one formatter is shared.
    record_format = logging.Formatter("[ %(asctime)s ] - %(threadName)s - %(levelname)s - %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(absolute_log_file_name)
    for handler in (console_handler, file_handler):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(record_format)
        logger.addHandler(handler)
#################################################################################
def _init_directories():
    """Ensure the logs, input and output directories exist.

    Raises:
        FileExistsError: if one of the paths exists but is not a directory.
    """
    for directory in (LOGS_DIR, INPUT_DIR, OUTPUT_DIR):
        # exist_ok avoids the check-then-create race of isdir() + mkdir().
        os.makedirs(directory, exist_ok=True)
#################################################################################
def _create_threads():
    """Create and start ``PROCESSOR_THREADS`` threads that each run ``process``.

    Returns:
        The list of started ``threading.Thread`` objects, in start order.
    """
    started = []
    for _ in range(PROCESSOR_THREADS):
        worker = threading.Thread(target=process, args=())
        started.append(worker)
        worker.start()
    return started
#################################################################################
def _wait_for_threads(processor_threads):
results = []
for i in range(PROCESSOR_THREADS):
results.append(processor_threads[i].join())
return results
#################################################################################
# Utility Methods ###############################################################
#################################################################################
def print_exec_time(func):
    """Decorator that logs how long the wrapped callable took to run.

    The duration is logged at INFO level only when ``func`` returns
    normally; if it raises, the exception propagates and nothing is logged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    # wraps() keeps func's __name__/__doc__ on the wrapper, so stacked
    # decorators and thread names report the real function, not "wrap".
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        start_time = datetime.now()
        result = func(*args, **kwargs)
        end_time = datetime.now()
        logger.info(f"{func.__name__} completed in [ {str(end_time-start_time)} ]")
        return result
    return wrap
#################################################################################
def print_thread_info(func):
    """Decorator that logs the start and end of a call with thread details.

    A DEBUG record carrying ``threading.get_ident()`` and the current
    thread's name is written before and after ``func`` runs. The value
    labelled "PID" in the message is the thread identifier, not a process
    id. The "Finished" record is skipped if ``func`` raises.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    # wraps() keeps func's metadata so a thread targeting the wrapper is
    # named after the real function instead of "wrap".
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        logger.debug(f"Started Thread : PID [ {threading.get_ident()} ] | {threading.current_thread().name}")
        result = func(*args, **kwargs)
        logger.debug(f"Finished Thread : PID [ {threading.get_ident()} ] | {threading.current_thread().name}")
        return result
    return wrap
#################################################################################
def handle_exceptions(func):
    """Decorator that logs and suppresses exceptions for safe batch processing.

    Any ``Exception`` raised by ``func`` is logged at ERROR level together
    with its traceback and swallowed, so one failing step does not abort the
    batch. ``BaseException`` subclasses such as ``KeyboardInterrupt`` are
    not caught.

    Args:
        func: The callable to guard.

    Returns:
        A wrapper that returns ``func``'s result, or None when ``func``
        raised.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as ex:
            # exc_info attaches the traceback; the message alone rarely
            # shows where the failure happened.
            logger.error(f"Exception occurred while processing : {ex}", exc_info=True)
            return None
    return wrap
#################################################################################
# Core Methods ##################################################################
#################################################################################
@handle_exceptions
@print_exec_time
def Initializer():
    """Initializer : To be used for initializing data before processing.

    Sets the global ``PROCESSOR_THREADS`` from ``args.threads``. A missing
    (None) or non-positive value falls back to a single thread, because a
    count below 1 would start no workers and the job would silently do
    nothing.
    """
    global PROCESSOR_THREADS
    requested_threads = args.threads
    if requested_threads is None:
        PROCESSOR_THREADS = 1
    elif requested_threads < 1:
        logger.warning(f"Invalid No. of processor threads [ {requested_threads} ], falling back to 1")
        PROCESSOR_THREADS = 1
    else:
        PROCESSOR_THREADS = requested_threads
    # Report the value that will actually be used, not the raw argument.
    logger.debug(f"Configured No. of processor threads : {PROCESSOR_THREADS}")
#################################################################################
@print_thread_info
def process():
    """Processor : Placeholder for the main processing logic; currently does nothing."""
    return None
#################################################################################
@handle_exceptions
@print_exec_time
def Executor():
    """Executor : Start the processor threads, wait for all of them, and log the collected results."""
    results = _wait_for_threads(_create_threads())
    logger.info(f"Results from all threads : {results}")
#################################################################################
@handle_exceptions
@print_exec_time
def Finalizer():
    """Finalizer : Hook for post-processing work.

    Intended for closing database connections, clean-ups and other
    post-process activities; currently does nothing.
    """
    return None
#################################################################################
# M A I N #######################################################################
#################################################################################
if __name__ == "__main__":
    # Pre steps, in dependency order: arguments are parsed first, the logs
    # directory must exist before the logger opens its file, and the logger
    # must exist before configurations are loaded.
    for pre_step in (
        _init_command_line_arguments,
        _init_directories,
        _init_logger,
        _print_header,
        _load_configurations,
    ):
        pre_step()

    # Batch job: initialise, execute, finalise, timing the three stages
    # together.
    start_time = datetime.now()
    for stage in (Initializer, Executor, Finalizer):
        stage()
    end_time = datetime.now()
    logger.info(f"Batch job finished! Total time taken : [ {str(end_time-start_time)} ]")