Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/SeqCli/Cli/Commands/IngestCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using SeqCli.Cli.Features;
using SeqCli.Connection;
using SeqCli.Ingestion;
using SeqCli.PlainText;
using Serilog;
using Serilog.Core;
using Serilog.Events;
Expand All @@ -28,7 +29,7 @@
namespace SeqCli.Cli.Commands
{
[Command("ingest", "Send JSON log events from a file or `STDIN`",
Example = "seqcli ingest -i events.clef --filter=\"@Level <> 'Debug'\" -p Environment=Test")]
Example = "seqcli ingest -i events.clef --json --filter=\"@Level <> 'Debug'\" -p Environment=Test")]
class IngestCommand : Command
{
readonly SeqConnectionFactory _connectionFactory;
Expand All @@ -37,6 +38,7 @@ class IngestCommand : Command
readonly PropertiesFeature _properties;
readonly ConnectionFeature _connection;
string _filter;
bool _json;

public IngestCommand(SeqConnectionFactory connectionFactory)
{
Expand All @@ -45,6 +47,10 @@ public IngestCommand(SeqConnectionFactory connectionFactory)
_invalidDataHandlingFeature = Enable<InvalidDataHandlingFeature>();
_properties = Enable<PropertiesFeature>();

Options.Add("json",
"Read the events as JSON (the default assumes plain text)",
v => _json = true);

Options.Add("f=|filter=",
"Filter expression to select a subset of events",
v => _filter = string.IsNullOrWhiteSpace(v) ? null : v.Trim());
Expand All @@ -71,14 +77,22 @@ protected override async Task<int> Run()
? new StreamReader(File.Open(_fileInputFeature.InputFilename, FileMode.Open, FileAccess.Read,
FileShare.ReadWrite))
: null)
using (var reader = new LogEventReader(inputFile ?? Console.In))
{
return await LogShipper.ShipEvents(
_connectionFactory.Connect(_connection),
reader,
enrichers,
_invalidDataHandlingFeature.InvalidDataHandling,
filter);
var input = inputFile ?? Console.In;

var reader = _json ?
(ILogEventReader)new ClefLogEventReader(input) :
new PlainTextLogEventReader(input);

using (reader as IDisposable)
{
return await LogShipper.ShipEvents(
_connectionFactory.Connect(_connection),
reader,
enrichers,
_invalidDataHandlingFeature.InvalidDataHandling,
filter);
}
}
}
catch (Exception ex)
Expand Down
8 changes: 7 additions & 1 deletion src/SeqCli/Cli/Commands/LogCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ protected override async Task<int> Run()
}

var connection = _connectionFactory.Connect(_connection);
var result = await connection.Client.HttpClient.PostAsync(ApiConstants.IngestionEndpoint, content);

var request = new HttpRequestMessage(HttpMethod.Post, ApiConstants.IngestionEndpoint) {Content = content};

if (_connection.IsApiKeySpecified)
request.Headers.Add("X-Seq-ApiKey", _connection.ApiKey);

var result = await connection.Client.HttpClient.SendAsync(request);

if (result.IsSuccessStatusCode)
return 0;
Expand Down
45 changes: 45 additions & 0 deletions src/SeqCli/Ingestion/ClefLogEventReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2018 Datalust Pty Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading.Tasks;
using Serilog.Events;
using Serilog.Formatting.Compact.Reader;

namespace SeqCli.Ingestion
{
class ClefLogEventReader : ILogEventReader, IDisposable
{
readonly LogEventReader _reader;

public ClefLogEventReader(TextReader input)
{
_reader = new LogEventReader(input ?? throw new ArgumentNullException(nameof(input)));
}

public Task<LogEvent> TryReadAsync()
{
if (_reader.TryRead(out var evt))
return Task.FromResult(evt);

return Task.FromResult<LogEvent>(null);
}

public void Dispose()
{
_reader.Dispose();
}
}
}
10 changes: 10 additions & 0 deletions src/SeqCli/Ingestion/ILogEventReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Tasks;
using Serilog.Events;

namespace SeqCli.Ingestion
{
interface ILogEventReader
{
Task<LogEvent> TryReadAsync();
}
}
22 changes: 14 additions & 8 deletions src/SeqCli/Ingestion/LogShipper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
using Serilog.Core;
using Serilog.Events;
using Serilog.Formatting.Compact;
using Serilog.Formatting.Compact.Reader;

namespace SeqCli.Ingestion
{
Expand All @@ -38,7 +37,7 @@ static class LogShipper

public static async Task<int> ShipEvents(
SeqConnection connection,
LogEventReader reader,
ILogEventReader reader,
List<ILogEventEnricher> enrichers,
InvalidDataHandling invalidDataHandling,
Func<LogEvent, bool> filter = null)
Expand All @@ -47,7 +46,7 @@ public static async Task<int> ShipEvents(
if (reader == null) throw new ArgumentNullException(nameof(reader));
if (enrichers == null) throw new ArgumentNullException(nameof(enrichers));

var batch = ReadBatch(reader, filter, BatchSize, invalidDataHandling);
var batch = await ReadBatchAsync(reader, filter, BatchSize, invalidDataHandling);
while (batch.Length > 0)
{
StringContent content;
Expand All @@ -67,7 +66,7 @@ public static async Task<int> ShipEvents(

if (result.IsSuccessStatusCode)
{
batch = ReadBatch(reader, filter, BatchSize, invalidDataHandling);
batch = await ReadBatchAsync(reader, filter, BatchSize, invalidDataHandling);
continue;
}

Expand All @@ -80,7 +79,7 @@ public static async Task<int> ShipEvents(

Log.Error("Failed with status code {StatusCode}: {ErrorMessage}",
result.StatusCode,
(string)error.ErrorMessage);
(string)error.Error);
}
catch
{
Expand All @@ -95,16 +94,23 @@ public static async Task<int> ShipEvents(
return 0;
}

static LogEvent[] ReadBatch(LogEventReader reader, Func<LogEvent, bool> filter,
int count, InvalidDataHandling invalidDataHandling)
static async Task<LogEvent[]> ReadBatchAsync(
ILogEventReader reader,
Func<LogEvent, bool> filter,
int count,
InvalidDataHandling invalidDataHandling)
{
var batch = new List<LogEvent>();
do
{
try
{
while (batch.Count < count && reader.TryRead(out var evt))
while (batch.Count < count)
{
var evt = await reader.TryReadAsync();
if (evt == null)
break;

if (filter == null || filter(evt))
{
batch.Add(evt);
Expand Down
60 changes: 60 additions & 0 deletions src/SeqCli/PlainText/BuiltInPatterns.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using SeqCli.PlainText.Parsers;
using Superpower;
using Superpower.Model;
using Superpower.Parsers;

namespace SeqCli.PlainText
{
static class BuiltInPatterns
{
public static TextParser<object> Identifier { get; } =
IdentifierEx.CStyle
.Select(span => (object) span);

public static TextParser<object> Token { get; } =
SpanEx.NonWhiteSpace.Select(span => (object)span);

public static TextParser<object> MultiLineMessage { get; } =
SpanEx.MatchedBy(
Character.Matching(ch => !char.IsWhiteSpace(ch), "non whitespace character")
.IgnoreThen(Character.AnyChar.Many()))
.Select(span => (object)span);

public static TextParser<object> MultiLineContent { get; } =
SpanEx.MatchedBy(Character.AnyChar.Many())
.Select(span => (object)span);

public static TextParser<object> SingleLineContent { get; } =
SpanEx.MatchedBy(Character.ExceptIn('\r', '\n').Many())
.Select(span => (object)span);

public static TextParser<object> LiteralText(string literalText)
{
return Span.EqualTo(literalText).Select(span => (object) span);
}

public static TextParser<object> NonGreedyContent(PatternElement[] following)
{
if (following.Length == 0)
return SpanEx.MatchedBy(Character.AnyChar.Many()).Select(span => (object) span);

var rest = following[0].Parser;
for (var i = 1; i < following.Length; ++i)
{
rest = rest.IgnoreThen(following[i].Parser);
}

return i =>
{
var remainder = i;
while (!rest.IsMatch(remainder))
{
remainder = remainder.ConsumeChar().Remainder;
}

var span = i.Until(remainder);
return Result.Value((object) span, i, remainder);
};
}
}
}
7 changes: 6 additions & 1 deletion src/SeqCli/PlainText/FrameReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

namespace SeqCli.PlainText
{
class FrameReader
class FrameReader : IDisposable
{
readonly TextReader _source;
readonly TimeSpan _trailingLineArrivalDeadline;
Expand Down Expand Up @@ -122,5 +122,10 @@ bool IsFrameStart(string line)
return result.HasValue && result.Value.Length > 0;
}
}

public void Dispose()
{
_unawaitedNextLine?.Dispose();
}
}
}
Loading