Doc updates

peterbourgon committed Jan 10, 2017
1 parent 046f350 commit 5cfbf7d
Showing 2 changed files with 84 additions and 89 deletions.

DESIGN.md

# Design

## Motivation

The motivation here is basically Prometheus for logs.
What are the important properties of Prometheus that make it good, in my opinion?

- Designed to be run yourself: open-source and on-prem
- Designed specifically for highly-dynamic, typically-microservice workloads i.e. "cloud-native"
- Designed to be easy to deploy and operate: local storage, no clustering, pull model
- A complete system out of the box: doesn't need a separate TSDB, web UI, etc. to get something usable
- Scales up and out: 90% of users are satisfied without special effort

No current log system ticks these boxes.

- Splunk, Loggly, etc. are great, but hosted and/or paid
- Elastic/ELK are featureful, but too difficult to operate, and I believe Lucene is the wrong storage format
- Heka suffered from design errors leading to performance problems and is now abandoned
- Ekanite and other syslog systems are needlessly burdened by ancient requirements
- Fluentd, Logstash, etc. solve annotation and forwarding, but aren't complete systems

Ingestion is relatively straightforward.
(Somewhere, cynical and battle-hardened logging engineers are laughing at me. I know, I know.)
Still: the task is to get log streams onto disk as efficiently as possible.
Ideally, we want to be bottlenecked by disk speeds on the ingest nodes.
This seems not impossible, especially if we operate at the transport level, agnostic to the log records themselves.
That is: no annotation, structured log parsing, topical routing, etc.
We can declare those as separate concerns.

Querying is a little more interesting.
I wondered what the minimum viable query interface could look like.
After asking around, everyone in my peer group seemed to agree: basic grep, with time bounds, would be perfectly acceptable.
More sophisticated use cases could be modeled as ETLs or materialized views to richer systems.
They also agreed that an ingest-to-queryable time of single-digit seconds was just fine.

It seems like there could be an opportunity here.
A logging system that you can operate yourself, without too many headaches.
One that is designed from first principles to be "cloud-native", to plug in to e.g. Kubernetes without friction.
One that absorbs large-scale workloads without special effort.
And a complete system: one that includes querying out of the box.

Let's iterate toward a system design.
In this document, we first describe the system at a high level.
Then, we introduce constraints and invariants to reify the problem domain.
We move methodically toward a concrete solution, describing key components and behaviors along the way.

## Producers and consumers

On the producer side, we have a large and dynamic set of instances emitting log records to stdout/stderr, in the [12 Factor tradition](https://12factor.net/logs).
On the consumer side, those records should be made available for searching.


## Operational details

We will have many producers, order of thousands.
(A producer for us is the application process, plus a forwarding agent.)
Our log system will necessarily be smaller than the production system it is serving.
If an ingester dies, its forwarders should be able to simply reconnect to other ingesters.
This is all to say forwarders must not require knowledge about which is the "correct" ingester.
Any ingester must be equally viable.

As an optimization, highly-loaded ingesters can shed load (connections) to other ingesters.
Ingesters will gossip load information between each other, like number of connections, iops, etc.
Then, highly-loaded ingesters can refuse new connections, and thus redirect forwarders to more-lightly-loaded peers.
Extremely highly-loaded ingesters can even terminate existing connections, if necessary.
This needs to be carefully managed to prevent inadvertent denial of service.
For example, no more than a handful of ingesters should be refusing connections at a given time.
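
To make the load-shedding idea concrete, here is a rough Go sketch of the decision an ingester might make from gossiped load information.
The LoadInfo fields and the headroom threshold are hypothetical illustrations, not part of the design.

```
package ingester

// LoadInfo is a hypothetical summary of one ingester's load, as it might be
// gossiped between peers.
type LoadInfo struct {
	Connections int // active forwarder connections
	IOPS        int // recent disk operations per second
}

// shouldRefuse reports whether this ingester should refuse new connections,
// based on how its connection count compares to the mean of its peers.
// To avoid inadvertent denial of service, the caller should also ensure only
// a handful of ingesters are refusing connections at any given time.
func shouldRefuse(self LoadInfo, peers []LoadInfo) bool {
	if len(peers) == 0 {
		return false // no gossip information; accept everything
	}
	var total int
	for _, p := range peers {
		total += p.Connections
	}
	mean := total / len(peers)
	const headroom = 2 // refuse only when well above the mean
	return self.Connections > mean*headroom
}
```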

Consumers need to be able to make queries without any prior knowledge about time partitions, replica allocation, etc.
Without that knowledge, queries must always be scattered to every query node, gathered, and deduplicated.
Query nodes may die at any time, or start up and be empty, so query operations must manage partial results gracefully.

As an optimization, consumers can perform read-repair.
A query should return N copies of each matching record, where N is the replication factor.
Any records with fewer than N copies returned may be under-replicated.
A new segment with the suspect records can be created and replicated to the cluster.
As a further optimization, a separate process can perform sequential queries of the timespace, for the explicit purpose of read repair.
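
A rough sketch of the read-repair bookkeeping, assuming records are compared verbatim; the function and its signature are hypothetical.

```
package readrepair

// underReplicated is a hypothetical sketch of read-repair bookkeeping.
// results holds the records returned by each query node for one query;
// any record that appears fewer than replicationFactor times may be
// under-replicated. The caller would write the suspect records into a new
// segment and replicate that segment to the cluster.
func underReplicated(results [][]string, replicationFactor int) []string {
	counts := map[string]int{}
	for _, nodeRecords := range results {
		for _, record := range nodeRecords {
			counts[record]++
		}
	}
	var suspects []string
	for record, n := range counts {
		if n < replicationFactor {
			suspects = append(suspects, record)
		}
	}
	return suspects
}
```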

The transfer of data between the ingest tier and the query tier also needs care.
Ideally, any ingest node should be able to transfer segments to any/all query nodes arbitrarily.
That transfer should accomplish work that makes forward progress in the system as a whole.
And we must recover gracefully from failure; for example, network partitions at any stage of the transaction.

Let's now look at how to shuffle data safely from the ingest tier to the query tier.

## Ingest segments

It's important that each record have a timestamp with reasonable precision, to create a total order among records.
But it's not important that the clocks are globally synced, or that the records are e.g. strictly linearizable.
Also, it's fine for records that arrive in the same minimum time window to appear out-of-order, as long as that order is stable.

Incoming records are written to a so-called active segment, which is a file on disk.

```
          +---+
P -> F -> | I | -> Active: R R R...
P -> F -> |   |
          +---+
```

Once it has accumulated B bytes, or been active for S seconds, the active segment is flushed to disk.

```
          +---+
P -> F -> | I | -> Active:  R R R...
P -> F -> |   |    Flushed: R R R R R R R R
          +---+
```
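
A minimal sketch of that flush rule, with hypothetical names for the segment state:

```
package ingester

import "time"

// activeSegment tracks just enough state to implement the flush rule:
// flush once B bytes have been written, or S seconds have elapsed.
type activeSegment struct {
	bytes   int64
	created time.Time
}

func (s *activeSegment) shouldFlush(maxBytes int64, maxAge time.Duration, now time.Time) bool {
	return s.bytes >= maxBytes || now.Sub(s.created) >= maxAge
}
```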

The ingester consumes records serially from each forwarder connection.
The next record is consumed when the current record is successfully written to the active segment.
And the active segment is normally synced once it is flushed.
This is the default durability mode, tentatively called fast.

Producers can optionally connect to a separate port, whose handler will sync the active segment after each record is written.
This provides stronger durability, at the expense of throughput.
This is a separate durability mode, tentatively called durable.

There can be a third, even higher durability mode, tentatively called bulk.
Forwarders may write entire segment files at once to the ingester.
Each segment file would only be acknowledged when it's been successfully replicated among the storage nodes.
Then, the forwarder is free to send the next complete segment.
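
Here is a rough sketch of the per-connection write loop, assuming one record per line; only the fast/durable distinction is shown, and the names are hypothetical.

```
package ingester

import (
	"bufio"
	"net"
	"os"
)

// handleConn is a hypothetical sketch of the per-connection write loop.
// Records are consumed serially; the next record is read only after the
// current one has been written to the active segment. In the "durable" mode
// the segment is synced after every record; in the "fast" mode it is synced
// only when the segment is flushed (not shown here).
func handleConn(conn net.Conn, active *os.File, durable bool) error {
	defer conn.Close()
	s := bufio.NewScanner(conn) // assume one record per line
	for s.Scan() {
		if _, err := active.Write(append(s.Bytes(), '\n')); err != nil {
			return err
		}
		if durable {
			if err := active.Sync(); err != nil { // fsync per record
				return err
			}
		}
	}
	return s.Err()
}
```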

Ingesters host an API which serves flushed segments.


Observe that ingesters are stateful, so they need a graceful shutdown process.
First, they should terminate connections and close listeners.
Then, they should wait for all flushed segments to be consumed.
Finally, they may be shut down.
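
A minimal sketch of that shutdown sequence, with hypothetical names:

```
package ingester

import (
	"net"
	"time"
)

// shutdown is a hypothetical sketch of the graceful shutdown sequence:
// stop accepting work, wait until every flushed segment has been consumed by
// the query tier, then allow the process to exit.
func shutdown(listener net.Listener, conns []net.Conn, pendingFlushedSegments func() int) {
	listener.Close() // 1. stop accepting new forwarder connections
	for _, c := range conns {
		c.Close() // ...and terminate existing ones
	}
	for pendingFlushedSegments() > 0 { // 2. wait for flushed segments to drain
		time.Sleep(100 * time.Millisecond)
	}
	// 3. The ingester may now be shut down.
}
```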

### Consume segments

The ingesters act as a sort of queue, buffering records to disk in groups called segments.
While that storage is protected against e.g. power failure, ultimately we consider it ephemeral.
Segments should be replicated as quickly as possible by the query tier.
Here, we take a page from the Prometheus playbook.
Rather than the ingesters pushing flushed segments to query nodes, the query nodes pull flushed segments from the ingesters.
This enables a coherent model to scale for throughput.
To accept a higher ingest rate, add more ingest nodes, with faster disks.
If the ingest nodes are backing up, add more query nodes to consume from them.

This process repeats, consuming multiple segments from the ingest tier, and merging them into a single composite segment.

Once the composite segment has reached B bytes, or been active for S seconds, it is closed, and we enter the replication stage.
Replication means writing the composite segment to N distinct query nodes, where N is the replication factor.
At the moment we just POST the segment to a replication endpoint on N random store nodes.

Once the segment is confirmed replicated on N nodes, we enter the commit stage.
The query node commits the original segments on all of the ingest nodes, via POST /commit.
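
A rough sketch of one pass of this consume/replicate/commit flow; aside from POST /commit, the endpoint names are hypothetical, and merging and failure handling are only noted in comments.

```
package store

import (
	"bytes"
	"fmt"
	"net/http"
)

// consumeOnce is a hypothetical sketch of one pass of the consume loop on a
// query node. Endpoint names other than POST /commit are illustrative.
// A real implementation would merge records by time rather than concatenate,
// and would handle failures by aborting or retrying the transaction.
func consumeOnce(ingesters, peers []string, replicationFactor int) error {
	var composite bytes.Buffer

	// 1. Gather the next flushed segment from each ingester.
	for _, addr := range ingesters {
		resp, err := http.Get(fmt.Sprintf("http://%s/next", addr)) // hypothetical endpoint
		if err != nil {
			return err
		}
		_, err = composite.ReadFrom(resp.Body)
		resp.Body.Close()
		if err != nil {
			return err
		}
	}

	// 2. Replicate the composite segment to N distinct query nodes.
	// (Assumes len(peers) >= replicationFactor.)
	for _, addr := range peers[:replicationFactor] {
		resp, err := http.Post(fmt.Sprintf("http://%s/replicate", addr), // hypothetical endpoint
			"application/octet-stream", bytes.NewReader(composite.Bytes()))
		if err != nil {
			return err
		}
		resp.Body.Close()
	}

	// 3. Commit the original segments on the ingesters.
	for _, addr := range ingesters {
		resp, err := http.Post(fmt.Sprintf("http://%s/commit", addr), "text/plain", nil)
		if err != nil {
			return err
		}
		resp.Body.Close()
	}
	return nil
}
```
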
This gives us a way to select segment files for querying.
Compaction serves two purposes: deduplication of records, and de-overlapping of segments.
Duplication of records can occur during failures e.g. network partitions.
But segments will regularly and naturally overlap.

Consider 3 overlapping segment files on a given query node.

```
t0                        t1
+---------+
|    A    |
+---------+
     +---------+
     |    B    |
     +---------+
          +---------+
          |    C    |
          +---------+
```

Compaction will first merge these overlapping segments into a single übersegment.
Duplicate records can be detected by time UUID and dropped.
Then, it will re-split the übersegment on time boundaries, and produce new, non-overlapping segments.
Compaction will first merge these overlapping segments into a single aggregate segment in memory.
During the merge, duplicate records can be detected by ULID and dropped.
Then, compaction will split the aggregate segment to achieve desired file sizes, and produce new, non-overlapping segments.

```
t0              t1
+-------+-------+
|       |       |
|   D   |   E   |
|       |       |
+-------+-------+
```

Compaction reduces the number of segment files that must be read to serve a query for a given time.
In the ideal state, each time will map to exactly 1 segment file.
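
A rough sketch of the merge-and-dedup step, assuming each record begins with its ULID; the k-way merge and re-splitting are only noted in comments.

```
package compact

import "strings"

// mergeDedup is a hypothetical sketch of the merge step of compaction.
// Each record is assumed to begin with its ULID, so duplicates (for example,
// from failed or repeated replications) can be detected and dropped.
// A real implementation would perform a k-way merge to preserve time order,
// and would then re-split the output into appropriately sized segment files.
func mergeDedup(segments [][]string) []string {
	seen := map[string]bool{}
	var out []string
	for _, segment := range segments {
		for _, record := range segment {
			fields := strings.Fields(record)
			if len(fields) == 0 {
				continue
			}
			ulid := fields[0]
			if seen[ulid] {
				continue // duplicate record
			}
			seen[ulid] = true
			out = append(out, record)
		}
	}
	return out
}
```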

Observe that compaction improves query performance, but doesn't affect either correctness or space utilization.
Compression can be applied to segments completely orthogonally to the process described here.
Proper compression can dramatically increase retention, at the cost of some CPU burn.
It may also prevent processing of segment files with common UNIX tooling like grep, though this may or may not be important.

Since records are individually addressable, read-time deduplication occurs on a per-record basis.
So the mapping of record to segment can be optimized completely independently by each node, without coördination.

The schedule and aggressiveness of compaction are important performance considerations.
At the moment, a single compactor thread (goroutine) performs each of the compaction tasks sequentially and perpetually.
It fires at most once per second.
Much more performance analysis and real-world study is necessary here.

## Querying

Upon receipt, the query is broadcast to every node in the query tier.
Responses are gathered, results are merged and deduplicated, and then returned to the user.

The actual grepping work is done by each query node individually.
First, segment files matching the time boundaries of the query are identified.
Then, a per-segment reader is attached to each file, and filters matching records from the file.
Finally, a merging reader takes results from each segment file, orders them, and returns them to the originating query node.
This pipeline is lazily constructed of io.ReadClosers, and costs are only paid when reads actually occur.
That is, when the HTTP response is written to the originating query node.

Note that the per-segment reader is launched in its own goroutine, and reading/filtering occurs concurrently.
Currently there is no fixed limit on the number of active goroutines allowed to read segment files.
This should be improved.
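
A rough sketch of that lazy pipeline, with hypothetical helper names; a real implementation would merge by time, close the underlying files, and bound concurrency.

```
package query

import (
	"bufio"
	"io"
	"os"
	"strings"
)

// filterReader returns a reader that yields only the lines of r containing q.
// Each filter runs in its own goroutine, and work happens only when the
// returned reader is actually read.
func filterReader(r io.Reader, q string) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close()
		s := bufio.NewScanner(r)
		for s.Scan() {
			if strings.Contains(s.Text(), q) {
				io.WriteString(pw, s.Text()+"\n")
			}
		}
	}()
	return pr
}

// querySegments lazily builds the pipeline: one filtering reader per matching
// segment file, combined into a single reader. A real implementation would
// merge by time rather than concatenate, close the files (io.ReadClosers),
// and limit the number of concurrent readers.
func querySegments(paths []string, q string) (io.Reader, error) {
	var readers []io.Reader
	for _, p := range paths {
		f, err := os.Open(p)
		if err != nil {
			return nil, err
		}
		readers = append(readers, filterReader(f, q))
	}
	return io.MultiReader(readers...), nil
}
```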

The query request has several fields.
(Note this is a work in progress.)

- From, To time.Time — bounds of query
- Q string — term to grep for, blank is OK and matches all records
- Regex bool — if true, compile and match Q as a regex
- StatsOnly bool — if true, just return stats, without actual results

The query response has several fields.
- Size int — file size of segments read to produce results
- Results io.Reader — merged and time-ordered results

StatsOnly can be used to "explore" and iterate on a query, until it's been narrowed down to a usable result set.
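
For illustration, the fields above might look roughly like this in Go; the types and layout are a sketch, and response fields not shown in the lists are omitted.

```
package query

import (
	"io"
	"time"
)

// Request mirrors the request fields listed above; types are a sketch, since
// the API is a work in progress.
type Request struct {
	From, To  time.Time // bounds of query
	Q         string    // term to grep for; blank matches all records
	Regex     bool      // if true, compile and match Q as a regex
	StatsOnly bool      // if true, return only stats, no actual results
}

// Response mirrors the response fields shown above; other fields are elided.
type Response struct {
	Size    int       // file size of segments read to produce results
	Results io.Reader // merged and time-ordered results
}
```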

# Component model


MODEL.md

# Model

This is a very preliminary braindump on some ideas for a system model.
I am no academic, and it's been a long time since I've taken my queueing theory class.
This is all unvalidated, and may be wildly off-base.
I welcome corrections with an open heart and mind.

## Queueing theory

First, [an introduction to some queueing theory](http://www.perfdynamics.com/Tools/PDQ.html).

For testing we can reasonably fix λ to some constant value.
In contrast, S will be a function of the size of the work unit (record).
For now let's fix each record at N bytes. Then, S = f(N).
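
As a tiny worked example: if we assume f(N) is linear in record size, utilization follows directly from λ and S.
The linear model is an assumption for illustration only.

```
package model

// utilization is an illustration only: if service time grows linearly with
// record size, S = f(N) = N / diskBytesPerSec, then by the utilization law
// the busy fraction of a single server is rho = lambda * S.
// The linear model for f(N) is an assumption, not part of this document.
func utilization(lambda, recordBytes, diskBytesPerSec float64) float64 {
	s := recordBytes / diskBytesPerSec // f(N): seconds to service one record
	return lambda * s                  // saturation as this approaches 1
}
```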

## Our components

- Producer — Forward — Ingest — Store
