Skip to content

Commit

Permalink
Upload queue improvements (#282)
Browse files Browse the repository at this point in the history
* Upload queue improvements

 - Better handle fatal errors when retrying.
 - Avoid mixing sync and async in BaseUploadQueue
 - Return the list of failed items to the user, when possible.
 - Make the BaseUploadQueue constructor public

* Fixes from review
  • Loading branch information
einarmo authored Jul 25, 2023
1 parent 18a8241 commit da993df
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 34 deletions.
8 changes: 6 additions & 2 deletions Cognite.Extensions/Assets/AssetExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,17 @@ private static async Task<CogniteResult<Asset, AssetUpdateItem>> UpdateAssetsHan
_logger.LogDebug("Failed to update {Count} assets: {Message}",
toUpdate.Count(), ex.Message);
var error = ResultHandlers.ParseException<AssetUpdateItem>(ex, RequestType.UpdateAssets);
if (error.Complete) errors.Add(error);
if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
if (!error.Complete)
Expand All @@ -461,6 +464,7 @@ private static async Task<CogniteResult<Asset, AssetUpdateItem>> UpdateAssetsHan
}
else
{
errors.Add(error);
toUpdate = ResultHandlers.CleanFromError(error, toUpdate);
}
}
Expand Down
6 changes: 6 additions & 0 deletions Cognite.Extensions/CogniteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ public CogniteResult<TRep> Replace<TRep>(Func<TError, TRep> replace)
{
return new CogniteResult<TRep>(Errors?.Select(e => e.ReplaceSkipped(replace)));
}


/// <summary>
/// All items that were skipped in this result, across all errors.
/// </summary>
public IEnumerable<TError> AllSkipped => Errors?.Where(e => e.Skipped != null)?.SelectMany(e => e.Skipped!) ?? Enumerable.Empty<TError>();
}

/// <summary>
Expand Down
8 changes: 6 additions & 2 deletions Cognite.Extensions/Events/EventExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,20 @@ private static async Task<CogniteResult<Event, EventCreate>> CreateEventsHandleE
_logger.LogDebug("Failed to create {cnt} events: {msg}",
toCreate.Count(), ex.Message);
var error = ResultHandlers.ParseException<EventCreate>(ex, RequestType.CreateEvents);
errors.Add(error);
if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
errors.Add(error);
toCreate = ResultHandlers.CleanFromError(error, toCreate);
}
}
Expand Down
9 changes: 7 additions & 2 deletions Cognite.Extensions/Sequences/SequenceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,18 @@ private static async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsHan
{
_logger.LogDebug("Failed to create rows for {seq} sequences", toCreate.Count());
var error = ResultHandlers.ParseException<SequenceRowError>(ex, RequestType.CreateSequenceRows);
if (error.Complete) errors.Add(error);

if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
if (!error.Complete)
Expand All @@ -465,6 +469,7 @@ private static async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsHan
}
else
{
errors.Add(error);
toCreate = ResultHandlers.CleanFromError(error, toCreate);
}

Expand Down
12 changes: 10 additions & 2 deletions Cognite.Extensions/TimeSeries/DataPointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,18 @@ private static async Task<CogniteResult<DataPointInsertError>> InsertDataPointsH
{
_logger.LogDebug("Failed to create datapoints for {seq} timeseries: {msg}", points.Count, ex.Message);
var error = ResultHandlers.ParseException<DataPointInsertError>(ex, RequestType.CreateDatapoints);
if (error.Complete) errors.Add(error);

if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
if (!error.Complete)
Expand All @@ -320,6 +324,10 @@ private static async Task<CogniteResult<DataPointInsertError>> InsertDataPointsH
.ConfigureAwait(false);
errors.Add(error);
}
else
{
errors.Add(error);
}
points = ResultHandlers.CleanFromError(error, points);
}
}
Expand Down
16 changes: 12 additions & 4 deletions Cognite.Extensions/TimeSeries/TimeSeriesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,20 @@ private static async Task<CogniteResult<TimeSeries, TimeSeriesCreate>> CreateTim
_logger.LogDebug("Failed to create {cnt} timeseries: {msg}",
toCreate.Count(), ex.Message);
var error = ResultHandlers.ParseException<TimeSeriesCreate>(ex, RequestType.CreateTimeSeries);
errors.Add(error);
if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
errors.Add(error);
toCreate = ResultHandlers.CleanFromError(error, toCreate);
}
}
Expand Down Expand Up @@ -428,16 +432,20 @@ private static async Task<CogniteResult<TimeSeries, TimeSeriesUpdateItem>> Updat
_logger.LogDebug("Failed to create {Count} timeseries: {Message}",
items.Count(), ex.Message);
var error = ResultHandlers.ParseException<TimeSeriesUpdateItem>(ex, RequestType.UpdateTimeSeries);
errors.Add(error);
if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else if (retryMode == RetryMode.None)
{
errors.Add(error);
break;
}
else
{
errors.Add(error);
items = ResultHandlers.CleanFromError(error, items);
}
}
Expand Down
35 changes: 28 additions & 7 deletions ExtractorUtils/queues/BaseUploadQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ public class QueueUploadResult<T>
/// </summary>
public Exception? Exception { get; }
/// <summary>
/// Items that failed to upload
/// </summary>
public IEnumerable<T>? Failed { get; }
/// <summary>
/// Constructor for successful or empty upload.
/// </summary>
/// <param name="uploaded"></param>
public QueueUploadResult(IEnumerable<T> uploaded)
/// <param name="failed"></param>
public QueueUploadResult(IEnumerable<T> uploaded, IEnumerable<T> failed)
{
Uploaded = uploaded;
Failed = failed;
}
/// <summary>
/// Constructor for failed upload.
Expand Down Expand Up @@ -104,7 +110,7 @@ public abstract class BaseUploadQueue<T> : IUploadQueue<T>
/// <summary>
/// Logger to use
/// </summary>
protected ILogger<CogniteDestination> DestLogger { get; private set; }
protected ILogger DestLogger { get; private set; }

private readonly ConcurrentQueue<T> _items;
private readonly int _maxSize;
Expand All @@ -115,11 +121,19 @@ public abstract class BaseUploadQueue<T> : IUploadQueue<T>
private Task? _uploadLoopTask;
private Task? _uploadTask;

internal BaseUploadQueue(
/// <summary>
/// Constructor
/// </summary>
/// <param name="destination">Cognite destination used for uploads</param>
/// <param name="interval">Maximum interval between uploads. Set to zero or Timeout.InfiniteTimeSpan to disable periodic uploads</param>
/// <param name="maxSize">Maximum size of the upload queue, once it reaches this size an upload will always be triggered</param>
/// <param name="logger">Logger to use</param>
/// <param name="callback">Callback on finished upload, whether it failed or not</param>
protected BaseUploadQueue(
CogniteDestination destination,
TimeSpan interval,
int maxSize,
ILogger<CogniteDestination> logger,
ILogger logger,
Func<QueueUploadResult<T>, Task>? callback)
{
_maxSize = maxSize;
Expand Down Expand Up @@ -188,10 +202,17 @@ public async Task Start(CancellationToken token)
_timer?.Start();
DestLogger.LogDebug("Queue of type {Type} started", GetType().Name);

// Use a separate token to avoid propagating the loop cancellation to Chunking.RunThrottled
// Use a separate token source to allow cancelling the uploader separate from the upload loop.
_internalSource = new CancellationTokenSource();

_uploadLoopTask = Task.Run(() =>
var tokenCancellationTask = Task.Run(async () => {
try
{
await Task.Delay(Timeout.Infinite, token).ConfigureAwait(false);
}
catch { }
}, CancellationToken.None);
_uploadLoopTask = Task.Run(async () =>
{
try
{
Expand All @@ -201,7 +222,7 @@ public async Task Start(CancellationToken token)
_uploadTask = TriggerUploadAndCallback(_internalSource.Token);
// stop waiting if the source token gets cancelled, but do not
// cancel the upload task
_uploadTask.Wait(_tokenSource.Token);
await Task.WhenAny(_uploadTask, tokenCancellationTask).ConfigureAwait(false);
_pushEvent?.Reset();
_timer?.Start();
}
Expand Down
19 changes: 15 additions & 4 deletions ExtractorUtils/queues/EventUploadQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Cognite.Extensions;
using CogniteSdk;
using Microsoft.Extensions.Logging;
using Oryx.Cognite;
using Prometheus;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -96,18 +97,24 @@ private async Task ReadFromBuffer(CancellationToken token)
// If the queue is offline for a day, and generates a hundred gigabytes of events,
// the file could become unreadable.
events = await CogniteUtils.ReadEventsAsync(stream, token, 10_000);

if (events.Any())
{
var result = await Destination.EnsureEventsExistsAsync(events, RetryMode.OnError, SanitationMode.Clean, token);

DestLogger.LogResult(result, RequestType.CreateEvents, true);

var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure);
if (fatalError != null)
{
DestLogger.LogWarning("Failed to read from buffer: {msg}", fatalError.Message);
DestLogger.LogWarning("Failed to create items from buffer: {msg}", fatalError.Message);
return;
}

if (Callback != null) await Callback(new QueueUploadResult<EventCreate>(events));
var skipped = result.AllSkipped.ToList();
var uploaded = events.Except(skipped);

if (Callback != null) await Callback(new QueueUploadResult<EventCreate>(uploaded, skipped));
}
} while (events.Any());
}
Expand Down Expand Up @@ -153,13 +160,15 @@ protected override async Task<QueueUploadResult<EventCreate>> UploadEntries(IEnu
await ReadFromBuffer(token);
}
}
return new QueueUploadResult<EventCreate>(Enumerable.Empty<EventCreate>());
return new QueueUploadResult<EventCreate>(Enumerable.Empty<EventCreate>(), Enumerable.Empty<EventCreate>());
}

DestLogger.LogTrace("Dequeued {Number} events to upload to CDF", items.Count());

var result = await Destination.EnsureEventsExistsAsync(items, RetryMode.OnError, SanitationMode.Clean, token);

DestLogger.LogResult(result, RequestType.CreateEvents, true);

var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure);
if (fatalError != null)
{
Expand All @@ -174,7 +183,9 @@ protected override async Task<QueueUploadResult<EventCreate>> UploadEntries(IEnu
{
await ReadFromBuffer(token);
}
return new QueueUploadResult<EventCreate>(items);
var skipped = result.AllSkipped.ToList();
var uploaded = items.Except(skipped);
return new QueueUploadResult<EventCreate>(uploaded, skipped);
}
}
}
4 changes: 2 additions & 2 deletions ExtractorUtils/queues/RawUploadQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void EnqueueRow(string key, T columns)
var rows = items.ToDictionary(pair => pair.key, pair => pair.columns);
if (!rows.Any())
{
return new QueueUploadResult<(string key, T columns)>(Enumerable.Empty<(string key, T columns)>());
return new QueueUploadResult<(string key, T columns)>(Enumerable.Empty<(string key, T columns)>(), Enumerable.Empty<(string key, T columns)>());
}
DestLogger.LogTrace("Dequeued {Number} {Type} rows to upload to CDF Raw", rows.Count, typeof(T).Name);
try
Expand All @@ -72,7 +72,7 @@ public void EnqueueRow(string key, T columns)
return new QueueUploadResult<(string key, T columns)>(ex);
}
_numberRows.WithLabels(typeof(T).Name).Inc(rows.Count);
return new QueueUploadResult<(string key, T columns)>(items);
return new QueueUploadResult<(string key, T columns)>(items, Enumerable.Empty<(string key, T columns)>());
}
}
}
Loading

0 comments on commit da993df

Please sign in to comment.