Last active 1741593448

Program.cs Raw
using System.Collections.Concurrent;
using System.Data;
using Kusto.Cloud.Platform.Data;
using Kusto.Cloud.Platform.Utils;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
11
/// <summary>
/// Wraps a Kusto direct-ingest client and a Kusto admin client for a single database.
/// It can create or extend a table from the public properties of a CLR type and
/// ingest rows into that table in fixed-size batches.
/// </summary>
public class KustoIngestService
{
    // Maximum number of rows sent to Kusto by one ingestion call.
    private const int IngestBatchSize = 150;

    private readonly IKustoIngestClient _ingestClient;
    private readonly ICslAdminProvider _adminClient;
    private readonly string _database;
    private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;

    /// <summary>
    /// Creates the ingest and admin clients using AAD application-key authentication.
    /// </summary>
    /// <param name="kustoUri">Cluster URI used to build the connection string.</param>
    /// <param name="database">Database that every command and ingestion targets.</param>
    /// <param name="appId">AAD application (client) id.</param>
    /// <param name="appKey">AAD application key.</param>
    /// <param name="tenantId">AAD tenant (authority) id.</param>
    /// <param name="logger">Logger used for progress and error messages.</param>
    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
    {
        var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);

        _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
        _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
        _database = database;
        _logger = logger;
    }

    /// <summary>
    /// Ingests <paramref name="dataToWrite"/> into <paramref name="table"/> in batches of
    /// <see cref="IngestBatchSize"/> rows. Each row is ingested exactly once.
    /// </summary>
    /// <param name="dataToWrite">Rows to ingest; an empty array results in no ingestion call.</param>
    /// <param name="table">Name of the destination table.</param>
    /// <exception cref="ArgumentNullException">Thrown by the partition helper when <paramref name="dataToWrite"/> is null.</exception>
    public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table)
    {
        // The batches already cover the whole array, so no additional full-array
        // ingestion is performed afterwards (that would write every row twice).
        foreach (var partition in Program.Partition(dataToWrite, IngestBatchSize))
        {
            using var dataTable = partition.ToDataTable();
            await IngestDataAsync(dataTable, table);
        }
    }

    /// <summary>
    /// Ingests every row of <paramref name="dataTable"/> into <paramref name="tableName"/>.
    /// Failures are logged and then rethrown to the caller.
    /// </summary>
    /// <param name="dataTable">Rows to ingest. The caller keeps ownership of the table.</param>
    /// <param name="tableName">Name of the destination table.</param>
    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        try
        {
            _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
            var ingestionProperties = new KustoIngestionProperties(_database, tableName);

            // The reader is created here, so it is released here once ingestion has finished.
            using var reader = dataTable.CreateDataReader();
            await _ingestClient.IngestFromDataReaderAsync(reader, ingestionProperties);

            _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
            throw;
        }
    }

    /// <summary>
    /// Makes sure <paramref name="tableName"/> exists and has a column for every public
    /// property of <typeparamref name="T"/>. The table is created or merged when the
    /// schema cannot be read, the table is missing, or properties have no matching column.
    /// </summary>
    /// <param name="tableName">Name of the table to verify.</param>
    public async Task EnsureTableExistsAsync<T>(string tableName)
    {
        _logger.LogInformation("Ensuring table {TableName} exists.", tableName);

        var schema = await GetDatabaseSchemaAsync();
        if (schema == null)
        {
            _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
            await CreateMergeTable<T>(tableName);
            return;
        }

        if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
        {
            _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
            await CreateMergeTable<T>(tableName);
            return;
        }

        _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
        var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
        var hasNewColumns = typeof(T).GetProperties().Any(p => !existingColumns.Contains(p.Name));

        if (hasNewColumns)
        {
            _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
            await CreateMergeTable<T>(tableName);
        }
    }

    /// <summary>
    /// Reads the schema of the configured database through the admin client.
    /// </summary>
    /// <returns>
    /// The database schema, or null when the command returned no JSON or the JSON
    /// does not contain the configured database.
    /// </returns>
    private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
    {
        _logger.LogInformation("Fetching database schema for {Database}", _database);
        try
        {
            var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
            using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
            var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();

            // JsonConvert.DeserializeObject throws on a null string; report "no schema"
            // instead so the caller's null-schema branch can create the table.
            if (string.IsNullOrEmpty(schemaJson))
            {
                return null;
            }

            var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
            return clusterSchema?.Databases.GetValueOrDefault(_database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
            throw;
        }
    }

    /// <summary>
    /// Issues a create-merge table command whose columns mirror the public properties of
    /// <typeparamref name="T"/>.
    /// </summary>
    /// <param name="tableName">Name of the table to create or merge.</param>
    private async Task CreateMergeTable<T>(string tableName)
    {
        _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
        var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
        var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));

        // The command result is not inspected, but the returned reader still has to be released.
        using var result = await _adminClient.ExecuteControlCommandAsync(_database, command);
        _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
    }

    /// <summary>
    /// Maps a CLR property type to the Kusto column type name used when creating the table.
    /// Enums and byte arrays map to "string"; any type not listed maps to "dynamic".
    /// </summary>
    private static string ConvertTypeToCslType(Type type) => type switch
    {
        _ when type == typeof(bool) => "bool",
        _ when type == typeof(DateTime) => "datetime",
        _ when type == typeof(Guid) => "guid",
        _ when type == typeof(int) => "int",
        _ when type == typeof(long) => "long",
        _ when type == typeof(double) => "real",
        _ when type == typeof(string) => "string",
        _ when type == typeof(TimeSpan) => "timespan",
        _ when type.IsEnum => "string",
        _ when type == typeof(byte[]) => "string",
        _ => "dynamic"
    };
}
128
/// <summary>
/// Data model for the rows generated and ingested by this program.
/// The property names are what the ingestion service reflects over when it
/// builds the table's column list, so they double as the column names.
/// </summary>
public class MyData
{
    /// <summary>Numeric identifier of the row.</summary>
    public int Id { get; set; }

    /// <summary>Display name of the row.</summary>
    public string Name { get; set; }

    /// <summary>Point in time associated with the row.</summary>
    public DateTime Timestamp { get; set; }
}
136
/// <summary>
/// Startup entry point: ensures the target table exists, generates test rows and
/// pushes them to Kusto through a <see cref="KustoBuffer{T}"/>.
/// </summary>
public class Program
{
    // Destination table used by both the schema check and the buffer.
    private const string TableName = "your-kusto-table";

    /// <summary>
    /// Runs the demo end to end and returns once the buffer has finished flushing.
    /// </summary>
    /// <remarks>
    /// The AAD application id, key and tenant id are read from the environment variables
    /// KUSTO_APP_ID, KUSTO_APP_KEY and KUSTO_TENANT_ID so that no credential lives in
    /// source control. KUSTO_URI and KUSTO_DATABASE optionally override the defaults.
    /// </remarks>
    /// <exception cref="InvalidOperationException">A required environment variable is missing.</exception>
    public static async Task Main()
    {
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<KustoIngestService>();

        var kustoUri = Environment.GetEnvironmentVariable("KUSTO_URI") ?? "https://testkustoanduin.eastasia.kusto.windows.net";
        var database = Environment.GetEnvironmentVariable("KUSTO_DATABASE") ?? "yourdatabase";
        var appId = GetRequiredSetting("KUSTO_APP_ID");
        var appKey = GetRequiredSetting("KUSTO_APP_KEY");
        var tenantId = GetRequiredSetting("KUSTO_TENANT_ID");

        var ingestService = new KustoIngestService(
            kustoUri: kustoUri,
            database: database,
            appId: appId,
            appKey: appKey,
            tenantId: tenantId,
            logger: logger
        );

        await ingestService.EnsureTableExistsAsync<MyData>(TableName);

        Console.WriteLine("Generating records...");
        var data = GenerateTestData(100_000);

        Console.WriteLine("Ingesting data into Kusto...");
        var kustoBuffer = new KustoBuffer<MyData>(ingestService, TableName);
        foreach (var partition in Partition(data, 2000))
        {
            kustoBuffer.Add(partition.ToArray());
        }

        // Add only enqueues; wait for the background writer instead of relying on the
        // user to keep the process alive long enough, and surface any ingestion failure.
        await kustoBuffer.SyncAsync();
    }

    /// <summary>
    /// Reads an environment variable and fails fast when it is missing or blank.
    /// </summary>
    /// <param name="variableName">Name of the environment variable to read.</param>
    /// <returns>The variable's value.</returns>
    /// <exception cref="InvalidOperationException">The variable is not set or is blank.</exception>
    private static string GetRequiredSetting(string variableName)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{variableName}' must be set.");
        }

        return value;
    }

    /// <summary>
    /// Builds <paramref name="count"/> rows with ids 1..count, names "Item_0".."Item_{count-1}"
    /// and timestamps one second apart starting at the current UTC time.
    /// </summary>
    /// <param name="count">Number of rows to create.</param>
    /// <returns>The rows in ascending id order.</returns>
    private static List<MyData> GenerateTestData(int count)
    {
        var now = DateTime.UtcNow;
        var data = new List<MyData>(count);

        // A plain loop: the per-item work is trivial, and a parallel loop that locks
        // around every Add only serialises the work while scrambling the order.
        for (var i = 0; i < count; i++)
        {
            data.Add(new MyData
            {
                Id = i + 1,
                Name = $"Item_{i}",
                Timestamp = now.AddSeconds(i)
            });
        }

        return data;
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive lists of at most
    /// <paramref name="maxSize"/> items, preserving order. The last list may be shorter;
    /// an empty source yields no lists.
    /// </summary>
    /// <param name="source">Items to split.</param>
    /// <param name="maxSize">Maximum number of items per list; must be greater than zero.</param>
    /// <returns>A lazily evaluated sequence of lists.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSize"/> is zero or negative.</exception>
    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
    {
        // Validation lives outside the iterator so that bad arguments are reported at
        // the call site rather than on first enumeration.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), "MaxSize must larger than 0");
        }

        return Iterate();

        IEnumerable<List<T>> Iterate()
        {
            var buffer = new List<T>();
            foreach (var item in source)
            {
                buffer.Add(item);
                if (buffer.Count == maxSize)
                {
                    yield return buffer;
                    buffer = new List<T>();
                }
            }

            // Emit the trailing, partially filled list if there is one.
            if (buffer.Count > 0)
            {
                yield return buffer;
            }
        }
    }
}
221
/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write it to Kusto.
/// This design is inspired by the proven ArrayDb implementation:
///
/// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
/// </summary>
/// <remarks>
/// Producers call <see cref="Add"/>, which only enqueues and, if needed, starts a background
/// "engine" task. The engine swaps the buffer out, persists its content and, when more data
/// arrived meanwhile, schedules a "cool down" task that sleeps for a load-dependent time and
/// then restarts the engine. If persisting throws, the engine task faults, the items taken in
/// that round are not re-queued, and the exception is rethrown by <see cref="SyncAsync"/>.
/// </remarks>
public class KustoBuffer<T>
{
    // Dependency: Kusto service for persisting data.
    private readonly KustoIngestService _kustoService;
    private readonly string _tableName;

    // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping.
    private readonly int _maxSleepMilliSecondsWhenCold;
    private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;

    // Engine tasks controlling the write process.
    private Task _engine = Task.CompletedTask;
    private Task _coolDownEngine = Task.CompletedTask;

    // Serialises persist rounds. A SemaphoreSlim is used because the gate is held across
    // an await; ReaderWriterLockSlim must be released on the thread that acquired it.
    private readonly SemaphoreSlim _persistGate = new SemaphoreSlim(1, 1);

    // Protects enqueueing against the buffer swap, and engine start-up against itself.
    private readonly object _bufferWriteSwapLock = new object();
    private readonly object _engineStatusSwitchLock = new object();

    // Double buffering: _activeBuffer receives new data, while _secondaryBuffer is the
    // empty queue that takes its place when the engine swaps the buffers.
    private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();
    private ConcurrentQueue<T> _secondaryBuffer = new ConcurrentQueue<T>();

    /// <summary>True when neither the engine nor the cool-down task is running.</summary>
    public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted;

    /// <summary>True when the engine or the cool-down task is still running.</summary>
    public bool IsHot => !IsCold;

    /// <summary>
    /// Creates a buffer that persists into <paramref name="tableName"/> through <paramref name="kustoService"/>.
    /// </summary>
    /// <param name="kustoService">Service used to write batches to Kusto.</param>
    /// <param name="tableName">Destination table name.</param>
    /// <param name="maxSleepMilliSecondsWhenCold">Longest cool-down delay, used when the buffer is almost empty.</param>
    /// <param name="stopSleepingWhenWriteBufferItemsMoreThan">Item count above which the cool-down delay is zero.</param>
    /// <exception cref="ArgumentNullException"><paramref name="kustoService"/> or <paramref name="tableName"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxSleepMilliSecondsWhenCold"/> is negative or
    /// <paramref name="stopSleepingWhenWriteBufferItemsMoreThan"/> is not positive.
    /// </exception>
    public KustoBuffer(
        KustoIngestService kustoService,
        string tableName,
        int maxSleepMilliSecondsWhenCold = 2000,
        int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
    {
        _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
        _tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));

        // Reject bad tuning values here: otherwise they only fail inside the background
        // cool-down task, where the exception goes unobserved and the engine never restarts.
        if (maxSleepMilliSecondsWhenCold < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSleepMilliSecondsWhenCold), "Sleep time must not be negative.");
        }
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(stopSleepingWhenWriteBufferItemsMoreThan), "Threshold must be a positive number.");
        }

        _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
        _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
    }

    /// <summary>
    /// Computes the cool-down delay: the full maximum for an empty buffer, shrinking
    /// logarithmically to zero as the item count reaches the threshold, and zero above it.
    /// </summary>
    /// <param name="maxSleepMilliSecondsWhenCold">Delay returned for an empty buffer.</param>
    /// <param name="stopSleepingWhenWriteBufferItemsMoreThan">Item count above which the delay is zero.</param>
    /// <param name="writeBufferItemsCount">Current number of buffered items.</param>
    /// <returns>Delay in milliseconds, truncated to an integer.</returns>
    /// <exception cref="ArgumentException">The threshold is not positive.</exception>
    private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
    {
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentException("B must be a positive number.");
        }

        if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
        {
            return 0;
        }

        var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
        return (int)y;
    }

    /// <summary>
    /// Adds objects to the buffer in a non-blocking manner.
    /// This method wakes up the engine to persist the data.
    /// </summary>
    /// <param name="objs">Items to enqueue; an empty array is a no-op.</param>
    /// <exception cref="ArgumentNullException"><paramref name="objs"/> is null.</exception>
    public void Add(params T[] objs)
    {
        if (objs == null)
        {
            throw new ArgumentNullException(nameof(objs));
        }

        if (objs.Length == 0)
        {
            return;
        }

        // Enqueue under the swap lock so the engine can never swap the buffers halfway
        // through a batch and drop the items enqueued after its snapshot.
        lock (_bufferWriteSwapLock)
        {
            foreach (var obj in objs)
            {
                _activeBuffer.Enqueue(obj);
            }
        }

        // Check the engine status up front to avoid lock contention.
        if (!IsCold)
        {
            // In a high-frequency environment the engine is usually still running.
            return;
        }

        // The engine might be sleeping. Wake it up.
        lock (_engineStatusSwitchLock)
        {
            // Avoid multiple threads waking up the engine at the same time.
            if (!IsCold)
            {
                return;
            }

            _engine = Task.Run(WriteBuffered);
        }
    }

    /// <summary>
    /// Persists the buffered data in batches to the Kusto database.
    /// Only one persist round runs at a time.
    /// </summary>
    private async Task WriteBuffered()
    {
        await _persistGate.WaitAsync();
        try
        {
            ConcurrentQueue<T> bufferToPersist;
            lock (_bufferWriteSwapLock)
            {
                // Swap active and secondary buffers.
                bufferToPersist = _activeBuffer;
                _activeBuffer = _secondaryBuffer;
                _secondaryBuffer = new ConcurrentQueue<T>();
            }

            var dataToWrite = bufferToPersist.ToArray();

            // Drop the queued references now that they have been copied out.
            bufferToPersist.Clear();

            // If there is data, batch write to Kusto.
            if (dataToWrite.Length > 0)
            {
                await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
            }
        }
        finally
        {
            _persistGate.Release();
        }

        // While we were writing, new data may have been added to the buffer. If so, write it too.
        if (!_activeBuffer.IsEmpty)
        {
            // Before the engine quits it starts the cool-down task, and before the cool-down
            // task quits it restarts the engine. As long as one of the two is running the
            // buffer is hot, and producers do not have to start the engine themselves.
            _coolDownEngine = Task.Run(async () =>
            {
                // Slow down a little to let more data arrive.
                // Persisting too eagerly with an almost empty buffer causes many tiny writes;
                // sleeping too long with a full buffer wastes time. So the delay depends on
                // the number of items currently buffered.
                var sleepTime = CalculateSleepTime(
                    _maxSleepMilliSecondsWhenCold,
                    _stopSleepingWhenWriteBufferItemsMoreThan,
                    _activeBuffer.Count);
                await Task.Delay(sleepTime);

                // Restart the engine under the same lock Add uses, so a producer cannot
                // see a stale engine next to a finished cool-down and start a second one.
                lock (_engineStatusSwitchLock)
                {
                    _engine = Task.Run(WriteBuffered);
                }
            });
        }
    }

    /// <summary>
    /// Waits for the data that is pending when this method is called to be persisted.
    /// Rethrows the exception of a faulted engine or cool-down task.
    /// </summary>
    public async Task SyncAsync()
    {
        // Case 1:
        // The engine is working. The buffer may hold data that this round will not write.
        // When the round ends, the cool-down task restarts the engine for the remaining data,
        // so wait for the current round, the cool-down, and the following round.

        // Case 2:
        // The engine is not working; it may be in the cool-down phase.
        // The first wait then just awaits a completed task, the cool-down restarts the
        // engine, and the last wait covers that final round.
        await _engine;
        await _coolDownEngine;
        await _engine;
    }
}
405
406