using System.Collections.Concurrent;
using System.Data;
using Kusto.Cloud.Platform.Data;
using Kusto.Cloud.Platform.Utils;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

/// <summary>
/// Wraps a Kusto direct-ingest client and an admin (control-command) client bound to one database.
/// Ingests in-memory rows and creates/extends tables from the public properties of a CLR type.
/// </summary>
public class KustoIngestService : IDisposable
{
    /// <summary>Default number of rows sent per ingestion call by <see cref="SaveToKustoInBatchAsync{T}"/>.</summary>
    public const int DefaultBatchSize = 150;

    private readonly IKustoIngestClient _ingestClient;
    private readonly ICslAdminProvider _adminClient;
    private readonly string _database;
    private readonly Microsoft.Extensions.Logging.ILogger _logger;

    /// <summary>
    /// Creates the ingest and admin clients using AAD application-key authentication.
    /// </summary>
    /// <param name="kustoUri">Cluster URI, e.g. <c>https://mycluster.region.kusto.windows.net</c>.</param>
    /// <param name="database">Target database for all ingestion and schema commands.</param>
    /// <param name="appId">AAD application (client) id.</param>
    /// <param name="appKey">AAD application secret. Never hard-code this in source.</param>
    /// <param name="tenantId">AAD tenant id.</param>
    /// <param name="logger">Logger for progress and error messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger logger)
    {
        // Validate before creating the (disposable) clients so nothing leaks on failure.
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _database = database;

        var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);

        _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
        _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
    }

    /// <summary>
    /// Ingests <paramref name="dataToWrite"/> into <paramref name="table"/> in chunks of at most
    /// <paramref name="batchSize"/> rows. Each row is ingested exactly once.
    /// </summary>
    /// <param name="dataToWrite">Rows to ingest; columns come from the public properties of <typeparamref name="T"/>.</param>
    /// <param name="table">Target table name.</param>
    /// <param name="batchSize">Maximum rows per ingestion call; must be positive.</param>
    public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table, int batchSize = DefaultBatchSize)
    {
        foreach (var partition in Program.Partition(dataToWrite, batchSize))
        {
            using var dataTable = partition.ToDataTable();
            await IngestDataAsync(dataTable, table);
        }
        // Do NOT also ingest the full array here: every row has already been sent once above.
    }

    /// <summary>
    /// Ingests all rows of <paramref name="dataTable"/> into <paramref name="tableName"/> through the direct-ingest client.
    /// Failures are logged and rethrown.
    /// </summary>
    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        ArgumentNullException.ThrowIfNull(dataTable);

        try
        {
            _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);

            var ingestionProperties = new KustoIngestionProperties(_database, tableName);
            using var reader = dataTable.CreateDataReader();
            await _ingestClient.IngestFromDataReaderAsync(reader, ingestionProperties);

            _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
            throw;
        }
    }

    /// <summary>
    /// Makes sure <paramref name="tableName"/> exists and has a column for every public property of <typeparamref name="T"/>.
    /// It issues a <c>.create-merge table</c> when the database schema is unavailable, the table is missing,
    /// or a property has no matching column. Column types of existing columns are not compared.
    /// </summary>
    public async Task EnsureTableExistsAsync<T>(string tableName)
    {
        _logger.LogInformation("Ensuring table {TableName} exists.", tableName);
        var schema = await GetDatabaseSchemaAsync();

        if (schema == null)
        {
            _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
        {
            _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else
        {
            _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
            var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
            var hasNewColumns = typeof(T).GetProperties().Any(p => !existingColumns.Contains(p.Name));

            if (hasNewColumns)
            {
                _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
                await CreateMergeTable<T>(tableName);
            }
        }
    }

    /// <summary>
    /// Runs <c>.show database … schema as json</c> and returns the schema of this service's database.
    /// </summary>
    /// <returns>The database schema, or null when the command returned no JSON or the database is not present in it.</returns>
    private async Task<DatabaseSchema?> GetDatabaseSchemaAsync()
    {
        _logger.LogInformation("Fetching database schema for {Database}", _database);
        try
        {
            var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
            using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
            var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();

            // JsonConvert.DeserializeObject throws on a null string; treat "no rows" as "no schema".
            if (string.IsNullOrEmpty(schemaJson))
            {
                _logger.LogWarning("Schema command returned no data for {Database}", _database);
                return null;
            }

            var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
            return clusterSchema?.Databases.GetValueOrDefault(_database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
            throw;
        }
    }

    /// <summary>
    /// Issues <c>.create-merge table</c> with one column per public property of <typeparamref name="T"/>.
    /// Types are mapped by <see cref="ConvertTypeToCslType"/>.
    /// </summary>
    private async Task CreateMergeTable<T>(string tableName)
    {
        _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);

        var columns = typeof(T).GetProperties()
            .Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
        var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));

        // The command returns a result reader; dispose it even though its content is not used.
        using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);

        _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
    }

    /// <summary>
    /// Maps a CLR property type to a Kusto scalar type name.
    /// Enums and byte arrays are stored as strings. Any unlisted type, including nullable value types, becomes "dynamic".
    /// </summary>
    private static string ConvertTypeToCslType(Type type) => type switch
    {
        _ when type == typeof(bool) => "bool",
        _ when type == typeof(DateTime) => "datetime",
        _ when type == typeof(Guid) => "guid",
        _ when type == typeof(int) => "int",
        _ when type == typeof(long) => "long",
        _ when type == typeof(double) => "real",
        _ when type == typeof(string) => "string",
        _ when type == typeof(TimeSpan) => "timespan",
        _ when type.IsEnum => "string",
        _ when type == typeof(byte[]) => "string",
        _ => "dynamic"
    };

    /// <summary>Disposes the underlying ingest and admin clients.</summary>
    public void Dispose()
    {
        _ingestClient.Dispose();
        _adminClient.Dispose();
        GC.SuppressFinalize(this);
    }
}

// Data model
public class MyData
{
    public int Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public DateTime Timestamp { get; set; }
}

// Entry point
public class Program
{
    private const string TableName = "your-kusto-table";

    /// <summary>
    /// Demo: ensures the table exists, generates test rows, pushes them through a <see cref="KustoBuffer{T}"/>,
    /// and waits for the buffer to flush.
    /// Credentials are read from the KUSTO_APP_ID, KUSTO_APP_KEY and KUSTO_TENANT_ID environment variables.
    /// KUSTO_URI and KUSTO_DATABASE are optional overrides.
    /// </summary>
    public static async Task Main()
    {
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<KustoIngestService>();

        // Never commit secrets to source control: read them from the environment (or a secret store).
        var kustoUri = GetEnvironmentVariableOrDefault("KUSTO_URI", "https://testkustoanduin.eastasia.kusto.windows.net");
        var database = GetEnvironmentVariableOrDefault("KUSTO_DATABASE", "yourdatabase");
        var appId = GetRequiredEnvironmentVariable("KUSTO_APP_ID");
        var appKey = GetRequiredEnvironmentVariable("KUSTO_APP_KEY");
        var tenantId = GetRequiredEnvironmentVariable("KUSTO_TENANT_ID");

        using var ingestService = new KustoIngestService(
            kustoUri: kustoUri,
            database: database,
            appId: appId,
            appKey: appKey,
            tenantId: tenantId,
            logger: logger
        );

        await ingestService.EnsureTableExistsAsync<MyData>(TableName);

        Console.WriteLine("Generating records...");
        var data = GenerateTestData(100_000);

        Console.WriteLine("Ingesting data into Kusto...");
        var kustoBuffer = new KustoBuffer<MyData>(ingestService, TableName);
        foreach (var partition in Partition(data, 2000))
        {
            kustoBuffer.Add(partition.ToArray());
        }

        // Flush everything before the service (and its clients) are disposed at the end of Main.
        await kustoBuffer.SyncAsync();
        Console.WriteLine("All data ingested.");
    }

    /// <summary>Returns the value of environment variable <paramref name="name"/>, or throws if it is unset or empty.</summary>
    private static string GetRequiredEnvironmentVariable(string name) =>
        Environment.GetEnvironmentVariable(name) is { Length: > 0 } value
            ? value
            : throw new InvalidOperationException($"Environment variable '{name}' must be set.");

    /// <summary>Returns the value of environment variable <paramref name="name"/>, or <paramref name="fallback"/> if it is unset or empty.</summary>
    private static string GetEnvironmentVariableOrDefault(string name, string fallback) =>
        Environment.GetEnvironmentVariable(name) is { Length: > 0 } value ? value : fallback;

    /// <summary>
    /// Builds <paramref name="count"/> sample rows with ids 1..count, in order.
    /// Timestamps are one second apart, starting at the current UTC time.
    /// </summary>
    private static List<MyData> GenerateTestData(int count)
    {
        var now = DateTime.UtcNow;
        return Enumerable.Range(0, count)
            .Select(i => new MyData { Id = i + 1, Name = $"Item_{i}", Timestamp = now.AddSeconds(i) })
            .ToList();
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive chunks of at most <paramref name="maxSize"/> items, preserving order.
    /// Only the last chunk may be shorter. An empty source yields no chunks.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSize"/> is not positive.</exception>
    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
    {
        // Validate eagerly (at call time) rather than on first enumeration, as LINQ operators do.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), "MaxSize must larger than 0");
        }

        return Iterate();

        IEnumerable<List<T>> Iterate()
        {
            var buffer = new List<T>();
            foreach (var item in source)
            {
                buffer.Add(item);
                if (buffer.Count == maxSize)
                {
                    yield return buffer;
                    buffer = new List<T>();
                }
            }

            if (buffer.Count > 0)
            {
                yield return buffer;
            }
        }
    }
}

/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
/// This design is inspired by the ArrayDb implementation:
/// <see href="https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs"/>
/// </summary>
/// <remarks>
/// Producers call <see cref="Add"/>, which only enqueues and, when no engine run is active, starts one.
/// An engine run repeatedly drains the queue, writes the batch, and sleeps briefly to let more data accumulate.
/// It stops only after it has released ownership and still observes an empty queue.
/// If a write fails, that batch is lost and the run ends faulted. The fault surfaces from <see cref="SyncAsync"/>,
/// and the next <see cref="Add"/> starts a fresh run.
/// </remarks>
public class KustoBuffer<T>
{
    // Dependency: Kusto service for persisting data
    private readonly KustoIngestService _kustoService;
    private readonly string _tableName;

    // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
    private readonly int _maxSleepMilliSecondsWhenCold;
    private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;

    // Pending items. Producers enqueue concurrently; only the engine run that owns _engineRunning dequeues.
    private readonly ConcurrentQueue<T> _buffer = new ConcurrentQueue<T>();

    // 1 while an engine run owns the buffer, 0 otherwise. Changed only through Interlocked operations (full fences),
    // so an Add() and the engine's exit check can never both miss each other (no lost wake-up).
    private int _engineRunning;

    // The current (or most recent) engine run; awaited by SyncAsync().
    private Task _engine = Task.CompletedTask;

    public bool IsCold => Volatile.Read(ref _engineRunning) == 0;
    public bool IsHot => !IsCold;

    /// <param name="kustoService">Service used to persist batches.</param>
    /// <param name="tableName">Target Kusto table.</param>
    /// <param name="maxSleepMilliSecondsWhenCold">Longest pause between batches, used when the queue is nearly empty; must be non-negative.</param>
    /// <param name="stopSleepingWhenWriteBufferItemsMoreThan">Queue length above which batches are written back-to-back with no pause; must be positive.</param>
    public KustoBuffer(
        KustoIngestService kustoService,
        string tableName,
        int maxSleepMilliSecondsWhenCold = 2000,
        int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
    {
        _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
        _tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));

        // Validate up front; otherwise a bad value only surfaces later as a faulted background engine.
        if (maxSleepMilliSecondsWhenCold < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSleepMilliSecondsWhenCold), "Must be zero or a positive number.");
        }

        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(stopSleepingWhenWriteBufferItemsMoreThan), "Must be a positive number.");
        }

        _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
        _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
    }

    /// <summary>
    /// Pause (ms) before the next batch. It is <paramref name="maxSleepMilliSecondsWhenCold"/> for an empty queue and
    /// falls logarithmically to 0 as the queue reaches <paramref name="stopSleepingWhenWriteBufferItemsMoreThan"/>.
    /// Beyond that threshold it is 0.
    /// </summary>
    private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
    {
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(stopSleepingWhenWriteBufferItemsMoreThan), "Must be a positive number.");
        }

        if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
        {
            return 0;
        }

        var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
        return (int)y;
    }

    /// <summary>
    /// Adds objects to the buffer in a non-blocking manner.
    /// Starts an engine run to persist them if none is active.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="objs"/> is null.</exception>
    public void Add(params T[] objs)
    {
        ArgumentNullException.ThrowIfNull(objs);
        if (objs.Length == 0)
        {
            return;
        }

        foreach (var obj in objs)
        {
            _buffer.Enqueue(obj);
        }

        // Atomically claim the engine. Because this is a full fence after the enqueue, one of two things holds:
        // either this call starts a run, or the active run is guaranteed to see these items in its exit check.
        if (Interlocked.CompareExchange(ref _engineRunning, 1, 0) == 0)
        {
            Volatile.Write(ref _engine, Task.Run(RunEngineAsync));
        }
    }

    /// <summary>
    /// One engine run. It writes batches until, after releasing ownership, the queue is observed empty,
    /// or until another Add() has already claimed a new run.
    /// Only the run holding <see cref="_engineRunning"/> touches the dequeue side, so batches are written one at a time.
    /// </summary>
    private async Task RunEngineAsync()
    {
        try
        {
            while (true)
            {
                await WriteBuffered();

                // Release ownership first, then re-check (both full fences). This closes the window where an Add()
                // enqueues after our emptiness check but still sees the engine as running.
                Interlocked.Exchange(ref _engineRunning, 0);
                if (_buffer.IsEmpty || Interlocked.CompareExchange(ref _engineRunning, 1, 0) != 0)
                {
                    // Nothing left, or a concurrent Add() already started a new run that will handle the rest.
                    return;
                }

                // Slow down a little to let more data accumulate. Writing too eagerly fragments the data into tiny
                // ingestions; sleeping too long wastes time when the queue is already large.
                var sleepTime = CalculateSleepTime(
                    _maxSleepMilliSecondsWhenCold,
                    _stopSleepingWhenWriteBufferItemsMoreThan,
                    _buffer.Count);
                await Task.Delay(sleepTime);
            }
        }
        catch
        {
            // We own the flag whenever an exception can be thrown here; release it so the next Add() can restart the engine.
            Interlocked.Exchange(ref _engineRunning, 0);
            throw;
        }
    }

    /// <summary>
    /// Drains the items currently queued and writes them to Kusto as one batch.
    /// No lock is held across the await; exclusivity comes from the engine ownership flag.
    /// </summary>
    private async Task WriteBuffered()
    {
        // Drain only what is present now, so a steady stream of Add() calls cannot make a single drain unbounded.
        var pendingCount = _buffer.Count;
        var dataToWrite = new List<T>(pendingCount);
        while (dataToWrite.Count < pendingCount && _buffer.TryDequeue(out var item))
        {
            dataToWrite.Add(item);
        }

        if (dataToWrite.Count > 0)
        {
            await _kustoService.SaveToKustoInBatchAsync(dataToWrite.ToArray(), _tableName);
        }
    }

    /// <summary>
    /// Waits until everything added before this call has been persisted, and rethrows a failure of the awaited run.
    /// An engine run ends only after observing an empty queue, so call this once producers have stopped.
    /// Under sustained concurrent <see cref="Add"/> calls it keeps waiting for the run to drain.
    /// </summary>
    public async Task SyncAsync()
    {
        Task engine;
        do
        {
            engine = Volatile.Read(ref _engine);
            await engine;
        }
        // A newer run may have been started while we waited; wait for that one as well.
        while (!ReferenceEquals(engine, Volatile.Read(ref _engine)));
    }
}