Last active 1741593448

anduin's Avatar anduin revised this gist 1741593447. Go to revision

1 file changed, 405 insertions

Program.cs(file created)

@@ -0,0 +1,405 @@
1 + using System.Collections.Concurrent;
2 + using System.Data;
3 + using Kusto.Cloud.Platform.Data;
4 + using Kusto.Cloud.Platform.Utils;
5 + using Kusto.Data;
6 + using Kusto.Data.Common;
7 + using Kusto.Data.Net.Client;
8 + using Kusto.Ingest;
9 + using Microsoft.Extensions.Logging;
10 + using Newtonsoft.Json;
11 +
12 + public class KustoIngestService
13 + {
14 + private readonly IKustoIngestClient _ingestClient;
15 + private readonly ICslAdminProvider _adminClient;
16 + private readonly string _database;
17 + private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;
18 +
19 + public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
20 + {
21 + var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
22 + .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
23 +
24 + _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
25 + _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
26 + _database = database;
27 + _logger = logger;
28 + }
29 +
30 + public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table)
31 + {
32 + foreach (var parition in Program.Partition(dataToWrite, 150))
33 + {
34 + var dataTable = parition.ToDataTable();
35 + await IngestDataAsync(dataTable, table);
36 + }
37 +
38 + await IngestDataAsync(dataToWrite.ToDataTable(), table);
39 + }
40 +
41 + public async Task IngestDataAsync(DataTable dataTable, string tableName)
42 + {
43 + try
44 + {
45 + _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
46 + var ingestionProperties = new KustoIngestionProperties(_database, tableName);
47 + await _ingestClient.IngestFromDataReaderAsync(dataTable.CreateDataReader(), ingestionProperties);
48 + _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
49 + }
50 + catch (Exception ex)
51 + {
52 + _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
53 + throw;
54 + }
55 + }
56 +
57 + public async Task EnsureTableExistsAsync<T>(string tableName)
58 + {
59 + _logger.LogInformation("Ensuring table {TableName} exists.", tableName);
60 +
61 + var schema = await GetDatabaseSchemaAsync();
62 + if (schema == null)
63 + {
64 + _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
65 + await CreateMergeTable<T>(tableName);
66 + }
67 + else if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
68 + {
69 + _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
70 + await CreateMergeTable<T>(tableName);
71 + }
72 + else
73 + {
74 + _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
75 + var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
76 + var newColumns = typeof(T).GetProperties().Select(p => p.Name).Where(c => !existingColumns.Contains(c));
77 +
78 + if (newColumns.Any())
79 + {
80 + _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
81 + await CreateMergeTable<T>(tableName);
82 + }
83 + }
84 + }
85 +
86 + private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
87 + {
88 + _logger.LogInformation("Fetching database schema for {Database}", _database);
89 + try
90 + {
91 + var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
92 + using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
93 + var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();
94 + var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
95 + return clusterSchema?.Databases.GetValueOrDefault(_database);
96 + }
97 + catch (Exception ex)
98 + {
99 + _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
100 + throw;
101 + }
102 + }
103 +
104 + private async Task CreateMergeTable<T>(string tableName)
105 + {
106 + _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
107 + var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
108 + var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));
109 + await _adminClient.ExecuteControlCommandAsync(_database, command);
110 + _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
111 + }
112 +
113 + private static string ConvertTypeToCslType(Type type) => type switch
114 + {
115 + _ when type == typeof(bool) => "bool",
116 + _ when type == typeof(DateTime) => "datetime",
117 + _ when type == typeof(Guid) => "guid",
118 + _ when type == typeof(int) => "int",
119 + _ when type == typeof(long) => "long",
120 + _ when type == typeof(double) => "real",
121 + _ when type == typeof(string) => "string",
122 + _ when type == typeof(TimeSpan) => "timespan",
123 + _ when type.IsEnum => "string",
124 + _ when type == typeof(byte[]) => "string",
125 + _ => "dynamic"
126 + };
127 + }
128 +
129 + // 数据模型
130 + public class MyData
131 + {
132 + public int Id { get; set; }
133 + public string Name { get; set; }
134 + public DateTime Timestamp { get; set; }
135 + }
136 +
137 + // 启动入口
138 + public class Program
139 + {
140 + public static async Task Main()
141 + {
142 + var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
143 + var logger = loggerFactory.CreateLogger<KustoIngestService>();
144 +
145 + var kustoUri = "https://testkustoanduin.eastasia.kusto.windows.net";
146 + var database = "yourdatabase";
147 + var appId = "1c4b1053-5aef-45dc-9d42-a69ea84b422d";
148 + var appKey = "7fm8Q~CxycQfV3qW5v6G71vRGVCuYM2eLfZ0obC5";
149 + var tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838";
150 +
151 + var ingestService = new KustoIngestService(
152 + kustoUri: kustoUri,
153 + database: database,
154 + appId: appId,
155 + appKey: appKey,
156 + tenantId: tenantId,
157 + logger: logger
158 + );
159 +
160 + await ingestService.EnsureTableExistsAsync<MyData>("your-kusto-table");
161 +
162 + Console.WriteLine("Generating records...");
163 + var data = GenerateTestData(10_0000);
164 +
165 + Console.WriteLine("Ingesting data into Kusto...");
166 + var kustoBuffer = new KustoBuffer<MyData>(ingestService, "your-kusto-table");
167 + foreach (var parition in Partition(data, 2000))
168 + {
169 + kustoBuffer.Add(parition.ToArray());
170 + }
171 + Console.ReadLine();
172 + }
173 +
174 + private static List<MyData> GenerateTestData(int count)
175 + {
176 + var data = new List<MyData>(count);
177 + var now = DateTime.UtcNow;
178 +
179 + Parallel.For(0, count, i =>
180 + {
181 + var item = new MyData
182 + {
183 + Id = i + 1,
184 + Name = $"Item_{i}",
185 + Timestamp = now.AddSeconds(i)
186 + };
187 +
188 + lock (data) { data.Add(item); }
189 + });
190 +
191 + return data;
192 + }
193 +
194 + public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
195 + {
196 + if (source == null)
197 + {
198 + throw new ArgumentNullException(nameof(source));
199 + }
200 + if (maxSize <= 0)
201 + {
202 + throw new ArgumentOutOfRangeException(nameof(maxSize), "MaxSize must larger than 0");
203 + }
204 +
205 + var buffer = new List<T>();
206 + foreach (var item in source)
207 + {
208 + buffer.Add(item);
209 + if (buffer.Count == maxSize)
210 + {
211 + yield return buffer;
212 + buffer = new List<T>();
213 + }
214 + }
215 + if (buffer.Count > 0)
216 + {
217 + yield return buffer;
218 + }
219 + }
220 + }
221 +
222 + /// <summary>
223 + /// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
224 + /// This design is inspired by the proven ArrayDb implementation:
225 + ///
226 + /// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
227 + /// </summary>
228 + public class KustoBuffer<T>
229 + {
230 + // Dependency: Kusto service for persisting data
231 + private readonly KustoIngestService _kustoService;
232 + private readonly string _tableName;
233 +
234 + // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
235 + private readonly int _maxSleepMilliSecondsWhenCold;
236 + private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;
237 +
238 + // Engine tasks controlling the write process
239 + private Task _engine = Task.CompletedTask;
240 + private Task _coolDownEngine = Task.CompletedTask;
241 +
242 + // Locks for protecting buffer swapping and engine status switching
243 + private readonly ReaderWriterLockSlim _bufferLock = new ReaderWriterLockSlim();
244 + private readonly object _bufferWriteSwapLock = new object();
245 + private readonly object _engineStatusSwitchLock = new object();
246 +
247 + // Double buffering: _activeBuffer is used for enqueuing new data,
248 + // while _secondaryBuffer is used for swapping and persisting data.
249 + private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();
250 + private ConcurrentQueue<T> _secondaryBuffer = new ConcurrentQueue<T>();
251 +
252 + public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted;
253 +
254 + public bool IsHot => !IsCold;
255 +
256 + public KustoBuffer(
257 + KustoIngestService kustoService,
258 + string tableName,
259 + int maxSleepMilliSecondsWhenCold = 2000,
260 + int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
261 + {
262 + _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
263 + _tableName = tableName;
264 + _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
265 + _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
266 + }
267 +
268 + private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
269 + {
270 + if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
271 + {
272 + throw new ArgumentException("B must be a positive number.");
273 + }
274 +
275 + if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
276 + {
277 + return 0;
278 + }
279 +
280 + var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
281 + return (int)y;
282 + }
283 +
284 + /// <summary>
285 + /// Adds objects to the buffer in a non-blocking manner.
286 + /// This method wakes up the engine to persist the data.
287 + /// </summary>
288 + public void Add(params T[] objs)
289 + {
290 + lock (_bufferWriteSwapLock)
291 + {
292 + foreach (var obj in objs)
293 + {
294 + _activeBuffer.Enqueue(obj);
295 + }
296 + }
297 +
298 + if (objs.Length == 0)
299 + {
300 + return;
301 + }
302 +
303 + // Get the engine status in advanced to avoid lock contention.
304 + if (!IsCold)
305 + {
306 + // Most of the cases in a high-frequency environment, the engine is still running.
307 + return;
308 + }
309 +
310 + // Engine might be sleeping. Wake it up.
311 + lock (_engineStatusSwitchLock)
312 + {
313 + // Avoid multiple threads to wake up the engine at the same time.
314 + if (!IsCold)
315 + {
316 + return;
317 + }
318 +
319 + _engine = Task.Run(WriteBuffered);
320 + }
321 + }
322 +
323 + /// <summary>
324 + /// Persists the buffered data in batches to the Kusto database.
325 + /// This method is guaranteed to be executed by a single thread at a time.
326 + /// </summary>
327 + private async Task WriteBuffered()
328 + {
329 + _bufferLock.EnterWriteLock();
330 + try
331 + {
332 + ConcurrentQueue<T> bufferToPersist;
333 + lock (_bufferWriteSwapLock)
334 + {
335 + // Swap active and secondary buffers
336 + bufferToPersist = _activeBuffer;
337 + _activeBuffer = _secondaryBuffer;
338 + _secondaryBuffer = new ConcurrentQueue<T>();
339 + }
340 +
341 + var dataToWrite = bufferToPersist.ToArray();
342 +
343 + // Release the buffer to avoid memory leak.
344 + bufferToPersist.Clear();
345 +
346 + // If there is data, batch write to Kusto
347 + if (dataToWrite.Length > 0)
348 + {
349 + // Process the buffer to persist
350 + await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
351 + }
352 + }
353 + finally
354 + {
355 + _bufferLock.ExitWriteLock();
356 + }
357 +
358 + // While we are writing, new data may be added to the buffer. If so, we need to write it too.
359 + if (!_activeBuffer.IsEmpty)
360 + {
361 + // Restart the engine to write the new added data.
362 + // Before engine quits, it wakes up cool down engine to ensure the engine will be restarted.
363 + // Before cool down quits, it wakes up the engine to ensure the engine will be restarted.
364 + // So if one of the two tasks is running, the engine will be restarted. And this buffer is in a hot state.
365 + // In a hot state, you don't have to start the engine again.
366 + _coolDownEngine = Task.Run(async () =>
367 + {
368 + // Slow down a little bit to wait for more data to come.
369 + // If we persist too fast, and the buffer is almost empty, frequent write will cause data fragmentation.
370 + // If we persist too slow, and a lot of new data has been added to the buffer, and the engine wasted time in sleeping.
371 + // So the time to sleep is calculated by the number of items in the buffer.
372 + var sleepTime = CalculateSleepTime(
373 + _maxSleepMilliSecondsWhenCold,
374 + _stopSleepingWhenWriteBufferItemsMoreThan,
375 + _activeBuffer.Count);
376 + await Task.Delay(sleepTime);
377 +
378 + // Wake up the engine to write the new added data.
379 + _engine = Task.Run(WriteBuffered);
380 + });
381 + }
382 + }
383 +
384 + /// <summary>
385 + /// Synchronizes the buffer, ensuring that all pending data is persisted.
386 + /// </summary>
387 + public async Task SyncAsync()
388 + {
389 + // Case 1:
390 + // Engine is working. However, the buffer may still have data that after this phase, the data is still not written.
391 + // Wait two rounds of engine to finish to ensure all data is written.
392 + // Cool down engine will ensure restart the engine to write the remaining data.
393 + // Wait for the engine to finish.
394 +
395 + // Case 2:
396 + // The engine is not working. In this case, it might be in the cool down phase.
397 + // The first wait is just await a completed task.
398 + // Cool down engine will ensure restart the engine to write the remaining data.
399 + // Then wait for the engine to finish.
400 + await _engine;
401 + await _coolDownEngine;
402 + await _engine;
403 + }
404 + }
405 +
Newer Older