Skip to content
This repository has been archived by the owner on Jun 1, 2024. It is now read-only.

Commit

Permalink
manually merged serilog/serilog/391 back in and made sure it plays ni…
Browse files Browse the repository at this point in the history
…ce with the existing sink, introduced state object for common state values such as client, serializer to be constructed in one place
  • Loading branch information
Mpdreamz committed Mar 2, 2015
1 parent c4c5da1 commit be59314
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Elasticsearch.Net.Connection;
using Elasticsearch.Net.Serialization;
using Serilog.Configuration;
using Serilog.Core;
using Serilog.Events;
using Serilog.Sinks.Elasticsearch;

Expand Down Expand Up @@ -48,8 +49,10 @@ public static LoggerConfiguration Elasticsearch(
//TODO NEST trace logging ID's to corrolate requests to eachother
//Deal with positional formatting in fields property (default to scalar string in mapping)
options = options ?? new ElasticsearchSinkOptions(new [] { new Uri("http://localhost:9200") });
var sink = new ElasticsearchSink(options);
sink.RegisterTemplateIfNeeded();

var sink = string.IsNullOrWhiteSpace(options.BufferBaseFilename)
? (ILogEventSink) new ElasticsearchSink(options)
: new DurableElasticsearchSink(options);
return loggerSinkConfiguration.Sink(sink, options.MinimumLogEventLevel ?? LevelAlias.Minimum);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,23 @@
<ItemGroup>
<Compile Include="LoggerConfigurationElasticSearchExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Sinks\ElasticSearch\ElasticsearchJsonFormatter.cs" />
<Compile Include="Sinks\Elasticsearch\DurableElasticsearchSink.cs" />
<Compile Include="Sinks\Elasticsearch\ElasticsearchJsonFormatter.cs" />
<Compile Include="..\..\assets\CommonAssemblyInfo.cs">
<Link>Properties\CommonAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Sinks\ElasticSearch\ElasticsearchSink.cs" />
<Compile Include="Sinks\ElasticSearch\ElasticsearchSinkOptions.cs" />
<Compile Include="Sinks\Elasticsearch\ElasticsearchLogShipper.cs" />
<Compile Include="Sinks\Elasticsearch\ElasticsearchSink.cs" />
<Compile Include="Sinks\Elasticsearch\ElasticsearchSinkOptions.cs" />
<Compile Include="Sinks\Elasticsearch\ElasticsearchSinkState.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\assets\Serilog.snk">
<Link>Serilog.snk</Link>
</None>
<None Include="app.config" />
<None Include="packages.config" />
<None Include="Serilog.Sinks.ElasticSearch.nuspec" />
<None Include="Serilog.Sinks.Elasticsearch.nuspec" />
</ItemGroup>
<ItemGroup>
<BootstrapperPackage Include=".NETFramework,Version=v4.5">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,33 @@
using Serilog.Events;
using Serilog.Sinks.RollingFile;

namespace Serilog.Sinks.ElasticSearch
namespace Serilog.Sinks.Elasticsearch
{
class DurableElasticSearchSink : ILogEventSink, IDisposable
class DurableElasticsearchSink : ILogEventSink, IDisposable
{
// we rely on the date in the filename later!
const string FileNameSuffix = "-{Date}.json";

readonly RollingFileSink _sink;
readonly ElasticSearchLogShipper _shipper;
readonly ElasticsearchLogShipper _shipper;
private readonly ElasticsearchSinkState _state;

public DurableElasticSearchSink(ElasticsearchSinkOptions options)
public DurableElasticsearchSink(ElasticsearchSinkOptions options)
{
if (options == null) throw new ArgumentNullException("options");
_state = ElasticsearchSinkState.Create(options);

if (string.IsNullOrWhiteSpace(options.BufferBaseFilename))
{
throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!");
}

var formatter = options.CustomFormatter ?? new ElasticsearchJsonFormatter(
formatProvider: options.FormatProvider,
renderMessage: true,
closingDelimiter: Environment.NewLine,
serializer: options.Serializer,
inlineFields: options.InlineFields
);

_sink = new RollingFileSink(
options.BufferBaseFilename + FileNameSuffix,
formatter,
_state.Formatter,
options.BufferFileSizeLimitBytes,
null);

_shipper = new ElasticSearchLogShipper(options);
_shipper = new ElasticsearchLogShipper(_state);
}

public void Emit(LogEvent logEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
using Elasticsearch.Net.Serialization;
using Serilog.Debugging;

namespace Serilog.Sinks.ElasticSearch
namespace Serilog.Sinks.Elasticsearch
{
class ElasticSearchLogShipper : IDisposable
class ElasticsearchLogShipper : IDisposable
{
readonly ElasticsearchClient _client;
private readonly ElasticsearchSinkState _state;

readonly int _batchPostingLimit;
readonly Timer _timer;
Expand All @@ -39,27 +39,15 @@ class ElasticSearchLogShipper : IDisposable
readonly string _bookmarkFilename;
readonly string _logFolder;
readonly string _candidateSearchPath;
readonly string _typeName;
readonly string _indexFormat;

public ElasticSearchLogShipper(ElasticsearchSinkOptions options)
internal ElasticsearchLogShipper(ElasticsearchSinkState state)
{
var configuration = new ConnectionConfiguration(options.ConnectionPool)
.SetTimeout(ElasticsearchSink.DefaultConnectionTimeout)
.SetMaximumAsyncConnections(20);

_period = options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5);

_indexFormat = !string.IsNullOrWhiteSpace(options.IndexFormat) ? options.IndexFormat : ElasticsearchSink.DefaultIndexFormat;
_typeName = !string.IsNullOrWhiteSpace(options.TypeName) ? options.TypeName : ElasticsearchSink.DefaultTypeName;

_client = new ElasticsearchClient(configuration, connection: options.Connection, serializer: options.Serializer);

_batchPostingLimit = options.BatchPostingLimit ?? 100;

_bookmarkFilename = Path.GetFullPath(options.BufferBaseFilename + ".bookmark");
_state = state;
_period = _state.Options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5);
_batchPostingLimit = _state.Options.BatchPostingLimit;
_bookmarkFilename = Path.GetFullPath(_state.Options.BufferBaseFilename + ".bookmark");
_logFolder = Path.GetDirectoryName(_bookmarkFilename);
_candidateSearchPath = Path.GetFileName(options.BufferBaseFilename) + "*.json";
_candidateSearchPath = Path.GetFileName(_state.Options.BufferBaseFilename) + "*.json";

_timer = new Timer(s => OnTick());

Expand Down Expand Up @@ -173,19 +161,18 @@ void OnTick()
string nextLine;
while (count < _batchPostingLimit && TryReadLine(current, ref nextLineBeginsAtOffset, out nextLine))
{
var indexName = string.Format(_indexFormat, date);
var action = new { index = new { _index = indexName, _type = _typeName } };
var actionJson = _client.Serializer.Serialize(action, SerializationFormatting.None);

payload.Add(Encoding.UTF8.GetString(actionJson));
var indexName = string.Format(_state.Options.IndexFormat, date);
var action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
var actionJson = _state.Serialize(action);
payload.Add(actionJson);
payload.Add(nextLine);
++count;
}
}

if (count > 0)
{
var response = _client.Bulk(payload);
var response = _state.Client.Bulk(payload);

if (response.Success)
{
Expand Down
126 changes: 10 additions & 116 deletions src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Serilog.Sinks.PeriodicBatching;
using System.Text;
using System.Text.RegularExpressions;
using Serilog.Formatting;

namespace Serilog.Sinks.Elasticsearch
{
Expand All @@ -31,15 +32,8 @@ namespace Serilog.Sinks.Elasticsearch
/// </summary>
public class ElasticsearchSink : PeriodicBatchingSink
{
readonly ElasticsearchJsonFormatter _formatter;
readonly string _typeName;
readonly ElasticsearchClient _client;
readonly Func<LogEvent, DateTimeOffset, string> _indexDecider;

private readonly bool _registerTemplateOnStartup;
private readonly string _templateName;
private readonly string _templateMatchString;
private static readonly Regex IndexFormatRegex = new Regex(@"^(.*)(?:\{0\:.+\})(.*)$");
private readonly ElasticsearchSinkState _state;

/// <summary>
/// Creates a new ElasticsearchSink instance with the provided options
Expand All @@ -48,105 +42,7 @@ public class ElasticsearchSink : PeriodicBatchingSink
public ElasticsearchSink(ElasticsearchSinkOptions options)
: base(options.BatchPostingLimit, options.Period)
{
if (string.IsNullOrWhiteSpace(options.IndexFormat)) throw new ArgumentException("options.IndexFormat");
if (string.IsNullOrWhiteSpace(options.TypeName)) throw new ArgumentException("options.TypeName");
if (string.IsNullOrWhiteSpace(options.TemplateName)) throw new ArgumentException("options.TemplateName");
this._templateName = options.TemplateName;
this._templateMatchString = IndexFormatRegex.Replace(options.IndexFormat, @"$1*$2");

_indexDecider = options.IndexDecider ?? ((@event, offset) => string.Format(options.IndexFormat, offset));
_typeName = options.TypeName;

var configuration = new ConnectionConfiguration(options.ConnectionPool)
.SetTimeout(5000)
.SetMaximumAsyncConnections(20);

if (options.ModifyConnectionSetttings != null)
configuration = options.ModifyConnectionSetttings(configuration);

_client = new ElasticsearchClient(configuration, connection: options.Connection, serializer: options.Serializer);
_formatter = new ElasticsearchJsonFormatter(
formatProvider: options.FormatProvider,
renderMessage: true,
closingDelimiter: string.Empty,
serializer: options.Serializer,
inlineFields: options.InlineFields
);

this._registerTemplateOnStartup = options.AutoRegisterTemplate;
}

Func<LogEvent, DateTimeOffset, string> DefaultIndexDecider(string indexFormat)
{
return (@event, offset) => string.Format(indexFormat, offset);
}


/// <summary>
/// Register the elasticsearch index template if the provided options mandate it.
/// </summary>
public void RegisterTemplateIfNeeded()
{
if (!this._registerTemplateOnStartup) return;
var result = this._client.IndicesPutTemplateForAll<VoidResponse>(this._templateName, new
{
template = this._templateMatchString,
settings = new Dictionary<string, string>
{
{"index.refresh_interval", "5s"}
},
mappings = new
{
_default_ = new
{
_all = new { enabled = true },
dynamic_templates = new[]
{
new
{
string_fields = new
{
match = "*",
match_mapping_type = "string",
mapping = new
{
type = "string", index = "analyzed", omit_norms = true,
fields = new
{
raw = new
{
type= "string", index = "not_analyzed", ignore_above = 256
}
}
}
}
}
},
properties = new Dictionary<string, object>
{
{ "message", new { type = "string", index = "analyzed" } },
{ "exceptions", new
{
type = "nested", properties = new Dictionary<string, object>
{
{ "Depth", new { type = "integer" } },
{ "RemoteStackIndex", new { type = "integer" } },
{ "HResult", new { type = "integer" } },
{ "StackTraceString", new { type = "string", index = "analyzed" } },
{ "RemoteStackTraceString", new { type = "string", index = "analyzed" } },
{ "ExceptionMessage", new
{
type = "object", properties = new Dictionary<string, object>
{
{ "MemberType", new { type = "integer" } },
}
}}
}
} }
}
}
}
});
_state = ElasticsearchSinkState.Create(options);
}

/// <summary>
Expand All @@ -161,23 +57,21 @@ public void RegisterTemplateIfNeeded()
protected override void EmitBatch(IEnumerable<LogEvent> events)
{
// ReSharper disable PossibleMultipleEnumeration
if (!events.Any())
if (events == null || !events.Any())
return;

var payload = new List<string>();

foreach (var e in events)
{
var indexName = _indexDecider(e, e.Timestamp.ToUniversalTime());
var action = new { index = new { _index = indexName, _type = _typeName } };
var actionJson = _client.Serializer.Serialize(action, SerializationFormatting.None);
payload.Add(Encoding.UTF8.GetString(actionJson));
var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime());
var action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
var actionJson = _state.Serialize(action);
payload.Add(actionJson);
var sw = new StringWriter();
_formatter.Format(e, sw);
_state.Formatter.Format(e, sw);
payload.Add(sw.ToString());
}

_client.Bulk(payload);
_state.Client.Bulk(payload);
}
}
}
Loading

0 comments on commit be59314

Please sign in to comment.