Skip to content

Commit

Permalink
Data sanitation (#64)
Browse files Browse the repository at this point in the history
Sanitize events, assets and timeseries before we create them. This is a precursor to more sophisticated error handling. The first step to a fault-tolerant framework for requests to CDF is to handle any issues that come up before we make the request.
  • Loading branch information
einarmo authored Aug 10, 2020
1 parent 9bc026a commit ed227d5
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cognite.Extensions/AssetExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public static async Task EnsureExistsAsync(
int throttleSize,
CancellationToken token)
{
foreach (var asset in assetsToEnsure) asset.Sanitize();

var chunks = assetsToEnsure
.ChunkBy(chunkSize);
_logger.LogDebug("Ensuring assets. Number of assets: {Number}. Number of chunks: {Chunks}", assetsToEnsure.Count(), chunks.Count());
Expand Down Expand Up @@ -165,6 +167,7 @@ private static async Task<IEnumerable<Asset>> GetOrCreateAssetsChunk(
var toCreate = await buildAssets(missing);
if (toCreate.Any())
{
foreach (var asset in toCreate) asset.Sanitize();
IEnumerable<Asset> newAssets;
using (CdfMetrics.Assets.WithLabels("create"))
{
Expand Down
2 changes: 2 additions & 0 deletions Cognite.Extensions/EventExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private static async Task<IEnumerable<Event>> GetOrCreateEventsChunk(
var toCreate = await buildEvents(missing);
if (toCreate.Any())
{
foreach (var evt in toCreate) evt.Sanitize();
IEnumerable<Event> newEvents;
using (CdfMetrics.Events.WithLabels("create").NewTimer())
{
Expand Down Expand Up @@ -172,6 +173,7 @@ public static async Task EnsureExistsAsync(
bool failOnError,
CancellationToken token)
{
foreach (var evt in events) evt.Sanitize();
var chunks = events
.ChunkBy(chunkSize)
.ToList();
Expand Down
226 changes: 226 additions & 0 deletions Cognite.Extensions/Sanitation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
using CogniteSdk;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Cognite.Extensions
{
public static class Sanitation
{
// Limits used by the Sanitize methods in this class. Values passed to Truncate are
// counted in UTF-16 characters; metadata key/value/total limits are counted in UTF8 bytes.

/// <summary>Maximum number of characters kept in external ids (also used for time series legacy names).</summary>
public const int ExternalIdMax = 255;

/// <summary>Maximum number of characters kept in an asset name.</summary>
public const int AssetNameMax = 140;
/// <summary>Maximum number of characters kept in an asset description.</summary>
public const int AssetDescriptionMax = 500;
/// <summary>Maximum total number of UTF8 bytes (keys plus values) kept in asset metadata.</summary>
public const int AssetMetadataMaxBytes = 10240;
/// <summary>Maximum number of UTF8 bytes kept per asset metadata key.</summary>
public const int AssetMetadataMaxPerKey = 128;
/// <summary>Maximum number of UTF8 bytes kept per asset metadata value.</summary>
public const int AssetMetadataMaxPerValue = 10240;
/// <summary>Maximum number of key, value pairs kept in asset metadata.</summary>
public const int AssetMetadataMaxPairs = 256;
/// <summary>Maximum number of characters kept in an asset source.</summary>
public const int AssetSourceMax = 128;

/// <summary>Maximum number of characters kept in a time series name.</summary>
public const int TimeSeriesNameMax = 255;
/// <summary>Maximum number of characters kept in a time series description.</summary>
public const int TimeSeriesDescriptionMax = 1000;
/// <summary>Maximum number of characters kept in a time series unit.</summary>
public const int TimeSeriesUnitMax = 32;
/// <summary>Maximum number of UTF8 bytes kept per time series metadata key.</summary>
public const int TimeSeriesMetadataMaxPerKey = 32;
/// <summary>Maximum number of UTF8 bytes kept per time series metadata value.</summary>
public const int TimeSeriesMetadataMaxPerValue = 512;
/// <summary>Maximum number of key, value pairs kept in time series metadata.</summary>
public const int TimeSeriesMetadataMaxPairs = 16;

/// <summary>Maximum number of characters kept in an event type or subtype.</summary>
public const int EventTypeMax = 64;
/// <summary>Maximum number of characters kept in an event description.</summary>
public const int EventDescriptionMax = 500;
/// <summary>Maximum number of characters kept in an event source.</summary>
public const int EventSourceMax = 128;
/// <summary>Maximum number of UTF8 bytes kept per event metadata key.</summary>
public const int EventMetadataMaxPerKey = 128;
/// <summary>Maximum number of UTF8 bytes kept per event metadata value.</summary>
public const int EventMetadataMaxPerValue = 128_000;
/// <summary>Maximum number of key, value pairs kept in event metadata.</summary>
public const int EventMetadataMaxPairs = 256;
/// <summary>Maximum total number of UTF8 bytes (keys plus values) kept in event metadata.</summary>
// NOTE(review): the lower-case "m" in "Eventmetadata" is inconsistent with the other
// constants, but the name is public and therefore left unchanged here.
public const int EventmetadataMaxBytes = 200_000;
/// <summary>Maximum number of asset ids kept on an event.</summary>
public const int EventAssetIdsMax = 10_000;
/// <summary>
/// Reduce the length of the given string to at most <paramref name="maxLength"/> UTF-16 characters.
/// A surrogate pair is never split: if the cut would fall between the two halves of a pair,
/// the whole pair is dropped, so the result may be one character shorter than the limit.
/// </summary>
/// <param name="str">String to be shortened, may be null</param>
/// <param name="maxLength">Maximum length of final string. Values below 1 yield an empty string
/// for any non-empty input.</param>
/// <returns>The passed string itself if it is null, empty or short enough, otherwise a string
/// containing at most the first <paramref name="maxLength"/> characters of it.</returns>
public static string Truncate(this string str, int maxLength)
{
    if (string.IsNullOrEmpty(str) || str.Length <= maxLength) return str;
    // Nothing fits within a non-positive limit; Substring would throw on a negative length.
    if (maxLength <= 0) return string.Empty;
    // Ending on a high surrogate would leave half of a pair behind, which is not valid UTF-16
    // and cannot be encoded faithfully. Drop the dangling half instead.
    int length = char.IsHighSurrogate(str[maxLength - 1]) ? maxLength - 1 : maxLength;
    return str.Substring(0, length);
}

/// <summary>
/// Shorten the external id held by a CogniteExternalId to at most
/// <paramref name="maxLength"/> characters.
/// </summary>
/// <param name="id">CogniteExternalId to be shortened, may be null</param>
/// <param name="maxLength">Maximum length of the resulting external id</param>
/// <returns>The passed object itself when it is null or its external id is null, empty or short
/// enough, otherwise a new CogniteExternalId holding the first <paramref name="maxLength"/>
/// characters of the original external id.</returns>
public static CogniteExternalId Truncate(this CogniteExternalId id, int maxLength)
{
    if (id != null)
    {
        var value = id.ExternalId;
        bool tooLong = !string.IsNullOrEmpty(value) && value.Length > maxLength;
        if (tooLong)
        {
            // Only allocate a replacement when something is actually cut off.
            return new CogniteExternalId(value.Substring(0, maxLength));
        }
    }
    return id;
}

/// <summary>
/// Limit the maximum number of UTF8 bytes in the given string. Characters are never cut
/// in half: if the limit falls inside a multi-byte sequence, that whole character is dropped.
/// </summary>
/// <param name="str">String to truncate, may be null</param>
/// <param name="n">Maximum number of UTF8 bytes in the final string. Values below 1 yield an
/// empty string for any non-empty input.</param>
/// <returns>A truncated string, may be the same if no truncating was necessary</returns>
public static string LimitUtf8ByteCount(this string str, int n)
{
    if (string.IsNullOrEmpty(str)) return str;
    // No bytes are allowed. Without this guard a negative limit would reach
    // Encoding.GetString with a negative count and throw.
    if (n <= 0) return string.Empty;
    if (Encoding.UTF8.GetByteCount(str) <= n) return str;

    var bytes = Encoding.UTF8.GetBytes(str);
    // bytes[end] is the first byte that gets dropped (it exists, since bytes.Length > n).
    // A byte of the form 10xxxxxx is a continuation byte, meaning the cut is inside a
    // multi-byte character; move back until the cut is in front of that character's lead byte.
    int end = n;
    while (end > 0 && (bytes[end] & 0xC0) == 0x80) end--;
    // convert back to string (with the limit adjusted)
    return Encoding.UTF8.GetString(bytes, 0, end);
}

/// <summary>
/// Build a dictionary from an enumerable. In contrast to the LINQ ToDictionary method,
/// duplicate keys do not cause an error: the value of the last element with a given key wins.
/// </summary>
/// <typeparam name="TInput">Input enumerable type</typeparam>
/// <typeparam name="TKey">Output key type</typeparam>
/// <typeparam name="TValue">Output value type</typeparam>
/// <param name="input">Input enumerable</param>
/// <param name="keySelector">Function to select key from input</param>
/// <param name="valueSelector">Function to select value from input</param>
/// <param name="comparer">IEqualityComparer to use for dictionary, null for the default comparer</param>
/// <returns>A dictionary from <typeparamref name="TKey"/> to <typeparamref name="TValue"/></returns>
/// <exception cref="ArgumentNullException">If <paramref name="input"/> is null</exception>
public static Dictionary<TKey, TValue> ToDictionarySafe<TInput, TKey, TValue>(
    this IEnumerable<TInput> input,
    Func<TInput, TKey> keySelector,
    Func<TInput, TValue> valueSelector,
    IEqualityComparer<TKey> comparer = null)
{
    if (input is null)
    {
        throw new ArgumentNullException(nameof(input));
    }

    var result = new Dictionary<TKey, TValue>(comparer);
    foreach (var item in input)
    {
        var key = keySelector(item);
        var value = valueSelector(item);
        // The indexer overwrites an existing entry, where Add would throw.
        result[key] = value;
    }
    return result;
}

/// <summary>
/// Sanitize a string, string metadata dictionary by limiting the number of UTF8 bytes per key,
/// value and total, as well as the total number of key, value pairs.
/// Pairs are taken in enumeration order until either the pair limit or the total byte limit
/// would be exceeded; that pair and all following pairs are dropped.
/// Null values are kept and count as zero bytes.
/// </summary>
/// <param name="data">Metadata to limit</param>
/// <param name="maxPerKey">Maximum number of bytes per key</param>
/// <param name="maxKeys">Maximum number of key, value pairs</param>
/// <param name="maxPerValue">Maximum number of bytes per value</param>
/// <param name="maxBytes">Maximum number of total bytes</param>
/// <returns>A sanitized dictionary, or <paramref name="data"/> itself if it is null or empty</returns>
public static Dictionary<string, string> SanitizeMetadata(this Dictionary<string, string> data,
    int maxPerKey,
    int maxKeys,
    int maxPerValue,
    int maxBytes)
{
    if (data == null || !data.Any()) return data;
    // Running totals updated by the TakeWhile predicate. The query is enumerated exactly
    // once (by ToDictionarySafe), so each pair is counted once.
    int count = 0;
    int byteCount = 0;
    return data
        .Where(kvp => kvp.Key != null)
        .Select(kvp => (Key: kvp.Key.LimitUtf8ByteCount(maxPerKey), Value: kvp.Value.LimitUtf8ByteCount(maxPerValue)))
        .TakeWhile(pair =>
        {
            count++;
            // LimitUtf8ByteCount passes a null value through unchanged and
            // Encoding.GetByteCount throws on null, so a null value counts as 0 bytes.
            byteCount += Encoding.UTF8.GetByteCount(pair.Key)
                + (pair.Value == null ? 0 : Encoding.UTF8.GetByteCount(pair.Value));
            return count <= maxKeys && byteCount <= maxBytes;
        })
        // Truncation may make two keys equal; the last one wins instead of throwing.
        .ToDictionarySafe(pair => pair.Key, pair => pair.Value);
}

/// <summary>
/// Sanitize a string, string metadata dictionary: every key and value is cut down to a maximum
/// number of UTF8 bytes, and only the first <paramref name="maxKeys"/> pairs (in enumeration
/// order) are kept. There is no limit on the total number of bytes.
/// </summary>
/// <param name="data">Metadata to limit</param>
/// <param name="maxPerKey">Maximum number of bytes per key</param>
/// <param name="maxKeys">Maximum number of keys</param>
/// <param name="maxPerValue">Maximum number of bytes per value</param>
/// <returns>A sanitized dictionary, or <paramref name="data"/> itself if it is null or empty</returns>
public static Dictionary<string, string> SanitizeMetadata(this Dictionary<string, string> data,
    int maxPerKey,
    int maxKeys,
    int maxPerValue)
{
    if (data == null || data.Count == 0)
    {
        return data;
    }

    var limitedPairs = data
        .Where(entry => entry.Key != null)
        .Select(entry => (
            Key: entry.Key.LimitUtf8ByteCount(maxPerKey),
            Value: entry.Value.LimitUtf8ByteCount(maxPerValue)));

    // Keys that become equal after truncation collapse into one entry (last one wins).
    return limitedPairs
        .Take(maxKeys)
        .ToDictionarySafe(pair => pair.Key, pair => pair.Value);
}

/// <summary>
/// Sanitize an AssetCreate so that it can be safely sent to CDF.
/// Requests may still fail due to conflicts or missing ids.
/// Strings are truncated, ids below 1 are cleared, metadata is limited and
/// null labels are removed. The asset is modified in place.
/// </summary>
/// <param name="asset">Asset to sanitize</param>
/// <exception cref="ArgumentNullException">If <paramref name="asset"/> is null</exception>
public static void Sanitize(this AssetCreate asset)
{
    if (asset == null) throw new ArgumentNullException(nameof(asset));
    // Maximum number of labels kept on an asset.
    const int maxLabels = 10;

    asset.ExternalId = asset.ExternalId?.Truncate(ExternalIdMax);
    asset.Name = asset.Name?.Truncate(AssetNameMax);
    if (asset.ParentId < 1) asset.ParentId = null;
    asset.ParentExternalId = asset.ParentExternalId?.Truncate(ExternalIdMax);
    asset.Description = asset.Description?.Truncate(AssetDescriptionMax);
    if (asset.DataSetId < 1) asset.DataSetId = null;
    asset.Metadata = asset.Metadata?.SanitizeMetadata(AssetMetadataMaxPerKey, AssetMetadataMaxPairs, AssetMetadataMaxPerValue, AssetMetadataMaxBytes);
    asset.Source = asset.Source?.Truncate(AssetSourceMax);
    // Materialize the result: a deferred query would be re-run (allocating new ids) on every
    // enumeration, and would be wrapped again if the asset is sanitized more than once.
    asset.Labels = asset.Labels?
        .Where(label => label != null && label.ExternalId != null)
        .Select(label => label.Truncate(ExternalIdMax))
        .Take(maxLabels)
        .ToList();
}

/// <summary>
/// Sanitize a TimeSeriesCreate object so that it can be safely sent to CDF.
/// Requests may still fail due to conflicts or missing ids.
/// The time series is modified in place.
/// </summary>
/// <param name="ts">TimeSeries to sanitize</param>
/// <exception cref="ArgumentNullException">If <paramref name="ts"/> is null</exception>
public static void Sanitize(this TimeSeriesCreate ts)
{
    if (ts == null)
    {
        throw new ArgumentNullException(nameof(ts));
    }

    // Ids below 1 are cleared.
    if (ts.AssetId < 1)
    {
        ts.AssetId = null;
    }
    if (ts.DataSetId < 1)
    {
        ts.DataSetId = null;
    }

    // Text fields are cut to their maximum number of characters.
    ts.ExternalId = ts.ExternalId?.Truncate(ExternalIdMax);
    ts.LegacyName = ts.LegacyName?.Truncate(ExternalIdMax);
    ts.Name = ts.Name?.Truncate(TimeSeriesNameMax);
    ts.Description = ts.Description?.Truncate(TimeSeriesDescriptionMax);
    ts.Unit = ts.Unit?.Truncate(TimeSeriesUnitMax);

    // Metadata is limited per key, per value and in number of pairs.
    ts.Metadata = ts.Metadata?.SanitizeMetadata(
        TimeSeriesMetadataMaxPerKey,
        TimeSeriesMetadataMaxPairs,
        TimeSeriesMetadataMaxPerValue);
}

/// <summary>
/// Sanitize an EventCreate object so that it can be safely sent to CDF.
/// Requests may still fail due to conflicts or missing ids.
/// Strings are truncated, non-positive asset ids are removed, negative timestamps are set
/// to zero, an end time before the start time is raised to the start time, and metadata
/// is limited. The event is modified in place.
/// </summary>
/// <param name="evt">Event to sanitize</param>
/// <exception cref="ArgumentNullException">If <paramref name="evt"/> is null</exception>
public static void Sanitize(this EventCreate evt)
{
    if (evt == null) throw new ArgumentNullException(nameof(evt));
    evt.ExternalId = evt.ExternalId?.Truncate(ExternalIdMax);
    evt.Type = evt.Type?.Truncate(EventTypeMax);
    evt.Subtype = evt.Subtype?.Truncate(EventTypeMax);
    evt.Source = evt.Source?.Truncate(EventSourceMax);
    evt.Description = evt.Description?.Truncate(EventDescriptionMax);
    // Materialize the result: a deferred query would re-run the filter on every enumeration
    // and would be wrapped again if the event is sanitized more than once.
    evt.AssetIds = evt.AssetIds?
        .Where(id => id > 0)
        .Take(EventAssetIdsMax)
        .ToList();
    if (evt.StartTime < 0) evt.StartTime = 0;
    if (evt.EndTime < 0) evt.EndTime = 0;
    // Must run after the clamping above so the comparison sees the corrected values.
    if (evt.StartTime > evt.EndTime) evt.EndTime = evt.StartTime;
    if (evt.DataSetId < 1) evt.DataSetId = null;
    evt.Metadata = evt.Metadata?.SanitizeMetadata(EventMetadataMaxPerKey, EventMetadataMaxPairs, EventMetadataMaxPerValue, EventmetadataMaxBytes);
}
}
}
2 changes: 2 additions & 0 deletions Cognite.Extensions/TimeSeriesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public static async Task EnsureTimeSeriesExistsAsync(
int throttleSize,
CancellationToken token)
{
foreach (var ts in timeSeriesToEnsure) ts.Sanitize();
var chunks = timeSeriesToEnsure
.ChunkBy(chunkSize)
.ToList();
Expand Down Expand Up @@ -213,6 +214,7 @@ private static async Task<IEnumerable<TimeSeries>> GetOrCreateTimeSeriesChunk(
var toCreate = await buildTimeSeries(missing);
if (toCreate.Any())
{
foreach (var ts in toCreate) ts.Sanitize();
IEnumerable<TimeSeries> newTs;
using (CdfMetrics.TimeSeries.WithLabels("create").NewTimer())
{
Expand Down
118 changes: 118 additions & 0 deletions ExtractorUtils.Test/SanitationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using Cognite.Extensions;
using CogniteSdk;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Xunit;

namespace ExtractorUtils.Test
{
public class SanitationTest
{
[Fact]
public void TestSanitizeAsset()
{
    // Shorthand for a string made of a single repeated character.
    string Repeat(char c, int count) => new string(c, count);

    // Arrange: every field is too long or holds an invalid value.
    var invalidLabels = new CogniteExternalId[] { null, new CogniteExternalId(null) };
    var longLabels = Enumerable.Range(0, 100).Select(i => new CogniteExternalId(Repeat('æ', 300)));
    var metadata = Enumerable.Range(0, 100)
        .ToDictionary(i => $"key{i:000}{Repeat('æ', 100)}", i => Repeat('æ', 200));

    var asset = new AssetCreate
    {
        ExternalId = Repeat('æ', 300),
        Name = Repeat('ø', 1000),
        Description = Repeat('æ', 1000),
        Source = Repeat('æ', 12345),
        ParentExternalId = Repeat('æ', 300),
        ParentId = -1234,
        DataSetId = -2502,
        Labels = invalidLabels.Concat(longLabels),
        Metadata = metadata
    };

    // Act
    asset.Sanitize();

    // Assert: strings are cut to their limits.
    Assert.Equal(Repeat('æ', 255), asset.ExternalId);
    Assert.Equal(Repeat('ø', 140), asset.Name);
    Assert.Equal(Repeat('æ', 500), asset.Description);
    Assert.Equal(Repeat('æ', 128), asset.Source);
    Assert.Equal(Repeat('æ', 255), asset.ParentExternalId);

    // Negative ids are cleared.
    Assert.Null(asset.ParentId);
    Assert.Null(asset.DataSetId);

    // Null labels are dropped, the rest are truncated and capped at 10.
    Assert.Equal(10, asset.Labels.Count());
    Assert.All(asset.Labels, ext => Assert.Equal(Repeat('æ', 255), ext.ExternalId));

    Assert.Equal(19, asset.Metadata.Count);
    // 'æ' is 2 bytes, key{i} will be 6 bytes, so 128-6 = 122, 122/2 = 61, 61 + 6 = 67
    Assert.All(asset.Metadata, kvp => Assert.Equal(67, kvp.Key.Length));
    Assert.All(asset.Metadata, kvp => Assert.Equal(Repeat('æ', 200), kvp.Value));
}
[Fact]
public void TestSanitizeTimeSeries()
{
    // Shorthand for a string of repeated 'æ' characters (2 UTF8 bytes each).
    string Ae(int count) => new string('æ', count);

    // Arrange: every field is too long or holds an invalid value.
    var metadata = Enumerable.Range(0, 100)
        .ToDictionary(i => $"key{i:000}{Ae(100)}", i => Ae(600));

    var ts = new TimeSeriesCreate
    {
        ExternalId = Ae(300),
        LegacyName = Ae(300),
        Name = Ae(300),
        Description = Ae(2000),
        Unit = Ae(200),
        AssetId = -1239,
        DataSetId = -2952,
        Metadata = metadata
    };

    // Act
    ts.Sanitize();

    // Assert: strings are cut to their limits.
    Assert.Equal(Ae(255), ts.ExternalId);
    Assert.Equal(Ae(255), ts.LegacyName);
    Assert.Equal(Ae(255), ts.Name);
    Assert.Equal(Ae(1000), ts.Description);
    Assert.Equal(Ae(32), ts.Unit);

    // Negative ids are cleared.
    Assert.Null(ts.AssetId);
    Assert.Null(ts.DataSetId);

    Assert.Equal(16, ts.Metadata.Count);
    // 32-6 = 26, 26/2 = 13, 13+6 = 19.
    Assert.All(ts.Metadata, kvp => Assert.Equal(19, kvp.Key.Length));
    // Values are limited to 512 bytes, i.e. 256 two-byte characters.
    Assert.All(ts.Metadata, kvp => Assert.Equal(256, kvp.Value.Length));
}
[Fact]
public void TestSanitizeEvent()
{
    // Shorthand for a string of repeated 'æ' characters (2 UTF8 bytes each).
    string Ae(int count) => new string('æ', count);

    // Arrange: every field is too long or holds an invalid value.
    var assetIds = Enumerable.Range(-100, 100000).Select(i => (long)i);
    var metadata = Enumerable.Range(0, 200)
        .ToDictionary(i => $"key{i:000}{Ae(100)}", i => Ae(600));

    var evt = new EventCreate
    {
        ExternalId = Ae(300),
        Type = Ae(300),
        Subtype = Ae(300),
        Source = Ae(200),
        Description = Ae(1000),
        StartTime = -12345,
        EndTime = -12345,
        DataSetId = -125,
        AssetIds = assetIds,
        Metadata = metadata
    };

    // Act
    evt.Sanitize();

    // Assert: strings are cut to their limits.
    Assert.Equal(Ae(255), evt.ExternalId);
    Assert.Equal(Ae(64), evt.Type);
    Assert.Equal(Ae(64), evt.Subtype);
    Assert.Equal(Ae(128), evt.Source);
    Assert.Equal(Ae(500), evt.Description);

    // Negative timestamps are set to zero, negative data set id is cleared.
    Assert.Equal(0, evt.StartTime);
    Assert.Equal(0, evt.EndTime);
    Assert.Null(evt.DataSetId);

    // Non-positive asset ids are removed and the rest capped.
    Assert.Equal(10000, evt.AssetIds.Count());
    Assert.All(evt.AssetIds, id => Assert.True(id > 0));

    // Each pair is 128 + 1200 = 1328 bytes, and 150 of those fit within 200000 bytes.
    Assert.Equal(150, evt.Metadata.Count);
}
[Theory]
[InlineData("æææææ", 4)]
[InlineData("123412341234", 9)]
[InlineData("123456æææ", 7)]
public void TestUtf8Truncate(string str, int finalLength)
{
    // Every case is limited to 9 UTF8 bytes; a null result counts as length 0.
    var limited = str?.LimitUtf8ByteCount(9);
    var actualLength = limited?.Length ?? 0;

    Assert.Equal(finalLength, actualLength);
}
}
}

0 comments on commit ed227d5

Please sign in to comment.