Skip to content

Commit

Permalink
logd: replace internal CLOCK_MONOTONIC use with sequence numbers
Browse files Browse the repository at this point in the history
- switch to simpler and faster internal sequence number, drops
  a syscall overhead on 32-bit platforms.
- add ability to break-out of flushTo loop with filter return -1
  allowing in reduction in reader overhead.

Change-Id: Ic5cb2b9afa4d9470153971fc9197b07279e2b79d
  • Loading branch information
Mark Salyzyn committed Mar 18, 2015
1 parent 5aa097c commit f7c0f75
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 66 deletions.
2 changes: 1 addition & 1 deletion logd/FlushCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FlushCommand::FlushCommand(LogReader &reader,
unsigned long tail,
unsigned int logMask,
pid_t pid,
log_time start)
uint64_t start)
: mReader(reader)
, mNonBlock(nonBlock)
, mTail(tail)
Expand Down
4 changes: 2 additions & 2 deletions logd/FlushCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ class FlushCommand : public SocketClientCommand {
unsigned long mTail;
unsigned int mLogMask;
pid_t mPid;
log_time mStart;
uint64_t mStart;

public:
FlushCommand(LogReader &mReader,
bool nonBlock = false,
unsigned long tail = -1,
unsigned int logMask = -1,
pid_t pid = 0,
log_time start = LogTimeEntry::EPOCH);
uint64_t start = 1);
virtual void runSocketCommand(SocketClient *client);

static bool hasReadLogs(SocketClient *client);
Expand Down
40 changes: 23 additions & 17 deletions logd/LogBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void LogBuffer::log(log_id_t log_id, log_time realtime,
if (last == mLogElements.end()) {
mLogElements.push_back(elem);
} else {
log_time end = log_time::EPOCH;
uint64_t end = 1;
bool end_set = false;
bool end_always = false;

Expand All @@ -184,7 +184,7 @@ void LogBuffer::log(log_id_t log_id, log_time realtime,
}

if (end_always
|| (end_set && (end >= (*last)->getMonotonicTime()))) {
|| (end_set && (end >= (*last)->getSequence()))) {
mLogElements.push_back(elem);
} else {
mLogElements.insert(last,elem);
Expand Down Expand Up @@ -241,7 +241,7 @@ void LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
for(it = mLogElements.begin(); it != mLogElements.end();) {
LogBufferElement *e = *it;

if (oldest && (oldest->mStart <= e->getMonotonicTime())) {
if (oldest && (oldest->mStart <= e->getSequence())) {
break;
}

Expand Down Expand Up @@ -293,7 +293,7 @@ void LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
for(it = mLogElements.begin(); it != mLogElements.end();) {
LogBufferElement *e = *it;

if (oldest && (oldest->mStart <= e->getMonotonicTime())) {
if (oldest && (oldest->mStart <= e->getSequence())) {
break;
}

Expand Down Expand Up @@ -334,7 +334,7 @@ void LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
while((pruneRows > 0) && (it != mLogElements.end())) {
LogBufferElement *e = *it;
if (e->getLogId() == id) {
if (oldest && (oldest->mStart <= e->getMonotonicTime())) {
if (oldest && (oldest->mStart <= e->getSequence())) {
if (!whitelist) {
if (stats.sizes(id) > (2 * log_buffer_size(id))) {
// kick a misbehaving log reader client off the island
Expand Down Expand Up @@ -366,7 +366,7 @@ void LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
while((it != mLogElements.end()) && (pruneRows > 0)) {
LogBufferElement *e = *it;
if (e->getLogId() == id) {
if (oldest && (oldest->mStart <= e->getMonotonicTime())) {
if (oldest && (oldest->mStart <= e->getSequence())) {
if (stats.sizes(id) > (2 * log_buffer_size(id))) {
// kick a misbehaving log reader client off the island
oldest->release_Locked();
Expand Down Expand Up @@ -423,16 +423,16 @@ unsigned long LogBuffer::getSize(log_id_t id) {
return retval;
}

log_time LogBuffer::flushTo(
SocketClient *reader, const log_time start, bool privileged,
bool (*filter)(const LogBufferElement *element, void *arg), void *arg) {
uint64_t LogBuffer::flushTo(
SocketClient *reader, const uint64_t start, bool privileged,
int (*filter)(const LogBufferElement *element, void *arg), void *arg) {
LogBufferElementCollection::iterator it;
log_time max = start;
uint64_t max = start;
uid_t uid = reader->getUid();

pthread_mutex_lock(&mLogElementsLock);

if (start == LogTimeEntry::EPOCH) {
if (start <= 1) {
// client wants to start from the beginning
it = mLogElements.begin();
} else {
Expand All @@ -441,7 +441,7 @@ log_time LogBuffer::flushTo(
for (it = mLogElements.end(); it != mLogElements.begin(); /* do nothing */) {
--it;
LogBufferElement *element = *it;
if (element->getMonotonicTime() <= start) {
if (element->getSequence() <= start) {
it++;
break;
}
Expand All @@ -455,13 +455,19 @@ log_time LogBuffer::flushTo(
continue;
}

if (element->getMonotonicTime() <= start) {
if (element->getSequence() <= start) {
continue;
}

// NB: calling out to another object with mLogElementsLock held (safe)
if (filter && !(*filter)(element, arg)) {
continue;
if (filter) {
int ret = (*filter)(element, arg);
if (ret == false) {
continue;
}
if (ret != true) {
break;
}
}

pthread_mutex_unlock(&mLogElementsLock);
Expand All @@ -481,7 +487,7 @@ log_time LogBuffer::flushTo(
}

void LogBuffer::formatStatistics(char **strp, uid_t uid, unsigned int logMask) {
log_time oldest(CLOCK_MONOTONIC);
uint64_t oldest = UINT64_MAX;

pthread_mutex_lock(&mLogElementsLock);

Expand All @@ -491,7 +497,7 @@ void LogBuffer::formatStatistics(char **strp, uid_t uid, unsigned int logMask) {
LogBufferElement *element = *it;

if ((logMask & (1 << element->getLogId()))) {
oldest = element->getMonotonicTime();
oldest = element->getSequence();
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions logd/LogBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class LogBuffer {
void log(log_id_t log_id, log_time realtime,
uid_t uid, pid_t pid, pid_t tid,
const char *msg, unsigned short len);
log_time flushTo(SocketClient *writer, const log_time start,
uint64_t flushTo(SocketClient *writer, const uint64_t start,
bool privileged,
bool (*filter)(const LogBufferElement *element, void *arg) = NULL,
int (*filter)(const LogBufferElement *element, void *arg) = NULL,
void *arg = NULL);

void clear(log_id_t id, uid_t uid = AID_ROOT);
Expand Down
9 changes: 5 additions & 4 deletions logd/LogBufferElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include "LogBufferElement.h"
#include "LogReader.h"

const log_time LogBufferElement::FLUSH_ERROR((uint32_t)0, (uint32_t)0);
const uint64_t LogBufferElement::FLUSH_ERROR(0);
atomic_int_fast64_t LogBufferElement::sequence;

LogBufferElement::LogBufferElement(log_id_t log_id, log_time realtime,
uid_t uid, pid_t pid, pid_t tid,
Expand All @@ -34,7 +35,7 @@ LogBufferElement::LogBufferElement(log_id_t log_id, log_time realtime,
, mPid(pid)
, mTid(tid)
, mMsgLen(len)
, mMonotonicTime(CLOCK_MONOTONIC)
, mSequence(sequence.fetch_add(1, memory_order_relaxed))
, mRealTime(realtime) {
mMsg = new char[len];
memcpy(mMsg, msg, len);
Expand All @@ -44,7 +45,7 @@ LogBufferElement::~LogBufferElement() {
delete [] mMsg;
}

log_time LogBufferElement::flushTo(SocketClient *reader) {
uint64_t LogBufferElement::flushTo(SocketClient *reader) {
struct logger_entry_v3 entry;
memset(&entry, 0, sizeof(struct logger_entry_v3));
entry.hdr_size = sizeof(struct logger_entry_v3);
Expand All @@ -64,5 +65,5 @@ log_time LogBufferElement::flushTo(SocketClient *reader) {
return FLUSH_ERROR;
}

return mMonotonicTime;
return mSequence;
}
11 changes: 7 additions & 4 deletions logd/LogBufferElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _LOGD_LOG_BUFFER_ELEMENT_H__

#include <sys/types.h>
#include <stdatomic.h>
#include <sysutils/SocketClient.h>
#include <log/log.h>
#include <log/log_read.h>
Expand All @@ -29,8 +30,9 @@ class LogBufferElement {
const pid_t mTid;
char *mMsg;
const unsigned short mMsgLen;
const log_time mMonotonicTime;
const uint64_t mSequence;
const log_time mRealTime;
static atomic_int_fast64_t sequence;

public:
LogBufferElement(log_id_t log_id, log_time realtime,
Expand All @@ -43,11 +45,12 @@ class LogBufferElement {
pid_t getPid(void) const { return mPid; }
pid_t getTid(void) const { return mTid; }
unsigned short getMsgLen() const { return mMsgLen; }
log_time getMonotonicTime(void) const { return mMonotonicTime; }
uint64_t getSequence(void) const { return mSequence; }
static uint64_t getCurrentSequence(void) { return sequence.load(memory_order_relaxed); }
log_time getRealTime(void) const { return mRealTime; }

static const log_time FLUSH_ERROR;
log_time flushTo(SocketClient *writer);
static const uint64_t FLUSH_ERROR;
uint64_t flushTo(SocketClient *writer);
};

#endif
38 changes: 19 additions & 19 deletions logd/LogReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,63 +100,63 @@ bool LogReader::onDataAvailable(SocketClient *cli) {
nonBlock = true;
}

// Convert realtime to monotonic time
if (start == log_time::EPOCH) {
start = LogTimeEntry::EPOCH;
} else {
uint64_t sequence = 1;
// Convert realtime to sequence number
if (start != log_time::EPOCH) {
class LogFindStart {
const pid_t mPid;
const unsigned mLogMask;
bool startTimeSet;
log_time &start;
log_time last;
uint64_t &sequence;
uint64_t last;

public:
LogFindStart(unsigned logMask, pid_t pid, log_time &start)
LogFindStart(unsigned logMask, pid_t pid, log_time &start, uint64_t &sequence)
: mPid(pid)
, mLogMask(logMask)
, startTimeSet(false)
, start(start)
, last(LogTimeEntry::EPOCH)
, sequence(sequence)
, last(sequence)
{ }

static bool callback(const LogBufferElement *element, void *obj) {
static int callback(const LogBufferElement *element, void *obj) {
LogFindStart *me = reinterpret_cast<LogFindStart *>(obj);
if (!me->startTimeSet
&& (!me->mPid || (me->mPid == element->getPid()))
if ((!me->mPid || (me->mPid == element->getPid()))
&& (me->mLogMask & (1 << element->getLogId()))) {
if (me->start == element->getRealTime()) {
me->start = element->getMonotonicTime();
me->sequence = element->getSequence();
me->startTimeSet = true;
return -1;
} else {
if (me->start < element->getRealTime()) {
me->start = me->last;
me->sequence = me->last;
me->startTimeSet = true;
return -1;
}
me->last = element->getMonotonicTime();
me->last = element->getSequence();
}
}
return false;
}

bool found() { return startTimeSet; }
} logFindStart(logMask, pid, start);
} logFindStart(logMask, pid, start, sequence);

logbuf().flushTo(cli, LogTimeEntry::EPOCH,
FlushCommand::hasReadLogs(cli),
logbuf().flushTo(cli, sequence, FlushCommand::hasReadLogs(cli),
logFindStart.callback, &logFindStart);

if (!logFindStart.found()) {
if (nonBlock) {
doSocketDelete(cli);
return false;
}
log_time now(CLOCK_MONOTONIC);
start = now;
sequence = LogBufferElement::getCurrentSequence();
}
}

FlushCommand command(*this, nonBlock, tail, logMask, pid, start);
FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence);
command.runSocketCommand(cli);
return true;
}
Expand Down
Loading

0 comments on commit f7c0f75

Please sign in to comment.