diff --git a/Cognite.Extensions/Assets/AssetSanitation.cs b/Cognite.Extensions/Assets/AssetSanitation.cs index 63cd209b..62cced56 100644 --- a/Cognite.Extensions/Assets/AssetSanitation.cs +++ b/Cognite.Extensions/Assets/AssetSanitation.cs @@ -105,6 +105,7 @@ public static (IEnumerable, IEnumerable) CleanAssetRe IEnumerable assets, SanitationMode mode) { + if (mode == SanitationMode.None) return (assets, Enumerable.Empty()); if (assets == null) { throw new ArgumentNullException(nameof(assets)); diff --git a/Cognite.Extensions/CdfMetrics.cs b/Cognite.Extensions/CdfMetrics.cs index 009036cd..991f4975 100644 --- a/Cognite.Extensions/CdfMetrics.cs +++ b/Cognite.Extensions/CdfMetrics.cs @@ -16,6 +16,8 @@ static class CdfMetrics "Number and duration of raw requests to CDF", "endpoint"); public static Summary Sequences { get; } = Metrics.CreateSummary("extractor_utils_cdf_sequence_requests", "Number and duration of sequence requests to CDF", "endpoint"); + public static Summary SequenceRows { get; } = Metrics.CreateSummary("extractor_utils_cdf_sequence_row_requests", + "Number and duration of sequence row requests to CDF", "endpoint"); public static Counter AssetsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_assets_skipped", "Number of assets skipped due to errors"); @@ -25,5 +27,7 @@ static class CdfMetrics "Number of events skipped due to errors"); public static Counter SequencesSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_sequences_skipped", "Number of sequences skipped due to errors"); + public static Counter SequenceRowsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_sequence_rows_skipped", + "Number of sequence rows skipped due to errors"); } } diff --git a/Cognite.Extensions/CogniteResult.cs b/Cognite.Extensions/CogniteResult.cs index ec196d8f..557af8b8 100644 --- a/Cognite.Extensions/CogniteResult.cs +++ b/Cognite.Extensions/CogniteResult.cs @@ -72,6 +72,10 @@ public static CogniteError ParseException(Exception ex, 
RequestType type) { ParseSequencesException(rex, result); } + else if (type == RequestType.CreateSequenceRows) + { + ParseSequenceRowException(rex, result); + } return result; } else @@ -222,6 +226,10 @@ public class CogniteError /// public IEnumerable Skipped { get; set; } /// + /// Further information about the error, for some errors. + /// + public IEnumerable Data { get; set; } + /// /// Exception that caused this error, if any. /// public Exception Exception { get; set; } @@ -357,6 +365,22 @@ public enum ResourceType /// ColumnMetadata, /// + /// Collection of rows when creating in sequence + /// + SequenceRows, + /// + /// Row in a sequence + /// + SequenceRow, + /// + /// Values of a sequence row + /// + SequenceRowValues, + /// + /// Row number of a sequence row + /// + SequenceRowNumber, + /// /// None or unknown /// None = -1 @@ -381,7 +405,11 @@ public enum RequestType /// /// Create sequences /// - CreateSequences + CreateSequences, + /// + /// Create sequence rows + /// + CreateSequenceRows } diff --git a/Cognite.Extensions/CogniteUtils.cs b/Cognite.Extensions/CogniteUtils.cs index 2405221a..7befaaff 100644 --- a/Cognite.Extensions/CogniteUtils.cs +++ b/Cognite.Extensions/CogniteUtils.cs @@ -685,6 +685,7 @@ public static void AddExtensionLoggers(this IServiceProvider provider) TimeSeriesExtensions.SetLogger(logger); RawExtensions.SetLogger(logger); EventExtensions.SetLogger(logger); + SequenceExtensions.SetLogger(logger); } } diff --git a/Cognite.Extensions/Events/EventSanitation.cs b/Cognite.Extensions/Events/EventSanitation.cs index 7b04c0c1..9f3605bb 100644 --- a/Cognite.Extensions/Events/EventSanitation.cs +++ b/Cognite.Extensions/Events/EventSanitation.cs @@ -105,6 +105,7 @@ public static (IEnumerable, IEnumerable) CleanEventRe IEnumerable events, SanitationMode mode) { + if (mode == SanitationMode.None) return (events, Enumerable.Empty()); if (events == null) { throw new ArgumentNullException(nameof(events)); diff --git 
a/Cognite.Extensions/Sequences/SequenceExtensions.cs b/Cognite.Extensions/Sequences/SequenceExtensions.cs index 07ef6fce..970b6118 100644 --- a/Cognite.Extensions/Sequences/SequenceExtensions.cs +++ b/Cognite.Extensions/Sequences/SequenceExtensions.cs @@ -332,5 +332,138 @@ private static async Task> CreateSequencesHandleErrors( } return new CogniteResult(errors, null); } + + /// + /// Insert sequence rows into given list of sequences. + /// Chunks by both number of sequences per request, and number of rows per sequence. + /// Optionally sanitizes the request, and handles errors that occur while running. + /// + /// CogniteSdk sequence resource object + /// List of sequences and rows to create + /// Maximum number of sequences in each request + /// Maximum number of sequence rows per sequence + /// Maximum number of sequences to read at a time if reading to handle errors + /// Maximum number of parallel requests + /// How to handle errors + /// How to sanitize the request before sending + /// Cancellation token + /// Result containing optional errors if something went wrong + public static async Task InsertAsync( + this SequencesResource sequences, + IEnumerable toCreate, + int keyChunkSize, + int valueChunkSize, + int sequencesChunk, + int throttleSize, + RetryMode retryMode, + SanitationMode sanitationMode, + CancellationToken token) + { + IEnumerable errors; + (toCreate, errors) = Sanitation.CleanSequenceDataRequest(toCreate, sanitationMode); + + var dict = toCreate.ToDictionary(create => create.Id.HasValue ? Identity.Create(create.Id.Value) : Identity.Create(create.ExternalId), new IdentityComparer()); + var chunks = dict + .Select(kvp => (kvp.Key, kvp.Value.Rows)) + .ChunkBy(keyChunkSize, valueChunkSize) + .Select(chunk => chunk + .Select(pair => new SequenceDataCreate + { + Columns = dict[pair.Key].Columns, + Id = pair.Key.Id, + ExternalId = pair.Key.ExternalId, + Rows = pair.Values + })) + .ToList(); + + int size = chunks.Count + (errors.Any() ?
1 : 0); + var results = new CogniteResult[size]; + + if (errors.Any()) + { + results[size - 1] = new CogniteResult(errors, null); + if (size == 1) return results[size - 1]; + } + if (size == 0) return new CogniteResult(null, null); + + _logger.LogDebug("Inserting sequences rows. Number of sequences: {Number}. Number of chunks: {Chunks}", toCreate.Count(), chunks.Count); + var generators = chunks + .Select, Func>( + (chunk, idx) => async () => { + var result = await InsertSequenceRowsHandleErrors(sequences, chunk, sequencesChunk, throttleSize, retryMode, token).ConfigureAwait(false); + results[idx] = result; + }); + + int taskNum = 0; + await generators.RunThrottled( + throttleSize, + (_) => { + if (chunks.Count > 1) + _logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks", + nameof(InsertAsync), ++taskNum, chunks.Count); + }, + token).ConfigureAwait(false); + + return CogniteResult.Merge(results); + } + + private static async Task InsertSequenceRowsHandleErrors( + SequencesResource sequences, + IEnumerable toCreate, + int sequencesChunk, + int throttleSize, + RetryMode retryMode, + CancellationToken token) + { + var errors = new List(); + while (toCreate != null && toCreate.Any() && !token.IsCancellationRequested) + { + try + { + using (CdfMetrics.SequenceRows.WithLabels("create")) + { + await sequences.CreateRowsAsync(toCreate, token).ConfigureAwait(false); + } + + _logger.LogDebug("Created {rows} rows for {seq} sequences in CDF", toCreate.Sum(seq => seq.Rows.Count()), toCreate.Count()); + return new CogniteResult(errors); + } + catch (Exception ex) + { + _logger.LogDebug("Failed to create rows for {seq} sequences", toCreate.Count()); + var error = ResultHandlers.ParseException(ex, RequestType.CreateSequenceRows); + if (error.Complete) errors.Add(error); + if (error.Type == ErrorType.FatalFailure + && (retryMode == RetryMode.OnFatal + || retryMode == RetryMode.OnFatalKeepDuplicates)) + { + await Task.Delay(1000, token).ConfigureAwait(false); + } +
else if (retryMode == RetryMode.None) + { + // Incomplete errors are not recorded above; add them here so the failure is not silently dropped + if (!error.Complete) errors.Add(error); + break; + } + else + { + if (!error.Complete) + { + var newErrors = await + ResultHandlers.VerifySequencesFromCDF(sequences, toCreate, sequencesChunk, throttleSize, token) + .ConfigureAwait(false); + // If verification cannot find the cause, retrying the unchanged request would loop forever + if (!newErrors.Any()) + { + errors.Add(error); + break; + } + foreach (var err in newErrors) + { + errors.Add(err); + var cleaned = ResultHandlers.CleanFromError(err, toCreate); + toCreate = toCreate.Intersect(cleaned); + } + } + else + { + toCreate = ResultHandlers.CleanFromError(error, toCreate); + } + + } + } + } + + return new CogniteResult(errors); + } } } diff --git a/Cognite.Extensions/Sequences/SequenceResultHandlers.cs b/Cognite.Extensions/Sequences/SequenceResultHandlers.cs index 6ed9b8ee..3e075ae7 100644 --- a/Cognite.Extensions/Sequences/SequenceResultHandlers.cs +++ b/Cognite.Extensions/Sequences/SequenceResultHandlers.cs @@ -1,8 +1,11 @@ using CogniteSdk; +using CogniteSdk.Resources; using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace Cognite.Extensions { @@ -45,6 +48,7 @@ private static void ParseSequencesException(ResponseException ex, CogniteError e } } } + /// /// Clean list of SequenceCreate objects based on CogniteError /// @@ -104,5 +108,246 @@ public static IEnumerable CleanFromError( } return ret; } + + private static void ParseSequenceRowException(ResponseException ex, CogniteError err) + { + if (ex.Missing?.Any() ??
false) + { + err.Type = ErrorType.ItemMissing; + err.Resource = ResourceType.Id; + err.Values = ex.Missing.Select(dict => + { + if (dict.TryGetValue("id", out var idVal) && idVal is MultiValue.Long longVal) + { + return Identity.Create(longVal.Value); + } + else if (dict.TryGetValue("externalId", out var extIdVal) && extIdVal is MultiValue.String stringVal) + { + return Identity.Create(stringVal.Value); + } + return null; + }).Where(id => id != null); + } + // Error messages are completely different in greenfield and bluefield + else if (ex.Code == 400 && (ex.Message.StartsWith("error in sequence", StringComparison.InvariantCultureIgnoreCase) + || ex.Message.StartsWith("expected", StringComparison.InvariantCultureIgnoreCase))) + { + err.Type = ErrorType.SanitationFailed; + err.Resource = ResourceType.SequenceRow; + err.Complete = false; + } + else if (ex.Code == 404) + { + err.Type = ErrorType.ItemMissing; + err.Resource = ResourceType.ColumnExternalId; + err.Complete = false; + } + } + + /// + /// Clean list of SequenceDataCreates based on error. + /// + /// Error to clean from + /// Sequence data creates to clean + /// Sequence data creates that did not cause the error + public static IEnumerable CleanFromError( + CogniteError error, + IEnumerable creates) + { + if (creates == null) throw new ArgumentNullException(nameof(creates)); + if (error == null) return creates; + + var ret = new List(); + var skipped = new List(); + + if (!error.Values?.Any() ?? true) + { + error.Values = creates.Select(seq => seq.Id.HasValue ? Identity.Create(seq.Id.Value) : Identity.Create(seq.ExternalId)); + return Enumerable.Empty(); + } + + var items = new HashSet(error.Values, new IdentityComparer()); + + foreach (var seq in creates) + { + bool added = false; + var idt = seq.Id.HasValue ?
Identity.Create(seq.Id.Value) : Identity.Create(seq.ExternalId); + // NOTE(review): there is no case for ResourceType.SequenceRowValues (set by VerifySequencesFromCDF), so such errors skip every create - confirm this is intended + switch (error.Resource) + { + case ResourceType.Id: + if (!items.Contains(idt)) added = true; + break; + case ResourceType.SequenceRow: + if (!items.Contains(idt)) added = true; + break; + case ResourceType.ColumnExternalId: + if (!items.Contains(idt)) added = true; + break; + } + if (added) + { + ret.Add(seq); + } + else + { + skipped.Add(seq); + } + } + + if (error.Skipped == null || !error.Skipped.Any()) + { + if (skipped.Any()) + { + error.Skipped = skipped; + } + else + { + error.Skipped = creates; + return Array.Empty(); + } + } + + return ret; + + } + + /// + /// Ensure that the list of sequence row creates correctly match the corresponding sequences in CDF, + /// checks both missing columns and mismatched data types. + /// + /// CogniteSdk Sequences resource + /// SequenceDataCreates to check + /// Chunk size for reading sequences from CDF + /// Number of parallel requests to read sequences from CDF + /// Cancellation token + /// Up to two errors containing data about failed sequences + public static async Task> VerifySequencesFromCDF( + SequencesResource resource, + IEnumerable creates, + int sequencesChunkSize, + int sequencesThrottleSize, + CancellationToken token) + { + var comparer = new IdentityComparer(); + var createMap = creates + .ToDictionary(seq => seq.Id.HasValue ?
Identity.Create(seq.Id.Value) : Identity.Create(seq.ExternalId), comparer); + + var sequences = await resource + .GetByIdsIgnoreErrors(createMap.Keys, sequencesChunkSize, sequencesThrottleSize, token) + .ConfigureAwait(false); + + var sequenceMap = sequences.ToDictionary(seq => + { + var idIdt = Identity.Create(seq.Id); + if (createMap.ContainsKey(idIdt)) return idIdt; + else return Identity.Create(seq.ExternalId); + }, comparer); + + var columnErrors = new List(); + var rowErrors = new List(); + + foreach (var kvp in createMap) + { + // Sequences that were not returned by the read above cannot be verified here + if (!sequenceMap.TryGetValue(kvp.Key, out var foundSeq)) continue; + var create = kvp.Value; + var colMap = foundSeq.Columns.ToDictionary(seq => seq.ExternalId); + + var badColumns = create.Columns.Where(col => !colMap.ContainsKey(col)).ToList(); + if (badColumns.Any()) + { + columnErrors.Add(new SequenceRowError + { + BadRows = create.Rows, + BadColumns = badColumns, + Id = kvp.Key + }); + create.Rows = Enumerable.Empty(); + continue; + } + + var orderedColumns = create.Columns + .Select(col => colMap[col]) + .ToArray(); + + var badRows = new List(); + + // Verify each row in the sequence + foreach (var row in create.Rows) + { + int idx = 0; + var fieldEnum = row.Values.GetEnumerator(); + + while (fieldEnum.MoveNext()) + { + // A row with more values than columns can never match the sequence + if (idx >= orderedColumns.Length) + { + badRows.Add(row); + break; + } + var column = orderedColumns[idx++]; + if (fieldEnum.Current != null && fieldEnum.Current.Type != column.ValueType) + { + badRows.Add(row); + break; + } + } + } + + if (badRows.Any()) + { + rowErrors.Add(new SequenceRowError + { + BadRows = badRows, + Id = kvp.Key + }); + create.Rows = create.Rows.Except(badRows).ToList(); + } + } + + var errors = new List(); + if (columnErrors.Any()) + { + errors.Add(new CogniteError + { + Message = "Columns missing in sequences", + Status = 404, + Data = columnErrors, + Resource = ResourceType.ColumnExternalId, + Type = ErrorType.ItemMissing, + Skipped = columnErrors.Select(seq => createMap[seq.Id]).ToList(), + Values = columnErrors.Select(seq => seq.Id) + }); + } + if (rowErrors.Any()) + { + errors.Add(new
CogniteError + { + Message = "Error in sequence rows", + Status = 400, + Data = rowErrors, + Resource = ResourceType.SequenceRowValues, + Type = ErrorType.SanitationFailed, + Skipped = rowErrors.SelectMany(seq => seq.BadRows), + Values = rowErrors + .Where(seq => !createMap[seq.Id].Rows.Any()) + .Select(seq => seq.Id) + .ToList() + }); + } + return errors; + } + } + /// + /// Contains information about skipped rows per sequence in a row insert request + /// + public class SequenceRowError + { + /// + /// Missing columns, if any + /// + public IEnumerable BadColumns { get; set; } + /// + /// Id of skipped sequence + /// + public Identity Id { get; set; } + /// + /// Bad rows + /// + public IEnumerable BadRows { get; set; } } } diff --git a/Cognite.Extensions/Sequences/SequenceSanitation.cs b/Cognite.Extensions/Sequences/SequenceSanitation.cs index 6ec7f039..a15b5a13 100644 --- a/Cognite.Extensions/Sequences/SequenceSanitation.cs +++ b/Cognite.Extensions/Sequences/SequenceSanitation.cs @@ -81,6 +81,58 @@ public static void Sanitize(this SequenceCreate seq) } } + /// + /// Sanitize a SequenceDataCreate object.
+ /// + /// + public static void Sanitize(this SequenceDataCreate seq) + { + if (seq == null) throw new ArgumentNullException(nameof(seq)); + seq.ExternalId = seq.ExternalId.Truncate(ExternalIdMax); + if (seq.Rows == null) return; + foreach (var row in seq.Rows) row.Sanitize(); + } + + private static IEnumerable Sanitize(this IEnumerable values) + { + if (values == null) yield break; + foreach (var val in values) + { + // Emit exactly one output per input: without the else, a null fell through to the final fallback and was yielded twice + if (val == null) yield return null; + else if (val is MultiValue.String strVal) + { + yield return new MultiValue.String(strVal.Value.Truncate(CogniteUtils.StringLengthMax)); + } + else if (val is MultiValue.Double doubleVal) + { + if (!double.IsNaN(doubleVal.Value) && !double.IsInfinity(doubleVal.Value)) + { + double value = doubleVal.Value; + value = Math.Max(CogniteUtils.NumericValueMin, value); + value = Math.Min(CogniteUtils.NumericValueMax, value); + yield return value == doubleVal.Value ? doubleVal : new MultiValue.Double(value); + } + else + { + yield return null; + } + } + else yield return val; + } + } + + /// + /// Ensure that all row values are valid. i.e. within -1E100 and 1E100, not infinity or NaN, + /// all string values less than 256 characters. + /// + /// + public static void Sanitize(this SequenceRow row) + { + if (row == null) return; + // Materialize once so the sanitized values are not recomputed on every enumeration + row.Values = row.Values.Sanitize().ToList(); + } + + /// /// Check that given SequenceCreate satisfies CDF limits. /// @@ -114,6 +166,53 @@ public static void Sanitize(this SequenceCreate seq) return null; } + /// + /// Check that given SequenceDataCreate satisfies CDF limits and requirements. + /// + /// Sequence data create to check + /// Null if create satisfies limits, otherwise the resource type that fails + public static ResourceType?
Verify(this SequenceDataCreate seq) + { + if (seq == null) throw new ArgumentNullException(nameof(seq)); + if (seq.ExternalId == null && seq.Id == null || !seq.ExternalId.CheckLength(ExternalIdMax)) return ResourceType.ExternalId; + if (seq.Columns == null || !seq.Columns.Any()) return ResourceType.SequenceColumns; + if (seq.Rows == null || !seq.Rows.Any()) return ResourceType.SequenceRows; + return null; + } + + /// + /// Check that the given sequence row is valid + /// + /// Row to check + /// Sequence this row belongs to, should be verified + /// + public static ResourceType? Verify(this SequenceRow row, SequenceDataCreate seq) + { + if (seq == null) throw new ArgumentNullException(nameof(seq)); + if (row == null) return ResourceType.SequenceRow; + if (row.Values == null) return ResourceType.SequenceRowValues; + if (row.Values.Count() != seq.Columns.Count()) return ResourceType.SequenceRowValues; + // The row number does not depend on the values, so check it once up front (this also covers rows with no values) + if (row.RowNumber < 0) + { + return ResourceType.SequenceRowNumber; + } + foreach (var val in row.Values) + { + if (val == null) continue; + if (val is MultiValue.Double doubleVal) + { + if (double.IsNaN(doubleVal.Value) || double.IsInfinity(doubleVal.Value) + || doubleVal.Value > CogniteUtils.NumericValueMax || doubleVal.Value < CogniteUtils.NumericValueMin) + return ResourceType.SequenceRowValues; + } + else if (val is MultiValue.String stringVal) + { + if (!stringVal.Value.CheckLength(CogniteUtils.StringLengthMax)) return ResourceType.SequenceRowValues; + } + } + return null; + } + /// /// Clean list of SequenceCreate objects, sanitizing each and removing any duplicates. /// The first encountered duplicate is kept.
@@ -126,6 +225,7 @@ public static (IEnumerable, IEnumerable) CleanSequ IEnumerable sequences, SanitationMode mode) { + if (mode == SanitationMode.None) return (sequences, Enumerable.Empty()); if (sequences == null) throw new ArgumentNullException(nameof(sequences)); var result = new List(); @@ -230,5 +330,194 @@ public static (IEnumerable, IEnumerable) CleanSequ } return (result, errors); } + + /// + /// Clean list of SequenceDataCreate objects, sanitizing each and removing any duplicates. + /// The first encountered duplicate is kept. + /// Invalid sequences due to duplicate column externalIds or other fatal issues are also removed. + /// Invalid rows are removed individually. + /// + /// SequenceCreate request to clean + /// The type of sanitation to apply + /// Cleaned create request and optional errors if any ids were duplicated + public static (IEnumerable, IEnumerable) CleanSequenceDataRequest( + IEnumerable sequences, + SanitationMode mode) + { + if (mode == SanitationMode.None) return (sequences, Enumerable.Empty()); + if (sequences == null) throw new ArgumentNullException(nameof(sequences)); + + var result = new List(); + var errors = new List(); + + var comparer = new IdentityComparer(); + + var ids = new HashSet(comparer); + var duplicated = new HashSet(comparer); + var bad = new List<(ResourceType, SequenceDataCreate)>(); + + var badRowSequences = new List<(ResourceType, SequenceRowError)>(); + + foreach (var seq in sequences) + { + var columns = new HashSet(); + var duplicatedColumns = new HashSet(); + bool toAdd = true; + + if (mode == SanitationMode.Clean) + { + seq.Sanitize(); + } + var failedField = seq.Verify(); + if (failedField.HasValue) + { + bad.Add((failedField.Value, seq)); + toAdd = false; + } + + var idt = seq.Id.HasValue ?
Identity.Create(seq.Id.Value) : Identity.Create(seq.ExternalId); + + if (!ids.Add(idt)) + { + duplicated.Add(idt); + toAdd = false; + } + + var badRows = new List<(ResourceType, SequenceRow)>(); + + var rowNums = new HashSet(); + var duplicateRows = new List(); + + if (seq.Columns != null && seq.Rows != null) + { + var goodRows = new List(seq.Rows.Count()); + foreach (var row in seq.Rows) + { + bool addRow = true; + failedField = row.Verify(seq); + if (failedField.HasValue) + { + badRows.Add((failedField.Value, row)); + addRow = false; + } + + // A null row has already been reported by Verify above; it has no row number to check + if (row != null && !rowNums.Add(row.RowNumber)) + { + duplicateRows.Add(row); + addRow = false; + } + + if (addRow) + { + goodRows.Add(row); + } + else + { + CdfMetrics.SequenceRowsSkipped.Inc(); + } + } + seq.Rows = goodRows; + if (!seq.Rows.Any()) + { + bad.Add((ResourceType.SequenceRows, seq)); + toAdd = false; + } + } + + if (seq.Columns != null) + { + foreach (var col in seq.Columns) + { + if (col == null) + { + bad.Add((ResourceType.ColumnExternalId, seq)); + // The sequence is reported as skipped, so it must not also be added to the result + toAdd = false; + break; + } + if (!columns.Add(col)) + { + duplicatedColumns.Add(col); + toAdd = false; + } + } + } + + if (duplicatedColumns.Any()) + { + errors.Add(new CogniteError + { + Status = 409, + Message = "Duplicate columns", + Resource = ResourceType.ColumnExternalId, + Type = ErrorType.ItemDuplicated, + Values = duplicatedColumns.Select(col => Identity.Create(col)), + Skipped = new[] { seq } + }); + } + if (duplicateRows.Any()) + { + errors.Add(new CogniteError + { + Status = 409, + Message = "Duplicate row numbers", + Resource = ResourceType.SequenceRowNumber, + Type = ErrorType.ItemDuplicated, + Values = duplicateRows.Select(row => Identity.Create(row.RowNumber)), + Skipped = duplicateRows + }); + } + + if (badRows.Any()) + { + badRowSequences.AddRange(badRows.GroupBy(pair => pair.Item1).Select(group => ( + group.Key, + new SequenceRowError + { + Id = idt, + BadRows = group.Select(pair => pair.Item2).ToList() + } + ))); + } + + if (toAdd) + { + result.Add(seq); + } + + } + + if
(duplicated.Any()) + { + errors.Add(new CogniteError + { + Status = 409, + Message = "Duplicate internal or external ids", + Resource = ResourceType.Id, + Type = ErrorType.ItemDuplicated, + Values = duplicated.ToArray() + }); + } + if (bad.Any()) + { + errors.AddRange(bad.GroupBy(pair => pair.Item1).Select(group => new CogniteError + { + Skipped = group.Select(pair => pair.Item2).ToList(), + Resource = group.Key, + Type = ErrorType.SanitationFailed, + Status = 400 + })); + } + if (badRowSequences.Any()) + { + errors.AddRange(badRowSequences.GroupBy(pair => pair.Item1).Select(group => new CogniteError + { + Skipped = group.SelectMany(pair => pair.Item2.BadRows), + Resource = group.Key, + Type = ErrorType.SanitationFailed, + Status = 400, + Data = group.Select(pair => pair.Item2) + })); + } + return (result, errors); + } } } diff --git a/Cognite.Extensions/TimeSeries/TimeSeriesSanitation.cs b/Cognite.Extensions/TimeSeries/TimeSeriesSanitation.cs index e6b78cbf..cc28e8ef 100644 --- a/Cognite.Extensions/TimeSeries/TimeSeriesSanitation.cs +++ b/Cognite.Extensions/TimeSeries/TimeSeriesSanitation.cs @@ -92,6 +92,7 @@ public static (IEnumerable, IEnumerable) CleanTi IEnumerable timeseries, SanitationMode mode) { + if (mode == SanitationMode.None) return (timeseries, Enumerable.Empty()); if (timeseries == null) { throw new ArgumentNullException(nameof(timeseries)); diff --git a/ExtractorUtils.Test/CDFTester.cs b/ExtractorUtils.Test/CDFTester.cs index 5e9f4c2c..ce1ef14f 100644 --- a/ExtractorUtils.Test/CDFTester.cs +++ b/ExtractorUtils.Test/CDFTester.cs @@ -25,6 +25,7 @@ class CDFTester : IDisposable public string Project { get; private set; } public string Host { get; private set; } public string Prefix { get; private set; } + public BaseConfig Config { get; } private readonly string _configPath; public CDFTester(string[] config) @@ -33,7 +34,7 @@ public CDFTester(string[] config) _configPath = $"test-config-{Interlocked.Increment(ref _configIdx)}";
System.IO.File.WriteAllLines(_configPath, config); var services = new ServiceCollection(); - services.AddConfig(_configPath, 2); + Config = services.AddConfig(_configPath, 2); services.AddLogger(); services.AddCogniteClient("net-extractor-utils-test", userAgent: "Utils-Tests/v1.0.0 (Test)"); Provider = services.BuildServiceProvider(); @@ -86,6 +87,8 @@ public static string[] GetConfig(CogniteHost host) " assets: 20", " events: 20", " sequences: 10", + " sequence-row-sequences: 10", + " sequence-rows: 100", " cdf-throttling:", " time-series: 2", " assets: 2", diff --git a/ExtractorUtils.Test/integration/SequenceIntegrationTest.cs b/ExtractorUtils.Test/integration/SequenceIntegrationTest.cs index ad4fd90d..d942c9c3 100644 --- a/ExtractorUtils.Test/integration/SequenceIntegrationTest.cs +++ b/ExtractorUtils.Test/integration/SequenceIntegrationTest.cs @@ -330,5 +330,450 @@ await tester.Destination.EnsureSequencesExistsAsync(new[] await SafeDelete(ids, tester); } } + + + private async Task<(string extId, long id)[]> CreateTestSequences(CDFTester tester) + { + var columns = new[] + { + new SequenceColumnWrite + { + ExternalId = "col1", + ValueType = MultiValueType.DOUBLE + }, + new SequenceColumnWrite + { + ExternalId = "col2", + ValueType = MultiValueType.LONG + }, + new SequenceColumnWrite + { + ExternalId = "col3", + ValueType = MultiValueType.STRING + } + }; + + var sequences = new[] { + new SequenceCreate + { + ExternalId = $"{tester.Prefix} test-create-rows-1", + Columns = columns + }, + new SequenceCreate + { + ExternalId = $"{tester.Prefix} test-create-rows-2", + Columns = columns + }, + new SequenceCreate + { + ExternalId = $"{tester.Prefix} test-create-rows-3", + Columns = columns + } + }; + var results = await tester.Destination.EnsureSequencesExistsAsync(sequences, RetryMode.None, SanitationMode.None, tester.Source.Token); + return results.Results.Select(seq => (seq.ExternalId, seq.Id)).ToArray(); + } + + [Theory] + [InlineData(CogniteHost.GreenField)] + 
[InlineData(CogniteHost.BlueField)] + public async Task TestRowsCreate(CogniteHost host) + { + using var tester = new CDFTester(host); + var ids = await CreateTestSequences(tester); + var columns = new[] { "col1", "col3", "col2" }; + var writes = new[] + { + new SequenceDataCreate + { + Columns = columns, + ExternalId = ids[0].extId, + Rows = Enumerable.Range(0, 10).Select(i => + new SequenceRow + { + RowNumber = i, + Values = new MultiValue[] { MultiValue.Create(123.2), MultiValue.Create("test"), MultiValue.Create(i) } + }).ToArray() + }, + new SequenceDataCreate + { + Columns = columns, + Id = ids[1].id, + Rows = Enumerable.Range(0, 10).Select(i => + new SequenceRow + { + RowNumber = i, + Values = new MultiValue[] { MultiValue.Create(123.2), MultiValue.Create("test"), MultiValue.Create(i) } + }).ToArray() + }, + new SequenceDataCreate + { + Columns = columns, + ExternalId = ids[2].extId, + Rows = Enumerable.Range(0, 10).Select(i => + new SequenceRow + { + RowNumber = i, + Values = new MultiValue[] { MultiValue.Create(123.2), MultiValue.Create("test"), MultiValue.Create(i) } + }).ToArray() + } + }; + tester.Config.Cognite.CdfChunking.SequenceRowSequences = 2; + tester.Config.Cognite.CdfChunking.SequenceRows = 5; + + try + { + var result = await tester.Destination.InsertSequenceRowsAsync(writes, RetryMode.None, SanitationMode.None, tester.Source.Token); + Assert.Empty(result.Errors); + + bool found = true; + int[] counts = new int[3]; + // Need to potentially wait a while for results to show... 
+ // For some reason this is much slower on greenfield + for (int i = 0; i < 10; i++) + { + found = true; + for (int j = 0; j < 3; j++) + { + var retrieved = await tester.Destination.CogniteClient.Sequences.ListRowsAsync( + new SequenceRowQuery + { + Limit = 1000, + ExternalId = ids[j].extId + }, tester.Source.Token); + found &= retrieved.Rows.Count() == 10; + counts[j] = retrieved.Rows.Count(); + Assert.Null(retrieved.NextCursor); + } + if (found) break; + await Task.Delay(1000); + } + Assert.True(found, string.Join(',', counts)); + } + finally + { + await SafeDelete(ids.Select(id => id.extId).ToArray(), tester); + } + } + + [Theory] + [InlineData(CogniteHost.GreenField)] + [InlineData(CogniteHost.BlueField)] + public async Task TestRowsSanitation(CogniteHost host) + { + using var tester = new CDFTester(host); + var ids = await CreateTestSequences(tester); + + var columns = new[] { "col1", "col3", "col2" }; + + SequenceRow GetRow(int num, double dVal, long lVal, string sVal) + { + return new SequenceRow + { + RowNumber = num, + Values = new MultiValue[] { MultiValue.Create(dVal), MultiValue.Create(sVal), MultiValue.Create(lVal) } + }; + } + + + var creates1 = new[] + { + // Duplicate extId + new SequenceDataCreate + { + ExternalId = ids[0].extId, + Columns = columns, + Rows = new[] + { + GetRow(0, 123.4, 1, "test") + } + }, + new SequenceDataCreate + { + ExternalId = ids[0].extId, + Columns = columns, + Rows = new[] + { + GetRow(0, 123.4, 1, "test") + } + }, + // Duplicate internalId + new SequenceDataCreate + { + Id = ids[1].id, + Columns = columns, + Rows = new[] + { + GetRow(0, 123.4, 1, "test") + } + }, + new SequenceDataCreate + { + Id = ids[1].id, + Columns = columns, + Rows = new[] + { + GetRow(0, 123.4, 1, "test") + } + }, + // Duplicate columns + new SequenceDataCreate + { + Id = ids[2].id, + Columns = new string[] { columns[0], columns[0], columns[0] }, + Rows = new [] + { + GetRow(0, 123.4, 1, "test") + } + } + }; + + SequenceDataCreate[] GetCreates2() 
+ { + return new[] + { + // Misc bad rows + new SequenceDataCreate + { + ExternalId = ids[0].extId, + Columns = columns, + Rows = new [] + { + // Bad double + GetRow(0, double.NaN, 1, "test"), + // Too large double + GetRow(1, 1E101, 1, "test"), + // Too long string + GetRow(2, 123.4, 1, new string('æ', 300)), + // Duplicate row number + GetRow(3, 123.4, 1, "test"), + GetRow(3, 123.4, 1, "test"), + // Negative row number + GetRow(-1, 123.4, 1, "test"), + // Null values + new SequenceRow { RowNumber = 4, Values = null }, + // Too few values + new SequenceRow { RowNumber = 5, Values = new MultiValue[] { null, null } } + } + }, + // Null columns + new SequenceDataCreate + { + ExternalId = ids[1].extId, + Columns = null, + Rows = new [] + { + GetRow(0, 123.4, 1, "test") + } + }, + // Null rows + new SequenceDataCreate + { + ExternalId = ids[2].extId, + Columns = columns, + Rows = null + } + }; + } + + + try + { + var result = await tester.Destination.InsertSequenceRowsAsync(creates1, RetryMode.OnError, SanitationMode.Remove, tester.Source.Token); + var errs = result.Errors.ToArray(); + Assert.Equal(2, errs.Length); + Assert.Equal(ResourceType.ColumnExternalId, errs[0].Resource); + Assert.Equal(ErrorType.ItemDuplicated, errs[0].Type); + Assert.Single(errs[0].Skipped); + Assert.Equal(ResourceType.Id, errs[1].Resource); + Assert.Equal(ErrorType.ItemDuplicated, errs[1].Type); + Assert.Equal(2, errs[1].Values.Count()); + + var creates2 = GetCreates2(); + result = await tester.Destination.InsertSequenceRowsAsync(creates2, RetryMode.OnError, SanitationMode.Remove, tester.Source.Token); + errs = result.Errors.ToArray(); + Assert.Equal(5, errs.Length); + Assert.Equal(ResourceType.SequenceRowNumber, errs[0].Resource); + Assert.Equal(ErrorType.ItemDuplicated, errs[0].Type); + Assert.Single(errs[0].Skipped); + + Assert.Equal(ResourceType.SequenceColumns, errs[1].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[1].Type); + Assert.Single(errs[1].Skipped); + + 
Assert.Equal(ResourceType.SequenceRows, errs[2].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[2].Type); + Assert.Single(errs[2].Skipped); + + Assert.Equal(ResourceType.SequenceRowValues, errs[3].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[3].Type); + Assert.Equal(5, errs[3].Skipped.Count()); + + Assert.Equal(ResourceType.SequenceRowNumber, errs[4].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[4].Type); + Assert.Single(errs[4].Skipped); + + // The inserts are modified in-place + creates2 = GetCreates2(); + result = await tester.Destination.InsertSequenceRowsAsync(creates2, RetryMode.OnError, SanitationMode.Clean, tester.Source.Token); + errs = result.Errors.ToArray(); + Assert.Equal(5, errs.Length); + Assert.Equal(ResourceType.SequenceRowNumber, errs[0].Resource); + Assert.Equal(ErrorType.ItemDuplicated, errs[0].Type); + Assert.Single(errs[0].Skipped); + + Assert.Equal(ResourceType.SequenceColumns, errs[1].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[1].Type); + Assert.Single(errs[1].Skipped); + + Assert.Equal(ResourceType.SequenceRows, errs[2].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[2].Type); + Assert.Single(errs[2].Skipped); + + Assert.Equal(ResourceType.SequenceRowNumber, errs[3].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[3].Type); + Assert.Single(errs[3].Skipped); + + // Three of the bad rows have now been cleaned and should not be removed + Assert.Equal(ResourceType.SequenceRowValues, errs[4].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[4].Type); + Assert.Equal(2, errs[4].Skipped.Count()); + } + finally + { + await SafeDelete(ids.Select(id => id.extId).ToArray(), tester); + } + } + + [Theory] + [InlineData(CogniteHost.GreenField)] + [InlineData(CogniteHost.BlueField)] + public async Task TestRowsErrorHandling(CogniteHost host) + { + using var tester = new CDFTester(host); + var ids = await CreateTestSequences(tester); + + var columns = new[] { "col1", "col3", 
"col2" }; + + SequenceRow GetRow(int num, double dVal, long lVal, string sVal) + { + return new SequenceRow + { + RowNumber = num, + Values = new MultiValue[] { MultiValue.Create(dVal), MultiValue.Create(sVal), MultiValue.Create(lVal) } + }; + } + + var creates = new[] + { + // Mismatched data types + new SequenceDataCreate + { + ExternalId = ids[0].extId, + Columns = columns, + Rows = new[] + { + new SequenceRow + { + // String for double + RowNumber = 0, Values = new MultiValue[] { MultiValue.Create("string"), null, null } + }, + new SequenceRow + { + // string for long + RowNumber = 1, Values = new MultiValue[] { null, MultiValue.Create("string"), null } + }, + new SequenceRow + { + // number for string + RowNumber = 2, Values = new MultiValue[] { null, null, MultiValue.Create(123.4) } + }, + new SequenceRow + { + // long for double (should be OK) + RowNumber = 3, Values = new MultiValue[] { MultiValue.Create(123), MultiValue.Create(123), null } + }, + new SequenceRow + { + // double for long + RowNumber = 4, Values = new MultiValue[] { MultiValue.Create(123.4), MultiValue.Create(123.4), null } + } + } + }, + // Missing columns + new SequenceDataCreate + { + ExternalId = ids[1].extId, + Columns = new [] { "col1", "col4", "col5" }, + Rows = new[] + { + GetRow(0, 123.4, 123, "test"), + GetRow(1, 123.4, 123, "test") + } + }, + // Missing extId + new SequenceDataCreate + { + ExternalId = "missing-sequence", + Columns = columns, + Rows = new[] { GetRow(0, 123.4, 123, "test") } + }, + // Missing id + new SequenceDataCreate + { + Id = 123, + Columns = columns, + Rows = new[] { GetRow(0, 123.4, 123, "test") } + }, + // All rows bad + new SequenceDataCreate + { + ExternalId = ids[2].extId, + Columns = columns, + Rows = new[] + { + new SequenceRow + { + // String for double + RowNumber = 0, Values = new MultiValue[] { MultiValue.Create("string"), null, null } + }, + new SequenceRow + { + // string for long + RowNumber = 1, Values = new MultiValue[] { null, 
MultiValue.Create("string"), null } + } + } + }, + }; + + try + { + var result = await tester.Destination.InsertSequenceRowsAsync(creates, RetryMode.OnError, SanitationMode.None, tester.Source.Token); + var errs = result.Errors.ToArray(); + Assert.Equal(3, errs.Length); + + Assert.Equal(ResourceType.Id, errs[0].Resource); + Assert.Equal(ErrorType.ItemMissing, errs[0].Type); + Assert.Equal(2, errs[0].Values.Count()); + Assert.Equal(2, errs[0].Skipped.Count()); + + Assert.Equal(ResourceType.ColumnExternalId, errs[1].Resource); + Assert.Equal(ErrorType.ItemMissing, errs[1].Type); + Assert.Single(errs[1].Skipped); + Assert.Single(errs[1].Data); + Assert.Equal(2, errs[1].Data.OfType().Sum(err => err.BadRows.Count())); + Assert.Equal(2, errs[1].Data.OfType().Sum(err => err.BadColumns.Count())); + + Assert.Equal(ResourceType.SequenceRowValues, errs[2].Resource); + Assert.Equal(ErrorType.SanitationFailed, errs[2].Type); + Assert.Equal(5, errs[2].Skipped.Count()); + Assert.Single(errs[2].Values); + Assert.Equal(2, errs[2].Data.Count()); + } + finally + { + await SafeDelete(ids.Select(id => id.extId).ToArray(), tester); + } + } } } diff --git a/ExtractorUtils.Test/unit/SanitationTest.cs b/ExtractorUtils.Test/unit/SanitationTest.cs index 2f543b09..76247fa7 100644 --- a/ExtractorUtils.Test/unit/SanitationTest.cs +++ b/ExtractorUtils.Test/unit/SanitationTest.cs @@ -343,6 +343,95 @@ public void TestVerifySequence() } Assert.Null(seq.Verify()); } + + [Fact] + public void TestSanitizeSequenceData() + { + var rows = Enumerable.Range(0, 100).Select(num => new SequenceRow + { + RowNumber = num, + Values = new MultiValue[] + { + new MultiValue.String(new string('æ', 500)), + new MultiValue.Long(123), + new MultiValue.Double(double.PositiveInfinity), + new MultiValue.Double(double.NegativeInfinity), + new MultiValue.Double(double.MinValue), + new MultiValue.Double(double.MaxValue), + new MultiValue.Double(double.NaN), + null + } + }).ToList(); + var seq = new SequenceDataCreate + { + 
ExternalId = new string('æ', 500), + Rows = rows + }; + + seq.Sanitize(); + + Assert.Equal(new string('æ', 255), seq.ExternalId); + foreach (var row in rows) + { + Assert.Equal(new string('æ', 255), (row.Values.ElementAt(0) as MultiValue.String).Value); + Assert.Equal(123, (row.Values.ElementAt(1) as MultiValue.Long).Value); + Assert.Null(row.Values.ElementAt(2)); + Assert.Null(row.Values.ElementAt(3)); + Assert.Equal(-1E100, (row.Values.ElementAt(4) as MultiValue.Double).Value); + Assert.Equal(1E100, (row.Values.ElementAt(5) as MultiValue.Double).Value); + Assert.Null(row.Values.ElementAt(6)); + Assert.Null(row.Values.ElementAt(7)); + } + } + + [Fact] + public void TestVerifySequenceData() + { + var removeFields = new[] + { + ResourceType.ExternalId, ResourceType.ExternalId, ResourceType.SequenceColumns, + ResourceType.SequenceColumns, ResourceType.SequenceRows, ResourceType.SequenceRows + }; + var seq = new SequenceDataCreate + { + Columns = Array.Empty(), + Rows = Array.Empty(), + ExternalId = new string('æ', 300) + }; + + foreach (var field in removeFields) + { + var errType = seq.Verify(); + Assert.Equal(field, errType); + switch (errType) + { + case ResourceType.ExternalId: + if (seq.ExternalId == null) seq.ExternalId = "test"; + else seq.ExternalId = null; + break; + case ResourceType.SequenceColumns: + if (seq.Columns == null) seq.Columns = new[] + { + "test" + }; + else seq.Columns = null; + break; + case ResourceType.SequenceRows: + if (seq.Rows == null) seq.Rows = new[] + { + new SequenceRow + { + RowNumber = 1, + Values = new MultiValue[] { null } + } + }; + else seq.Rows = null; + break; + + } + } + + } [Theory] [InlineData("æææææ", 4)] [InlineData("123412341234", 9)] @@ -466,5 +555,103 @@ public void TestSanitizeSequenceRequest() Assert.Single(err.Skipped); Assert.Single(err.Values); } + [Fact] + public void TestSanitizeSequenceDataRequest() + { + var defCols = new[] { "test" }; + var defRows = new[] { new SequenceRow { RowNumber = 1, Values = new 
MultiValue[] { null } } }; + + var sequences = new[] + { + // Duplicate externalId + new SequenceDataCreate { ExternalId = "test1", Columns = defCols, Rows = defRows }, + new SequenceDataCreate { ExternalId = "test1", Columns = defCols, Rows = defRows }, + // Duplicate internalId + new SequenceDataCreate { Id = 1, Columns = defCols, Rows = defRows }, + new SequenceDataCreate { Id = 1, Columns = defCols, Rows = defRows }, + // Null columns + new SequenceDataCreate { ExternalId = "test2", Columns = null, Rows = defRows }, + // Null rows + new SequenceDataCreate { ExternalId = "test3", Columns = defCols, Rows = null }, + // Duplicate row numbers + new SequenceDataCreate { ExternalId = "test4", Columns = defCols, Rows = new [] + { + defRows[0], defRows[0] + } }, + // Invalid row due to wrong number of fields + new SequenceDataCreate { ExternalId = "test5", Columns = defCols, Rows = new [] + { + defRows[0], new SequenceRow { RowNumber = 2, Values = new MultiValue[] { null, null } } + } }, + // Invalid row due to failed validation + new SequenceDataCreate { ExternalId = "test6", Columns = defCols, Rows = new [] + { + defRows[0], new SequenceRow {RowNumber = 2, Values = new MultiValue[] { MultiValue.Create(double.NaN) }} + } }, + // Invalid row due to bad row number + new SequenceDataCreate { ExternalId = "test7", Columns = defCols, Rows = new [] + { + defRows[0], new SequenceRow {RowNumber = -50, Values = new MultiValue[] { null }} + } }, + // All rows invalid + new SequenceDataCreate { ExternalId = "test8", Columns = defCols, Rows = new [] + { + new SequenceRow {RowNumber = 2, Values = new MultiValue[] { MultiValue.Create(double.PositiveInfinity) }} + } }, + // Duplicated columns + new SequenceDataCreate { ExternalId = "test9", Columns = new [] { "test", "test", "test2" }, Rows = new [] + { + new SequenceRow { RowNumber = 1, Values = new MultiValue[] { null, null, null } } + } } + }; + + var (result, errors) = Sanitation.CleanSequenceDataRequest(sequences, 
SanitationMode.Remove); + Assert.Equal(6, result.Count()); + Assert.Equal(7, errors.Count()); + + var errs = errors.ToList(); + + var err = errs[0]; + Assert.Equal(ResourceType.SequenceRowNumber, err.Resource); + Assert.Equal(ErrorType.ItemDuplicated, err.Type); + Assert.Equal(409, err.Status); + Assert.Single(err.Values); + Assert.Single(err.Skipped); + + err = errs[1]; + Assert.Equal(ResourceType.ColumnExternalId, err.Resource); + Assert.Equal(ErrorType.ItemDuplicated, err.Type); + Assert.Equal(409, err.Status); + Assert.Single(err.Values); + Assert.Single(err.Skipped); + + err = errs[2]; + Assert.Equal(ResourceType.Id, err.Resource); + Assert.Equal(ErrorType.ItemDuplicated, err.Type); + Assert.Equal(409, err.Status); + Assert.Equal(2, err.Values.Count()); + + err = errs[3]; + Assert.Equal(ResourceType.SequenceColumns, err.Resource); + Assert.Equal(ErrorType.SanitationFailed, err.Type); + Assert.Single(err.Skipped); + + err = errs[4]; + Assert.Equal(ResourceType.SequenceRows, err.Resource); + Assert.Equal(ErrorType.SanitationFailed, err.Type); + Assert.Equal(2, err.Skipped.Count()); + + err = errs[5]; + Assert.Equal(ResourceType.SequenceRowValues, err.Resource); + Assert.Equal(ErrorType.SanitationFailed, err.Type); + Assert.Equal(3, err.Skipped.Count()); + Assert.Equal(3, err.Data.Count()); + + err = errs[6]; + Assert.Equal(ResourceType.SequenceRowNumber, err.Resource); + Assert.Equal(ErrorType.SanitationFailed, err.Type); + Assert.Single(err.Skipped); + Assert.Single(err.Data); + } } } diff --git a/ExtractorUtils/Cognite/CogniteDestination.cs b/ExtractorUtils/Cognite/CogniteDestination.cs index fdee5d50..43678bab 100644 --- a/ExtractorUtils/Cognite/CogniteDestination.cs +++ b/ExtractorUtils/Cognite/CogniteDestination.cs @@ -761,6 +761,37 @@ public async Task> EnsureSequencesExistsAsync( sanitationMode, token).ConfigureAwait(false); } + + /// + /// Insert the given list of rows into CDF. 
+ /// Both individual rows and full sequences can be removed due to mismatched datatypes, + /// duplicated externalIds, or similar, by setting + /// retryMode and sanitationMode. + /// + /// Sequences with rows to insert + /// How to handle retries. Keeping duplicates is not valid for this method. + /// The type of sanitation to apply to sequences before creating. + /// Errors that are normally handled by sanitation will not be handled if received from CDF. + /// Cancellation token + /// A result containing errors that occurred during insertion + public async Task InsertSequenceRowsAsync( + IEnumerable sequences, + RetryMode retryMode, + SanitationMode sanitationMode, + CancellationToken token) + { + _logger.LogInformation("Inserting {Rows} rows for {Seq} sequences into CDF", + sequences.Sum(seq => seq.Rows?.Count() ?? 0), sequences.Count()); + return await _client.Sequences.InsertAsync( + sequences, + _config.CdfChunking.SequenceRowSequences, + _config.CdfChunking.SequenceRows, + _config.CdfChunking.Sequences, + _config.CdfThrottling.Sequences, + retryMode, + sanitationMode, + token).ConfigureAwait(false); + } #endregion } } \ No newline at end of file diff --git a/ExtractorUtils/Configuration/BaseConfig.cs b/ExtractorUtils/Configuration/BaseConfig.cs index 302d0307..2e0afd90 100644 --- a/ExtractorUtils/Configuration/BaseConfig.cs +++ b/ExtractorUtils/Configuration/BaseConfig.cs @@ -273,6 +273,16 @@ public class ChunkingConfig /// Maximum number of sequences to create or retrieve per request /// public int Sequences { get; set; } = 1_000; + + /// + /// Maximum number of sequences per row insert request.
+ /// + public int SequenceRowSequences { get; set; } = 1000; + + /// + /// Maximum number of rows per sequence per row insert request + /// + public int SequenceRows { get; set; } = 10_000; } /// diff --git a/ExtractorUtils/config/config.example.yml b/ExtractorUtils/config/config.example.yml index a094a14b..0740d5b9 100644 --- a/ExtractorUtils/config/config.example.yml +++ b/ExtractorUtils/config/config.example.yml @@ -70,6 +70,10 @@ cognite: events: 1000 # Maximum number of sequences per get/create sequences request sequences: 1000 + # Maximum number of sequences per create sequence rows request + sequence-row-sequences: 1000 + # Maximum number of rows per sequence when creating rows + sequence-rows: 10000 # Configure how requests to CDF should be throttled cdf-throttling: # Maximum number of parallel requests per timeseries operation