diff --git a/Cognite.Extensions/Assets/AssetExtensions.cs b/Cognite.Extensions/Assets/AssetExtensions.cs
index 7e50b68c..91db7a9a 100644
--- a/Cognite.Extensions/Assets/AssetExtensions.cs
+++ b/Cognite.Extensions/Assets/AssetExtensions.cs
@@ -438,14 +438,17 @@ private static async Task<CogniteResult<Asset, AssetUpdateItem>> UpdateAssetsHan
                 _logger.LogDebug("Failed to update {Count} assets: {Message}", toUpdate.Count(), ex.Message);
 
                 var error = ResultHandlers.ParseException(ex, RequestType.UpdateAssets);
-                if (error.Complete) errors.Add(error);
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
                     if (!error.Complete)
@@ -461,6 +464,7 @@ private static async Task<CogniteResult<Asset, AssetUpdateItem>> UpdateAssetsHan
                     }
                     else
                     {
+                        errors.Add(error);
                         toUpdate = ResultHandlers.CleanFromError(error, toUpdate);
                     }
                 }
diff --git a/Cognite.Extensions/CogniteResult.cs b/Cognite.Extensions/CogniteResult.cs
index e882e444..8f802e1b 100644
--- a/Cognite.Extensions/CogniteResult.cs
+++ b/Cognite.Extensions/CogniteResult.cs
@@ -352,6 +352,12 @@ public CogniteResult<TError> Replace(Func<TError, TError> replace)
         {
             return new CogniteResult<TError>(Errors?.Select(e => e.ReplaceSkipped(replace)));
         }
+
+
+        /// <summary>
+        /// All items that were skipped in this result, across all errors.
+        /// </summary>
+        public IEnumerable<TError> AllSkipped => Errors?.Where(e => e.Skipped != null)?.SelectMany(e => e.Skipped!) ?? Enumerable.Empty<TError>();
     }
 
     /// <summary>
diff --git a/Cognite.Extensions/Events/EventExtensions.cs b/Cognite.Extensions/Events/EventExtensions.cs
index a195dbd9..6d611734 100644
--- a/Cognite.Extensions/Events/EventExtensions.cs
+++ b/Cognite.Extensions/Events/EventExtensions.cs
@@ -279,16 +279,20 @@ private static async Task<CogniteResult<Event, EventCreate>> CreateEventsHandleE
                 _logger.LogDebug("Failed to create {cnt} events: {msg}", toCreate.Count(), ex.Message);
 
                 var error = ResultHandlers.ParseException(ex, RequestType.CreateEvents);
-                errors.Add(error);
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
+                    errors.Add(error);
                     toCreate = ResultHandlers.CleanFromError(error, toCreate);
                 }
             }
diff --git a/Cognite.Extensions/Sequences/SequenceExtensions.cs b/Cognite.Extensions/Sequences/SequenceExtensions.cs
index d24426f7..f2c5c4f8 100644
--- a/Cognite.Extensions/Sequences/SequenceExtensions.cs
+++ b/Cognite.Extensions/Sequences/SequenceExtensions.cs
@@ -441,14 +441,18 @@ private static async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsHan
             {
                 _logger.LogDebug("Failed to create rows for {seq} sequences", toCreate.Count());
                 var error = ResultHandlers.ParseException(ex, RequestType.CreateSequenceRows);
-                if (error.Complete) errors.Add(error);
+
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
                     if (!error.Complete)
@@ -465,6 +469,7 @@ private static async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsHan
                     }
                     else
                     {
+                        errors.Add(error);
                         toCreate = ResultHandlers.CleanFromError(error, toCreate);
                     }
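The `AllSkipped` property added to CogniteResult.cs above is what the queue changes further down build on: every error in a result carries the items it skipped, and `AllSkipped` flattens them into one list. A minimal sketch of the resulting consumer pattern, using only the calls visible in this patch; the method name `PushEvents` and its parameters are illustrative, and the `Cognite.Extractor.Utils` namespace for `CogniteDestination` is an assumption:

    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Cognite.Extensions;
    using Cognite.Extractor.Utils;
    using CogniteSdk;

    public static class SkippedExample
    {
        // Splits a batch into (uploaded, skipped) the same way the upload
        // queues below do, using the new AllSkipped property.
        public static async Task<(IEnumerable<EventCreate> uploaded, List<EventCreate> skipped)>
            PushEvents(CogniteDestination destination, IList<EventCreate> events, CancellationToken token)
        {
            var result = await destination.EnsureEventsExistsAsync(
                events, RetryMode.OnError, SanitationMode.Clean, token).ConfigureAwait(false);

            // AllSkipped flattens the Skipped list of every error in the result,
            // so uploaded + skipped together account for the whole batch.
            var skipped = result.AllSkipped.ToList();
            var uploaded = events.Except(skipped);
            return (uploaded, skipped);
        }
    }
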
diff --git a/Cognite.Extensions/TimeSeries/DataPointExtensions.cs b/Cognite.Extensions/TimeSeries/DataPointExtensions.cs
index f3b18251..ae7fb12a 100644
--- a/Cognite.Extensions/TimeSeries/DataPointExtensions.cs
+++ b/Cognite.Extensions/TimeSeries/DataPointExtensions.cs
@@ -302,14 +302,18 @@ private static async Task<CogniteResult<DataPointInsertError>> InsertDataPointsH
             {
                 _logger.LogDebug("Failed to create datapoints for {seq} timeseries: {msg}", points.Count, ex.Message);
                 var error = ResultHandlers.ParseException(ex, RequestType.CreateDatapoints);
-                if (error.Complete) errors.Add(error);
+
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
                     if (!error.Complete)
@@ -320,6 +324,10 @@ private static async Task<CogniteResult<DataPointInsertError>> InsertDataPointsH
                             .ConfigureAwait(false);
                         errors.Add(error);
                     }
+                    else
+                    {
+                        errors.Add(error);
+                    }
                     points = ResultHandlers.CleanFromError(error, points);
                 }
             }
diff --git a/Cognite.Extensions/TimeSeries/TimeSeriesExtensions.cs b/Cognite.Extensions/TimeSeries/TimeSeriesExtensions.cs
index d3807848..b831b6b6 100644
--- a/Cognite.Extensions/TimeSeries/TimeSeriesExtensions.cs
+++ b/Cognite.Extensions/TimeSeries/TimeSeriesExtensions.cs
@@ -324,16 +324,20 @@ private static async Task<CogniteResult<TimeSeries, TimeSeriesCreate>> CreateTim
                 _logger.LogDebug("Failed to create {cnt} timeseries: {msg}", toCreate.Count(), ex.Message);
 
                 var error = ResultHandlers.ParseException(ex, RequestType.CreateTimeSeries);
-                errors.Add(error);
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
+                    errors.Add(error);
                     toCreate = ResultHandlers.CleanFromError(error, toCreate);
                 }
             }
@@ -428,16 +432,20 @@ private static async Task<CogniteResult<TimeSeries, TimeSeriesUpdateItem>> Updat
                 _logger.LogDebug("Failed to create {Count} timeseries: {Message}", items.Count(), ex.Message);
 
                 var error = ResultHandlers.ParseException(ex, RequestType.UpdateTimeSeries);
-                errors.Add(error);
                 if (error.Type == ErrorType.FatalFailure
                     && (retryMode == RetryMode.OnFatal
                         || retryMode == RetryMode.OnFatalKeepDuplicates))
                 {
                     await Task.Delay(1000, token).ConfigureAwait(false);
                 }
-                else if (retryMode == RetryMode.None) break;
+                else if (retryMode == RetryMode.None)
+                {
+                    errors.Add(error);
+                    break;
+                }
                 else
                 {
+                    errors.Add(error);
                     items = ResultHandlers.CleanFromError(error, items);
                 }
             }
diff --git a/ExtractorUtils/queues/BaseUploadQueue.cs b/ExtractorUtils/queues/BaseUploadQueue.cs
index 49e2def5..6e692a51 100644
--- a/ExtractorUtils/queues/BaseUploadQueue.cs
+++ b/ExtractorUtils/queues/BaseUploadQueue.cs
@@ -69,12 +69,18 @@ public class QueueUploadResult<T>
         /// </summary>
         public Exception? Exception { get; }
         /// <summary>
+        /// Items that failed to upload
+        /// </summary>
+        public IEnumerable<T>? Failed { get; }
+        /// <summary>
         /// Constructor for successfull or empty upload.
         /// </summary>
         /// <param name="uploaded"></param>
-        public QueueUploadResult(IEnumerable<T> uploaded)
+        /// <param name="failed"></param>
+        public QueueUploadResult(IEnumerable<T> uploaded, IEnumerable<T> failed)
         {
             Uploaded = uploaded;
+            Failed = failed;
         }
         /// <summary>
         /// Constructor for failed upload.
@@ -104,7 +110,7 @@ public abstract class BaseUploadQueue<T> : IUploadQueue<T>
         /// <summary>
         /// Logger to use
         /// </summary>
-        protected ILogger DestLogger { get; private set; }
+        protected ILogger<CogniteDestination> DestLogger { get; private set; }
 
         private readonly ConcurrentQueue<T> _items;
         private readonly int _maxSize;
@@ -115,11 +121,19 @@ public abstract class BaseUploadQueue<T> : IUploadQueue<T>
         private Task? _uploadLoopTask;
         private Task? _uploadTask;
 
-        internal BaseUploadQueue(
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="destination">Cognite destination used for uploads</param>
+        /// <param name="interval">Maximum interval between uploads. Set to zero or Timeout.InfiniteTimeSpan to disable periodic uploads</param>
+        /// <param name="maxSize">Maximum size of the upload queue, once it reaches this size an upload will always be triggered</param>
+        /// <param name="logger">Logger to use</param>
+        /// <param name="callback">Callback on finished upload, whether it failed or not</param>
+        protected BaseUploadQueue(
             CogniteDestination destination,
             TimeSpan interval,
             int maxSize,
-            ILogger logger,
+            ILogger<CogniteDestination> logger,
             Func<QueueUploadResult<T>, Task>? callback)
         {
             _maxSize = maxSize;
@@ -188,10 +202,17 @@ public async Task Start(CancellationToken token)
             _timer?.Start();
             DestLogger.LogDebug("Queue of type {Type} started", GetType().Name);
 
-            // Use a separate token to avoid propagating the loop cancellation to Chunking.RunThrottled
+            // Use a separate token source to allow cancelling the uploader separate from the upload loop.
             _internalSource = new CancellationTokenSource();
-            _uploadLoopTask = Task.Run(() =>
+            var tokenCancellationTask = Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(Timeout.Infinite, token).ConfigureAwait(false);
+                }
+                catch { }
+            }, CancellationToken.None);
+            _uploadLoopTask = Task.Run(async () =>
             {
                 try
                 {
@@ -201,7 +222,7 @@ public async Task Start(CancellationToken token)
                         _uploadTask = TriggerUploadAndCallback(_internalSource.Token);
                         // stop waiting if the source token gets cancelled, but do not
                         // cancel the upload task
-                        _uploadTask.Wait(_tokenSource.Token);
+                        await Task.WhenAny(_uploadTask, tokenCancellationTask).ConfigureAwait(false);
                         _pushEvent?.Reset();
                         _timer?.Start();
                     }
diff --git a/ExtractorUtils/queues/EventUploadQueue.cs b/ExtractorUtils/queues/EventUploadQueue.cs
index d742bdc4..1b0e64d5 100644
--- a/ExtractorUtils/queues/EventUploadQueue.cs
+++ b/ExtractorUtils/queues/EventUploadQueue.cs
@@ -1,6 +1,7 @@
 using Cognite.Extensions;
 using CogniteSdk;
 using Microsoft.Extensions.Logging;
+using Oryx.Cognite;
 using Prometheus;
 using System;
 using System.Collections.Generic;
@@ -96,18 +97,24 @@ private async Task ReadFromBuffer(CancellationToken token)
                     // If the queue is offline for a day, and generates a hundred gigabytes of events,
                     // the file could become unreadable.
                     events = await CogniteUtils.ReadEventsAsync(stream, token, 10_000);
+
                     if (events.Any())
                     {
                         var result = await Destination.EnsureEventsExistsAsync(events, RetryMode.OnError, SanitationMode.Clean, token);
+                        DestLogger.LogResult(result, RequestType.CreateEvents, true);
+
                         var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure);
                         if (fatalError != null)
                         {
-                            DestLogger.LogWarning("Failed to read from buffer: {msg}", fatalError.Message);
+                            DestLogger.LogWarning("Failed to create items from buffer: {msg}", fatalError.Message);
                             return;
                         }
-                        if (Callback != null) await Callback(new QueueUploadResult<EventCreate>(events));
+                        var skipped = result.AllSkipped.ToList();
+                        var uploaded = events.Except(skipped);
+
+                        if (Callback != null) await Callback(new QueueUploadResult<EventCreate>(uploaded, skipped));
                     }
                 } while (events.Any());
             }
@@ -153,13 +160,15 @@ protected override async Task<QueueUploadResult<EventCreate>> UploadEntries(IEnu
                         await ReadFromBuffer(token);
                     }
                 }
-                return new QueueUploadResult<EventCreate>(Enumerable.Empty<EventCreate>());
+                return new QueueUploadResult<EventCreate>(Enumerable.Empty<EventCreate>(), Enumerable.Empty<EventCreate>());
             }
 
             DestLogger.LogTrace("Dequeued {Number} events to upload to CDF", items.Count());
 
             var result = await Destination.EnsureEventsExistsAsync(items, RetryMode.OnError, SanitationMode.Clean, token);
+            DestLogger.LogResult(result, RequestType.CreateEvents, true);
+
             var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure);
             if (fatalError != null)
             {
@@ -174,7 +183,9 @@ protected override async Task<QueueUploadResult<EventCreate>> UploadEntries(IEnu
             {
                 await ReadFromBuffer(token);
             }
-            return new QueueUploadResult<EventCreate>(items);
+            var skipped = result.AllSkipped.ToList();
+            var uploaded = items.Except(skipped);
+            return new QueueUploadResult<EventCreate>(uploaded, skipped);
         }
     }
 }
diff --git a/ExtractorUtils/queues/RawUploadQueue.cs b/ExtractorUtils/queues/RawUploadQueue.cs
index a3a864c3..0a407bcf 100644
--- a/ExtractorUtils/queues/RawUploadQueue.cs
+++ b/ExtractorUtils/queues/RawUploadQueue.cs
@@ -60,7 +60,7 @@ public void EnqueueRow(string key, T columns)
             var rows = items.ToDictionary(pair => pair.key, pair => pair.columns);
             if (!rows.Any())
             {
-                return new QueueUploadResult<(string key, T columns)>(Enumerable.Empty<(string key, T columns)>());
+                return new QueueUploadResult<(string key, T columns)>(Enumerable.Empty<(string key, T columns)>(), Enumerable.Empty<(string key, T columns)>());
             }
             DestLogger.LogTrace("Dequeued {Number} {Type} rows to upload to CDF Raw", rows.Count, typeof(T).Name);
             try
@@ -72,7 +72,7 @@ public void EnqueueRow(string key, T columns)
                 return new QueueUploadResult<(string key, T columns)>(ex);
             }
             _numberRows.WithLabels(typeof(T).Name).Inc(rows.Count);
-            return new QueueUploadResult<(string key, T columns)>(items);
+            return new QueueUploadResult<(string key, T columns)>(items, Enumerable.Empty<(string key, T columns)>());
         }
     }
 }
diff --git a/ExtractorUtils/queues/TimeSeriesUploadQueue.cs b/ExtractorUtils/queues/TimeSeriesUploadQueue.cs
index 6120b3a8..19f920f4 100644
--- a/ExtractorUtils/queues/TimeSeriesUploadQueue.cs
+++ b/ExtractorUtils/queues/TimeSeriesUploadQueue.cs
@@ -133,7 +133,7 @@ private async Task WriteToBuffer(Dictionary<Identity, IEnumerable<Datapoint>> dp
         }
 
         [System.Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2007: Do not directly await a Task", Justification = "Awaiter configured by the caller")]
-        private async Task InsertDataPoints(IDictionary<Identity, IEnumerable<Datapoint>> dps, CancellationToken token)
+        private async Task<List<(Identity id, Datapoint dp)>> InsertDataPoints(IDictionary<Identity, IEnumerable<Datapoint>> dps, CancellationToken token)
         {
             CogniteResult<DataPointInsertError> result;
             if (_createMissingTimeseries)
             {
@@ -149,6 +149,7 @@ private async Task InsertDataPoints(IDictionary
             DestLogger.LogResult(result, RequestType.CreateDatapoints, false, LogLevel.Debug);
 
+            var skipped = new List<(Identity id, Datapoint dp)>();
             if (result.Errors != null)
             {
                 var fatal = result.Errors.FirstOrDefault(err => err.Type == ErrorType.FatalFailure);
@@ -160,13 +161,26 @@ private async Task InsertDataPoints(IDictionary
                 {
                     if (err.Skipped != null && err.Skipped.Any())
                     {
-                        foreach (var dpErr in err.Skipped.OfType<DataPointInsertError>())
+                        foreach (var dpErr in err.Skipped)
                         {
-                            dps.Remove(dpErr.Id);
+                            if (dps.TryGetValue(dpErr.Id, out var byDp))
+                            {
+                                skipped.AddRange(dpErr.DataPoints.Select(dp => (dpErr.Id, dp)));
+                                var uploaded = byDp.Except(dpErr.DataPoints);
+                                if (uploaded.Any())
+                                {
+                                    dps[dpErr.Id] = uploaded;
+                                }
+                                else
+                                {
+                                    dps.Remove(dpErr.Id);
+                                }
+                            }
                         }
                     }
                 }
             }
+            return skipped;
         }
@@ -183,10 +197,10 @@ private async Task ReadFromBuffer(CancellationToken token)
                     dps = await CogniteUtils.ReadDatapointsAsync(stream, token, 1_000_000);
                     if (dps.Any())
                     {
-                        await InsertDataPoints(dps, token);
+                        var skipped = await InsertDataPoints(dps, token);
                         await HandleUploadResult(dps, token);
                         if (Callback != null) await Callback(new QueueUploadResult<(Identity id, Datapoint dp)>(
-                            dps.SelectMany(kvp => kvp.Value.Select(dp => (kvp.Key, dp))).ToList()));
+                            dps.SelectMany(kvp => kvp.Value.Select(dp => (kvp.Key, dp))).ToList(), skipped));
                     }
                 } while (dps.Any());
             }
@@ -255,18 +269,19 @@ private async Task HandleUploadResult(IDictionary
-                return new QueueUploadResult<(Identity id, Datapoint dp)>(Enumerable.Empty<(Identity id, Datapoint dp)>());
+                return new QueueUploadResult<(Identity id, Datapoint dp)>(Enumerable.Empty<(Identity id, Datapoint dp)>(), Enumerable.Empty<(Identity id, Datapoint dp)>());
             }
 
-            if (!dps.Any()) return new QueueUploadResult<(Identity, Datapoint)>(Enumerable.Empty<(Identity, Datapoint)>());
+            if (!dps.Any()) return new QueueUploadResult<(Identity, Datapoint)>(Enumerable.Empty<(Identity, Datapoint)>(), Enumerable.Empty<(Identity id, Datapoint dp)>());
 
             DestLogger.LogTrace("Dequeued {Number} datapoints to upload to CDF", dps.Count());
 
             var dpMap = dps.GroupBy(pair => pair.id, pair => pair.dp).ToDictionary(group => group.Key, group => (IEnumerable<Datapoint>)group);
 
+            IEnumerable<(Identity id, Datapoint dp)> skipped;
             try
             {
-                await InsertDataPoints(dpMap, token);
+                skipped = await InsertDataPoints(dpMap, token);
             }
             catch (Exception ex)
             {
@@ -293,7 +308,7 @@ private async Task HandleUploadResult(IDictionary
             var uploaded = dpMap.SelectMany(kvp => kvp.Value.Select(dp => (kvp.Key, dp))).ToList();
 
             _numberPoints.Inc(uploaded.Count);
-            return new QueueUploadResult<(Identity, Datapoint)>(uploaded);
+            return new QueueUploadResult<(Identity, Datapoint)>(uploaded, skipped);
         }
     }
 }
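With the `Failed` list populated by every queue, a callback can now account for the whole batch instead of inferring losses from `Uploaded` alone. A minimal sketch of such a callback, matching the `Func<QueueUploadResult<EventCreate>, Task>` callback type documented on the `BaseUploadQueue` constructor above; the class and method names are illustrative, and the `Cognite.Extractor.Utils` namespace for the queue types is an assumption:

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using Cognite.Extractor.Utils;
    using CogniteSdk;

    public static class UploadCallbacks
    {
        // Called by the queue after each upload attempt, whether it failed or not.
        public static Task OnEventUpload(QueueUploadResult<EventCreate> result)
        {
            if (result.Exception != null)
            {
                // The failed-upload constructor was used: the batch failed as a whole.
                Console.WriteLine($"Event upload failed: {result.Exception.Message}");
            }
            else
            {
                // Uploaded and Failed together cover every dequeued item; Failed
                // holds the items skipped by sanitation or CDF errors.
                var uploaded = result.Uploaded?.Count() ?? 0;
                var failed = result.Failed?.Count() ?? 0;
                Console.WriteLine($"Uploaded {uploaded} events, {failed} failed");
            }
            return Task.CompletedTask;
        }
    }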