Skip to content

Commit

Permalink
Sequence rows support (#106)
Browse files Browse the repository at this point in the history
Error handling and sanitation when inserting. Also added a method which is called when receiving certain errors that verify the contents of the sequence by comparing data types to column types in CDF.
  • Loading branch information
einarmo authored Sep 9, 2021
1 parent 527c612 commit b142258
Show file tree
Hide file tree
Showing 15 changed files with 1,385 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cognite.Extensions/Assets/AssetSanitation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static (IEnumerable<AssetCreate>, IEnumerable<CogniteError>) CleanAssetRe
IEnumerable<AssetCreate> assets,
SanitationMode mode)
{
if (mode == SanitationMode.None) return (assets, Enumerable.Empty<CogniteError>());
if (assets == null)
{
throw new ArgumentNullException(nameof(assets));
Expand Down
4 changes: 4 additions & 0 deletions Cognite.Extensions/CdfMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ static class CdfMetrics
"Number and duration of raw requests to CDF", "endpoint");
public static Summary Sequences { get; } = Metrics.CreateSummary("extractor_utils_cdf_sequence_requests",
"Number and duration of sequence requests to CDF", "endpoint");
public static Summary SequenceRows { get; } = Metrics.CreateSummary("extractor_utils_cdf_sequence_row_requests",
"Number and duration of sequence row requests to CDF", "endpoint");

public static Counter AssetsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_assets_skipped",
"Number of assets skipped due to errors");
Expand All @@ -25,5 +27,7 @@ static class CdfMetrics
"Number of events skipped due to errors");
public static Counter SequencesSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_sequences_skipped",
"Number of sequences skipped due to errors");
public static Counter SequenceRowsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_sequence_rows_skipped",
"Number of sequence rows skipped due to errors");
}
}
30 changes: 29 additions & 1 deletion Cognite.Extensions/CogniteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public static CogniteError ParseException(Exception ex, RequestType type)
{
ParseSequencesException(rex, result);
}
else if (type == RequestType.CreateSequenceRows)
{
ParseSequenceRowException(rex, result);
}
return result;
}
else
Expand Down Expand Up @@ -222,6 +226,10 @@ public class CogniteError
/// </summary>
public IEnumerable<object> Skipped { get; set; }
/// <summary>
/// Further information about the error, for some errors.
/// </summary>
public IEnumerable<object> Data { get; set; }
/// <summary>
/// Exception that caused this error, if any.
/// </summary>
public Exception Exception { get; set; }
Expand Down Expand Up @@ -357,6 +365,22 @@ public enum ResourceType
/// </summary>
ColumnMetadata,
/// <summary>
/// Collection of rows when creating in sequence
/// </summary>
SequenceRows,
/// <summary>
/// Row in a sequence
/// </summary>
SequenceRow,
/// <summary>
/// Values of a sequence row
/// </summary>
SequenceRowValues,
/// <summary>
/// Row number of a sequence row
/// </summary>
SequenceRowNumber,
/// <summary>
/// None or unknown
/// </summary>
None = -1
Expand All @@ -381,7 +405,11 @@ public enum RequestType
/// <summary>
/// Create sequences
/// </summary>
CreateSequences
CreateSequences,
/// <summary>
/// Create sequence rows
/// </summary>
CreateSequenceRows
}


Expand Down
1 change: 1 addition & 0 deletions Cognite.Extensions/CogniteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ public static void AddExtensionLoggers(this IServiceProvider provider)
TimeSeriesExtensions.SetLogger(logger);
RawExtensions.SetLogger(logger);
EventExtensions.SetLogger(logger);
SequenceExtensions.SetLogger(logger);
}
}

Expand Down
1 change: 1 addition & 0 deletions Cognite.Extensions/Events/EventSanitation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static (IEnumerable<EventCreate>, IEnumerable<CogniteError>) CleanEventRe
IEnumerable<EventCreate> events,
SanitationMode mode)
{
if (mode == SanitationMode.None) return (events, Enumerable.Empty<CogniteError>());
if (events == null)
{
throw new ArgumentNullException(nameof(events));
Expand Down
133 changes: 133 additions & 0 deletions Cognite.Extensions/Sequences/SequenceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,5 +332,138 @@ private static async Task<CogniteResult<Sequence>> CreateSequencesHandleErrors(
}
return new CogniteResult<Sequence>(errors, null);
}

/// <summary>
/// Insert sequence rows into given list of sequences.
/// Chunks by both number of sequences per request, and number of rows per sequence.
/// Optionally sanitizes the request, and handles errors that occur while running.
/// </summary>
/// <param name="sequences">CogniteSdk sequence resource object</param>
/// <param name="toCreate">List of sequences and rows to create</param>
/// <param name="keyChunkSize">Maximum number of sequences in each request</param>
/// <param name="valueChunkSize">Maximum number of sequence rows per sequence</param>
/// <param name="sequencesChunk">Maximum number of sequences to read at a time if reading to handle errors</param>
/// <param name="throttleSize">Maximum number of parallel requests</param>
/// <param name="retryMode">How to handle errors</param>
/// <param name="sanitationMode">How to sanitize the request before sending</param>
/// <param name="token">Cancellation token</param>
/// <returns>Result containing optional errors if something went wrong</returns>
public static async Task<CogniteResult> InsertAsync(
this SequencesResource sequences,
IEnumerable<SequenceDataCreate> toCreate,
int keyChunkSize,
int valueChunkSize,
int sequencesChunk,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
IEnumerable<CogniteError> errors;
(toCreate, errors) = Sanitation.CleanSequenceDataRequest(toCreate, sanitationMode);

var dict = toCreate.ToDictionary(create => create.Id.HasValue ? Identity.Create(create.Id.Value) : Identity.Create(create.ExternalId), new IdentityComparer());
var chunks = dict
.Select(kvp => (kvp.Key, kvp.Value.Rows))
.ChunkBy(keyChunkSize, valueChunkSize)
.Select(chunk => chunk
.Select(pair => new SequenceDataCreate
{
Columns = dict[pair.Key].Columns,
Id = pair.Key.Id,
ExternalId = pair.Key.ExternalId,
Rows = pair.Values
}))
.ToList();

int size = chunks.Count + (errors.Any() ? 1 : 0);
var results = new CogniteResult[size];

if (errors.Any())
{
results[size - 1] = new CogniteResult<Sequence>(errors, null);
if (size == 1) return results[size - 1];
}
if (size == 0) return new CogniteResult<Sequence>(null, null);

_logger.LogDebug("Inserting sequences rows. Number of sequences: {Number}. Number of chunks: {Chunks}", toCreate.Count(), chunks.Count);
var generators = chunks
.Select<IEnumerable<SequenceDataCreate>, Func<Task>>(
(chunk, idx) => async () => {
var result = await InsertSequenceRowsHandleErrors(sequences, chunk, sequencesChunk, throttleSize, retryMode, token).ConfigureAwait(false);
results[idx] = result;
});

int taskNum = 0;
await generators.RunThrottled(
throttleSize,
(_) => {
if (chunks.Count > 1)
_logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks",
nameof(InsertAsync), ++taskNum, chunks.Count);
},
token).ConfigureAwait(false);

return CogniteResult.Merge(results);
}

private static async Task<CogniteResult> InsertSequenceRowsHandleErrors(
SequencesResource sequences,
IEnumerable<SequenceDataCreate> toCreate,
int sequencesChunk,
int throttleSize,
RetryMode retryMode,
CancellationToken token)
{
var errors = new List<CogniteError>();
while (toCreate != null && toCreate.Any() && !token.IsCancellationRequested)
{
try
{
using (CdfMetrics.SequenceRows.WithLabels("create"))
{
await sequences.CreateRowsAsync(toCreate, token).ConfigureAwait(false);
}

_logger.LogDebug("Created {rows} rows for {seq} sequences in CDF", toCreate.Sum(seq => seq.Rows.Count()), toCreate.Count());
return new CogniteResult(errors);
}
catch (Exception ex)
{
_logger.LogDebug("Failed to create rows for {seq} sequences", toCreate.Count());
var error = ResultHandlers.ParseException(ex, RequestType.CreateSequenceRows);
if (error.Complete) errors.Add(error);
if (error.Type == ErrorType.FatalFailure
&& (retryMode == RetryMode.OnFatal
|| retryMode == RetryMode.OnFatalKeepDuplicates))
{
await Task.Delay(1000, token).ConfigureAwait(false);
}
else if (retryMode == RetryMode.None) break;
else
{
if (!error.Complete)
{
var newErrors = await
ResultHandlers.VerifySequencesFromCDF(sequences, toCreate, sequencesChunk, throttleSize, token)
.ConfigureAwait(false);
foreach (var err in newErrors)
{
errors.Add(err);
var cleaned = ResultHandlers.CleanFromError(err, toCreate);
toCreate = toCreate.Intersect(cleaned);
}
}
else
{
toCreate = ResultHandlers.CleanFromError(error, toCreate);
}

}
}
}

return new CogniteResult(errors);
}
}
}
Loading

0 comments on commit b142258

Please sign in to comment.