Commit 271a306

Merge pull request #78 from supriyopaul/bug_fixes
Bug fixes
supriyopaul authored Apr 5, 2018
2 parents 64d21d6 + 80fe9d7 commit 271a306
Showing 7 changed files with 40 additions and 40 deletions.
.travis.yml: 4 changes (2 additions, 2 deletions)

@@ -18,8 +18,8 @@ deploy:
   skip_cleanup: true
   api-key:
     secure: Rxl45qbTHWIbOhst3PS60ETfW5wDByxp0xv4ZbtgRGe4SPvHtOLHRNGiajsQX37pgUFF9ALcCseY2cTk46jNEA1jOzFx4DDSKyH+Wu4H5F4M8JDBBlIsvsgezumLsYMqOL18caZA8J84N9UyuzgdPBDb0B0mMclRa9xRaxWncrUZgXwW9r3N2zU1LvGtd0Su4zLXXP6HC6mKHdOOaNSDONqaesx1njYTGr5fbWy7IXrjSg75wWCtHW1dKDPXmyyWZomwpmhURYfYXn/o9lRaXSDpLWx4xTsbJQdG9EiSPm5fLjfv9tZTxIF7jB0tTrOB63gGAgrLu0zC5Z5MJ1Y0+sbotI8eySI4w0GTffhi4WQjTTyO02vgPuSCm9JV5aW+YeNJtSncEgaVgsuUmZUiWdqMsvPG+bqOjh/i0eIkHr/v7cyf3HndFieZH9H3XdlEDtyr4SRExQSjG+be6mcGOJMWMrXervcW6kGP3pcX7EWgrFxnkz9lSgx/0meNMP4JDo8pZWg50b0xpni3zUcweTgCIeYUBd5aIKUvPaCqSHC1BAyZI5z3Cvdlq0tjCS726drQcV4OJNjrnmb301/K6MBbXhAsyhbkB1NpUZ0k0ZwmGxQ7iE4N1pod2BQbTPxjNUL1KNQJXFvjr9Clrw9Arqo6X9S9t//GP2DDl5Ke5KQ=
-  name: logagg-0.3.0
-  tag_name: 0.3.0
+  name: logagg-0.3.1
+  tag_name: 0.3.1
   on:
     branch: master
     repo: deep-compute/logagg
Dockerfile: 3 changes (1 addition, 2 deletions)

@@ -3,11 +3,10 @@ FROM ubuntu:16.04
+RUN apt-get -y update

 WORKDIR /logagg

 ADD . /logagg

-RUN apt-get update
 RUN apt-get install python-pip -y
-RUN pip install --upgrade pip
 RUN pip install .

 RUN easy_install https://github.com/deep-compute/pygtail/tarball/master/#egg=pygtail-0.6.1
logagg/collector.py: 30 changes (19 additions, 11 deletions)

@@ -86,17 +86,21 @@ def collect_log_lines(self, log_file):
         freader = Pygtail(fpath)
         for line_info in freader:
             line = line_info['line'][:-1] # remove new line char at the end
+
+            # assign default values
             log = dict(
                 id=None,
                 file=fpath,
                 host=self.HOST,
                 formatter=L['formatter'],
                 event='event',
                 data={},
                 raw=line,
                 timestamp=datetime.datetime.utcnow().isoformat(),
                 type='log',
                 level='debug',
+                error=False,
+                error_tb='',
             )

             try:
@@ -108,15 +112,17 @@ def collect_log_lines(self, log_file):
                 _log = util.load_object(formatter)(raw_log)

                 log.update(_log)
-                if log['id'] == None:
-                    log['id'] = uuid.uuid1().hex
-                log = self._remove_redundancy(log)
-                self.validate_log_format(log)
             except (SystemExit, KeyboardInterrupt) as e: raise
             except:
-                self.log.exception('Error during handling log line', log=log)
+                log['error'] = True
+                log['error_tb'] = traceback.format_exc()
+                self.log.exception('error_during_handling_log_line', log=log['raw'])

+            if log['id'] == None:
+                log['id'] = uuid.uuid1().hex
+
+            log = self._remove_redundancy(log)
+            self.validate_log_format(log)
+
             self.queue.put(dict(log=json.dumps(log),
                            freader=freader, line_info=line_info))
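
Note on this hunk: before the change, a formatter exception skipped the id assignment, de-duplication, and validation entirely, so a failed line could reach the queue without an id. Moving those steps below the try/except means every line is enqueued exactly once, with error and error_tb recording what went wrong. A minimal standalone sketch of the pattern (function and variable names here are illustrative, not the real collector API):

```python
import json
import traceback
import uuid

def parse(raw_line):
    # stand-in for the configured formatter; may raise on bad input
    return json.loads(raw_line)

def handle_line(raw_line, queue):
    log = dict(id=None, raw=raw_line, data={}, error=False, error_tb='')
    try:
        log['data'] = parse(raw_line)
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # record the failure on the log record instead of dropping the line
        log['error'] = True
        log['error_tb'] = traceback.format_exc()
    # these run whether or not the formatter failed, so every line
    # is queued exactly once and always carries an id
    if log['id'] is None:
        log['id'] = uuid.uuid1().hex
    queue.put(dict(log=json.dumps(log)))
```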
@@ -144,7 +150,7 @@ def _get_msgs_from_queue(self, msgs, timeout):
             _msgs_nbytes = msgs_nbytes + len(msg['log'])
             _msgs_nbytes += 1 # for newline char

-            if _msgs_nbytes > self.MAX_NBYTES_TO_SEND:
+            if _msgs_nbytes > self.MAX_NBYTES_TO_SEND:
                 msgs_pending.append(msg)
                 self.log.debug('msg_bytes_read_mem_queue_exceeded')
                 break

@@ -216,7 +222,7 @@ def confirm_success(self, msgs):
                 ack_fnames.add(fname)
             freader.update_offset_file(msg['line_info'])

-    @keeprunning(SCAN_FPATTERNS_INTERVAL,on_error=util.log_exception)
+    @keeprunning(SCAN_FPATTERNS_INTERVAL, on_error=util.log_exception)
     def _scan_fpatterns(self, state):
         '''
         fpaths = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'

@@ -246,9 +252,9 @@ def _scan_fpatterns(self, state):
                 log_key = (fpath, fpattern, formatter)
                 if log_key not in self.log_reader_threads:
                     self.log.info('starting_collect_log_lines_thread', log_key=log_key)
-                    #self.collect_log_lines(log_f)
-                    self.log_reader_threads[log_key] = util.start_daemon_thread(self.collect_log_lines, (log_f,))
+                    # There is no existing thread tracking this log file. Start one
+                    log_reader_thread = util.start_daemon_thread(self.collect_log_lines, (log_f,))
+                    self.log_reader_threads[log_key] = log_reader_thread
                 state.files_tracked.append(fpath)
             time.sleep(self.SCAN_FPATTERNS_INTERVAL)
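
The @keeprunning hunk only adds a space after the comma, but the construct deserves a note: the decorator wraps _scan_fpatterns so it is invoked in an endless loop on its own thread, routing exceptions to on_error instead of letting them kill the scanner, while log_reader_threads (keyed by fpath, fpattern, formatter) guarantees at most one reader thread per log file. Roughly, as a sketch of the idea rather than the exact implementation in logagg's util module:

```python
import time
from functools import wraps

def keeprunning(wait_secs, on_error=None):
    # keeprunning-style decorator sketch: call fn forever, report
    # exceptions via on_error, pause between iterations (the real
    # decorator's signature and timing may differ)
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    fn(*args, **kwargs)
                except (SystemExit, KeyboardInterrupt):
                    raise
                except Exception as exc:
                    if on_error is not None:
                        on_error(exc)
                time.sleep(wait_secs)
        return wrapper
    return decorator
```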

Expand All @@ -265,12 +271,14 @@ def send_heartbeat(self, state):

def start(self):
state = AttrDict(files_tracked=list())
#self._scan_fpatterns(state)
util.start_daemon_thread(self._scan_fpatterns, (state,))

state = AttrDict(last_push_ts=time.time())
util.start_daemon_thread(self.send_to_nsq, (state,))

state = AttrDict(heartbeat_number=0)
util.start_daemon_thread(self.send_heartbeat, (state,)).join()
th_heartbeat = util.start_daemon_thread(self.send_heartbeat, (state,))

while True:
th_heartbeat.join(1)
if not th_heartbeat.isAlive(): break
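
The start() change matters for shutdown behavior: a bare Thread.join() with no timeout blocks the main thread in a wait that, on Python 2 especially, never wakes up to deliver KeyboardInterrupt, so the collector could not be stopped with Ctrl-C. Joining with a short timeout in a loop keeps the main thread responsive. A self-contained illustration:

```python
import threading
import time

def heartbeat():
    # stand-in for send_heartbeat: loops until the process exits
    while True:
        time.sleep(5)

th = threading.Thread(target=heartbeat)
th.daemon = True
th.start()

# join with a timeout in a loop rather than a plain th.join():
# the main thread wakes every second, so Ctrl-C is handled promptly
# (run this and press Ctrl-C to see the difference)
while True:
    th.join(1)
    if not th.is_alive():
        break
```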
logagg/formatters.py: 16 changes (7 additions, 9 deletions)

@@ -5,7 +5,7 @@
 class RawLog(dict): pass

 #FIXME: cannot do both returns .. should it?
-def docker_log_file_driver(line):
+def docker_file_log_driver(line):
     log = json.loads(json.loads(line)['msg'])
     if 'formatter' in log.get('extra'):
         return RawLog(dict(formatter=log.get('extra').get('formatter'),

@@ -38,7 +38,7 @@ def nginx_access(line):
         u'status': u'200',
         u'timestamp': '2018-01-05T09:31:39.201000',
         u'upstream_response_time': 0.0},
-    'event': u'GET_request',
+    'event': 'nginx_event',
     'timestamp': '2018-01-05T09:31:39.201000',
     'type': 'metric'}

@@ -61,7 +61,7 @@ def nginx_access(line):
         u'status': u'404',
         u'timestamp': '2018-01-05T09:14:46.415000',
         u'upstream_response_time': 0.0},
-    'event': u'POST_request',
+    'event': 'nginx_event',
     'timestamp': '2018-01-05T09:14:46.415000',
     'type': 'metric'}
     '''

@@ -74,14 +74,12 @@ def nginx_access(line):
     log['body_bytes_sent'] = float(log['body_bytes_sent'])
     log['request_time'] = float(log['request_time'])
     log['upstream_response_time'] = float(log['upstream_response_time'])
-
-    event = log['request'].split(' ')[0] + '_request'

     return dict(
         timestamp=log.get('timestamp',' '),
         data=log,
         type='metric',
-        event=event
+        event='nginx_event',
     )

 def mongodb(line):

@@ -192,7 +190,7 @@ def django(line):
     else:
         return dict(
             timestamp=datetime.datetime.isoformat(datetime.datetime.utcnow()),
-            data=line
+            data={'raw': line}
         )

 def basescript(line):

@@ -281,7 +279,7 @@ def elasticsearch(line):
         event = data['message']
         level=values[1]
         timestamp=values[0]
-
+
         return dict(
             timestamp=timestamp,
             level=level,

@@ -293,5 +291,5 @@ def elasticsearch(line):
     else:
         return dict(
             timestamp=datetime.datetime.isoformat(datetime.datetime.now()),
-            data=line
+            data={'raw': line}
         )
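
Two consistency fixes run through this file: nginx_access now always emits the constant event name 'nginx_event' instead of deriving GET_request or POST_request from the request method, and the fallback branches of django and elasticsearch wrap unparseable lines as {'raw': line} so the data field is always a dict, never a bare string. A sketch of the shape the formatters converge on (my_formatter and my_event are illustrative names, not part of logagg):

```python
import datetime
import json

def my_formatter(line):
    # parse if possible; otherwise fall back to a dict-shaped payload so
    # downstream consumers never have to special-case a bare string
    try:
        data = json.loads(line)
    except ValueError:
        data = {'raw': line}
    return dict(
        timestamp=datetime.datetime.utcnow().isoformat(),
        data=data,
        type='log',
        event='my_event',  # constant event name, as nginx_access now uses
    )
```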
logagg/forwarder.py: 5 changes (2 additions, 3 deletions)

@@ -21,7 +21,7 @@ class LogForwarder(object):
     MAX_SECONDS_TO_PUSH = 1
     MAX_MESSAGES_TO_PUSH = 200

-    WAIT_TIME_TARGET_FAILURE = 10
+    WAIT_TIME_TARGET_FAILURE = 2

     def __init__(self, message_source, targets, log=DUMMY_LOGGER):

@@ -102,8 +102,7 @@ def _send_msgs_to_target(self, target, msgs):
         except (SystemExit, KeyboardInterrupt): raise
         except:
             # FIXME: do we log the failed messages themselves somewhere?
-            self.log.exception('_send_msgs_to_target_failed',
-                target=target, num_msgs=len(msgs))
+            self.log.exception('_send_msgs_to_target_failed', target=target)
             time.sleep(self.WAIT_TIME_TARGET_FAILURE)
             # FIXME: also implement some sort of backoff sleep
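
WAIT_TIME_TARGET_FAILURE drops from 10 seconds to 2, so a failed target is retried sooner, and the remaining FIXME asks for backoff instead of a fixed sleep. A sketch of the shape that FIXME hints at (not in the codebase, names are illustrative):

```python
import time

def send_with_backoff(send, msgs, base_wait=2, max_wait=60):
    # retry until the batch goes through, doubling the wait after each
    # consecutive failure and capping it at max_wait
    wait = base_wait
    while True:
        try:
            send(msgs)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            time.sleep(wait)
            wait = min(wait * 2, max_wait)
```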
logagg/forwarders.py: 20 changes (8 additions, 12 deletions)

@@ -63,19 +63,13 @@ def _ensure_connection(self):
     def _parse_msg_for_mongodb(self, msgs):
         msgs_list = []
         for msg in msgs:
-            msg['_id'] = msg.pop('id')
+            try:
+                msg['_id'] = msg.pop('id')
+            except KeyError:
+                self.log.exception('collector_failure_id_not_found', log=msg)
             msgs_list.append(msg)
         return msgs_list

-    def _insert_1by1(self, records):
-        for r in records:
-            try:
-                self.collection.update({'_id': r['_id']}, r, upsert=True)
-            except pymongo.errors.OperationFailure as opfail:
-                self.log.exception('failed_to_insert_record_in_mongodb',
-                                   record=r, tb=opfail.details,
-                                   num_records=1, type='metric')
-
     def handle_logs(self, msgs):
         msgs_list = self._parse_msg_for_mongodb(msgs)
         try:

@@ -86,8 +80,10 @@ def handle_logs(self, msgs):
         except pymongo.errors.AutoReconnect(message='connection_to_mongodb_failed'):
             self._ensure_connection()
         except pymongo.errors.BulkWriteError as bwe:
-            self.log.exception('bulk_write_to_mongodb_failed')
-            self._insert_1by1(msgs_list)
+            self.log.info('logs_inserted_into_mongodb',
+                          num_records=bwe.details['nInserted'], type='metric',
+                          records_not_inserted=bwe.details['writeErrors'],
+                          num_records_missed=len(bwe.details['writeErrors']))


 from influxdb import InfluxDBClient
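
The one-by-one reinsert path is gone: on a partial bulk-write failure the forwarder now just reports what pymongo already knows. BulkWriteError.details carries 'nInserted' and a 'writeErrors' list with one entry per rejected document, so nothing has to be recomputed. A standalone example of those fields (database, collection, and document values are illustrative):

```python
import pymongo

client = pymongo.MongoClient('localhost', 27017)
coll = client.logagg_test.logs

docs = [{'_id': 'a'}, {'_id': 'a'}, {'_id': 'b'}]  # duplicate _id on purpose

try:
    # ordered=False keeps inserting past individual failures, so the
    # error details account for every document that did or did not land
    coll.insert_many(docs, ordered=False)
except pymongo.errors.BulkWriteError as bwe:
    print(bwe.details['nInserted'])         # 2
    print(len(bwe.details['writeErrors']))  # 1
```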
setup.py: 2 changes (1 addition, 1 deletion)

@@ -3,7 +3,7 @@

 setup(
     name="logagg",
-    version="0.3.0",
+    version="0.3.1",
     description="logs aggregation framework",
     keywords="logagg",
     author="Deep Compute, LLC",
