anduin revised this gist . Go to revision
1 file changed, 405 insertions
Program.cs(file created)
@@ -0,0 +1,405 @@ | |||
1 | + | using System.Collections.Concurrent; | |
2 | + | using System.Data; | |
3 | + | using Kusto.Cloud.Platform.Data; | |
4 | + | using Kusto.Cloud.Platform.Utils; | |
5 | + | using Kusto.Data; | |
6 | + | using Kusto.Data.Common; | |
7 | + | using Kusto.Data.Net.Client; | |
8 | + | using Kusto.Ingest; | |
9 | + | using Microsoft.Extensions.Logging; | |
10 | + | using Newtonsoft.Json; | |
11 | + | ||
/// <summary>
/// Thin wrapper around the Kusto direct-ingest and admin clients: it can make sure a table
/// whose columns mirror the public properties of a CLR type exists, and ingest rows into it.
/// </summary>
public class KustoIngestService
{
    // Number of rows sent to Kusto per ingest call.
    private const int IngestBatchSize = 150;

    private readonly IKustoIngestClient _ingestClient;
    private readonly ICslAdminProvider _adminClient;
    private readonly string _database;
    private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;

    /// <summary>
    /// Creates the ingest and admin clients using AAD application-key authentication.
    /// </summary>
    /// <param name="kustoUri">Cluster URI passed to <see cref="KustoConnectionStringBuilder"/>.</param>
    /// <param name="database">Database that all commands and ingestions target.</param>
    /// <param name="appId">AAD application (client) id.</param>
    /// <param name="appKey">AAD application key.</param>
    /// <param name="tenantId">AAD tenant (authority) id.</param>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    /// <exception cref="ArgumentException"><paramref name="kustoUri"/> or <paramref name="database"/> is null or blank.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
    {
        if (string.IsNullOrWhiteSpace(kustoUri))
        {
            throw new ArgumentException("The Kusto cluster URI must be provided.", nameof(kustoUri));
        }

        if (string.IsNullOrWhiteSpace(database))
        {
            throw new ArgumentException("The database name must be provided.", nameof(database));
        }

        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _database = database;

        var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);

        _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
        _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
    }

    /// <summary>
    /// Ingests <paramref name="dataToWrite"/> into <paramref name="table"/> in consecutive
    /// batches of at most <see cref="IngestBatchSize"/> rows. Each row is ingested exactly once.
    /// </summary>
    /// <typeparam name="T">Row type; converted to a <see cref="DataTable"/> per batch.</typeparam>
    /// <param name="dataToWrite">Rows to ingest. An empty array results in no ingest call.</param>
    /// <param name="table">Target table name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="dataToWrite"/> is null.</exception>
    public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table)
    {
        if (dataToWrite == null)
        {
            throw new ArgumentNullException(nameof(dataToWrite));
        }

        // The batches already cover the whole array, so nothing is ingested after the loop
        // (a trailing "ingest everything" call would write every row a second time).
        foreach (var partition in Program.Partition(dataToWrite, IngestBatchSize))
        {
            using var dataTable = partition.ToDataTable();
            await IngestDataAsync(dataTable, table);
        }
    }

    /// <summary>
    /// Ingests all rows of <paramref name="dataTable"/> into <paramref name="tableName"/>.
    /// Failures are logged and rethrown.
    /// </summary>
    /// <param name="dataTable">Rows to ingest.</param>
    /// <param name="tableName">Target table name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="dataTable"/> is null.</exception>
    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        if (dataTable == null)
        {
            throw new ArgumentNullException(nameof(dataTable));
        }

        try
        {
            _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
            var ingestionProperties = new KustoIngestionProperties(_database, tableName);

            // Own the reader here so it is released even when the ingest call throws.
            using var reader = dataTable.CreateDataReader();
            await _ingestClient.IngestFromDataReaderAsync(reader, ingestionProperties);
            _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
            throw;
        }
    }

    /// <summary>
    /// Makes sure <paramref name="tableName"/> exists and has a column for every public property
    /// of <typeparamref name="T"/>, issuing a create-merge command when the database schema is
    /// unavailable, the table is missing, or properties without a matching column are found.
    /// </summary>
    /// <typeparam name="T">Type whose public properties define the expected columns.</typeparam>
    /// <param name="tableName">Table to create or extend.</param>
    public async Task EnsureTableExistsAsync<T>(string tableName)
    {
        _logger.LogInformation("Ensuring table {TableName} exists.", tableName);

        var schema = await GetDatabaseSchemaAsync();
        if (schema == null)
        {
            _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
            await CreateMergeTable<T>(tableName);
            return;
        }

        if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
        {
            _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
            await CreateMergeTable<T>(tableName);
            return;
        }

        _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
        var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
        var hasNewColumns = typeof(T).GetProperties().Any(p => !existingColumns.Contains(p.Name));

        if (hasNewColumns)
        {
            _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
            await CreateMergeTable<T>(tableName);
        }
    }

    /// <summary>
    /// Fetches the schema of the configured database as JSON and deserializes it.
    /// </summary>
    /// <returns>
    /// The database schema, or <c>null</c> when the command returned no payload or the payload
    /// does not contain the configured database.
    /// </returns>
    private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
    {
        _logger.LogInformation("Fetching database schema for {Database}", _database);
        try
        {
            var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
            using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
            var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();

            // JsonConvert.DeserializeObject throws on a null string; an empty result simply
            // means "no schema", which the caller already handles.
            if (string.IsNullOrEmpty(schemaJson))
            {
                return null;
            }

            var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
            return clusterSchema?.Databases?.GetValueOrDefault(_database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
            throw;
        }
    }

    /// <summary>
    /// Issues a table create-merge command whose columns are the public properties of
    /// <typeparamref name="T"/>, mapped through <see cref="ConvertTypeToCslType"/>.
    /// </summary>
    private async Task CreateMergeTable<T>(string tableName)
    {
        _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
        var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
        var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));

        // The command result is not inspected, but the returned reader still has to be released.
        using var result = await _adminClient.ExecuteControlCommandAsync(_database, command);
        _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
    }

    /// <summary>
    /// Maps a CLR property type to the Kusto column type name used when creating the table.
    /// Enums and byte arrays map to <c>string</c>; any type not listed maps to <c>dynamic</c>.
    /// </summary>
    private static string ConvertTypeToCslType(Type type) => type switch
    {
        _ when type == typeof(bool) => "bool",
        _ when type == typeof(DateTime) => "datetime",
        _ when type == typeof(Guid) => "guid",
        _ when type == typeof(int) => "int",
        _ when type == typeof(long) => "long",
        _ when type == typeof(double) => "real",
        _ when type == typeof(string) => "string",
        _ when type == typeof(TimeSpan) => "timespan",
        _ when type.IsEnum => "string",
        _ when type == typeof(byte[]) => "string",
        _ => "dynamic"
    };
}
128 | + | ||
// Data model
/// <summary>
/// Sample row type used by the demo. Its public properties, in declaration order, are what
/// the ingest service reflects over to build the table columns.
/// </summary>
public class MyData
{
    /// <summary>Numeric identifier of the row.</summary>
    public int Id { get; set; }

    /// <summary>Display name of the row.</summary>
    public string Name { get; set; }

    /// <summary>Point in time associated with the row.</summary>
    public DateTime Timestamp { get; set; }
}
136 | + | ||
// Application entry point
/// <summary>
/// Demo host: ensures the target table exists, generates test rows and feeds them to a
/// <see cref="KustoBuffer{T}"/> in chunks.
/// </summary>
public class Program
{
    private const string TableName = "your-kusto-table";

    /// <summary>
    /// Runs the demo. AAD credentials are read from the environment variables
    /// <c>KUSTO_APP_ID</c>, <c>KUSTO_APP_KEY</c> and <c>KUSTO_TENANT_ID</c> so that no secret
    /// lives in source control.
    /// </summary>
    /// <exception cref="InvalidOperationException">A required environment variable is not set.</exception>
    public static async Task Main()
    {
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<KustoIngestService>();

        var kustoUri = "https://testkustoanduin.eastasia.kusto.windows.net";
        var database = "yourdatabase";
        var appId = GetRequiredEnvironmentVariable("KUSTO_APP_ID");
        var appKey = GetRequiredEnvironmentVariable("KUSTO_APP_KEY");
        var tenantId = GetRequiredEnvironmentVariable("KUSTO_TENANT_ID");

        var ingestService = new KustoIngestService(
            kustoUri: kustoUri,
            database: database,
            appId: appId,
            appKey: appKey,
            tenantId: tenantId,
            logger: logger
        );

        await ingestService.EnsureTableExistsAsync<MyData>(TableName);

        Console.WriteLine("Generating records...");
        var data = GenerateTestData(100_000);

        Console.WriteLine("Ingesting data into Kusto...");
        var kustoBuffer = new KustoBuffer<MyData>(ingestService, TableName);
        foreach (var partition in Partition(data, 2000))
        {
            kustoBuffer.Add(partition.ToArray());
        }

        // Adding is non-blocking; keep the process alive while the buffer drains in the background.
        Console.ReadLine();
    }

    /// <summary>
    /// Reads an environment variable and fails fast when it is missing or blank.
    /// </summary>
    private static string GetRequiredEnvironmentVariable(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{name}' must be set.");
        }

        return value;
    }

    /// <summary>
    /// Builds <paramref name="count"/> rows with ids 1..count, names <c>Item_0</c>..<c>Item_{count-1}</c>
    /// and timestamps one second apart, in ascending id order.
    /// </summary>
    private static List<MyData> GenerateTestData(int count)
    {
        var data = new List<MyData>(count);
        var now = DateTime.UtcNow;

        // A plain loop: the per-item work is trivial, so parallelising it behind a single lock
        // only added contention and produced the rows in a non-deterministic order.
        for (var i = 0; i < count; i++)
        {
            data.Add(new MyData
            {
                Id = i + 1,
                Name = $"Item_{i}",
                Timestamp = now.AddSeconds(i)
            });
        }

        return data;
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive lists of at most <paramref name="maxSize"/>
    /// items, preserving order. The last list may be shorter; an empty source yields nothing.
    /// </summary>
    /// <param name="source">Sequence to split; enumerated lazily, once.</param>
    /// <param name="maxSize">Maximum number of items per list; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSize"/> is zero or negative.</exception>
    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
    {
        // Validate eagerly: inside an iterator body these checks would not run until the
        // caller starts enumerating, far away from the faulty call site.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), "MaxSize must larger than 0");
        }

        return Iterate();

        IEnumerable<List<T>> Iterate()
        {
            var buffer = new List<T>();
            foreach (var item in source)
            {
                buffer.Add(item);
                if (buffer.Count == maxSize)
                {
                    yield return buffer;
                    buffer = new List<T>();
                }
            }

            if (buffer.Count > 0)
            {
                yield return buffer;
            }
        }
    }
}
221 | + | ||
/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
/// This design is inspired by the proven ArrayDb implementation:
///
/// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
/// </summary>
/// <typeparam name="T">Row type handed to <see cref="KustoIngestService.SaveToKustoInBatchAsync{T}"/>.</typeparam>
public class KustoBuffer<T>
{
    // Dependency: Kusto service for persisting data
    private readonly KustoIngestService _kustoService;
    private readonly string _tableName;

    // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
    private readonly int _maxSleepMilliSecondsWhenCold;
    private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;

    // Engine tasks controlling the write process. Volatile because they are assigned on
    // thread-pool threads and read from callers of Add / SyncAsync / IsCold.
    private volatile Task _engine = Task.CompletedTask;
    private volatile Task _coolDownEngine = Task.CompletedTask;

    // Serializes persist rounds. A SemaphoreSlim is used because the guarded section awaits:
    // thread-affine locks (Monitor, ReaderWriterLockSlim) cannot be released from the
    // continuation thread an await may resume on.
    private readonly SemaphoreSlim _persistGate = new SemaphoreSlim(1, 1);

    // Locks for protecting buffer swapping and engine status switching
    private readonly object _bufferWriteSwapLock = new object();
    private readonly object _engineStatusSwitchLock = new object();

    // Queue receiving new items. Each persist round detaches it and installs a fresh one.
    private volatile ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();

    /// <summary>True when neither the write engine nor the cool-down task is running.</summary>
    public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted;

    /// <summary>True when the write engine or the cool-down task is still running.</summary>
    public bool IsHot => !IsCold;

    /// <summary>
    /// Creates a buffer that persists to <paramref name="tableName"/> through <paramref name="kustoService"/>.
    /// </summary>
    /// <param name="kustoService">Service used to write batches.</param>
    /// <param name="tableName">Target table name.</param>
    /// <param name="maxSleepMilliSecondsWhenCold">Upper bound, in milliseconds, of the pause between two write rounds; must not be negative.</param>
    /// <param name="stopSleepingWhenWriteBufferItemsMoreThan">Pending-item count above which no pause is taken; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="kustoService"/> or <paramref name="tableName"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">A numeric argument is out of range.</exception>
    public KustoBuffer(
        KustoIngestService kustoService,
        string tableName,
        int maxSleepMilliSecondsWhenCold = 2000,
        int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
    {
        _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
        _tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));

        // Fail at construction instead of inside a background task nobody observes.
        if (maxSleepMilliSecondsWhenCold < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSleepMilliSecondsWhenCold), "The maximum sleep time must not be negative.");
        }

        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(stopSleepingWhenWriteBufferItemsMoreThan), "The stop-sleeping threshold must be a positive number.");
        }

        _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
        _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
    }

    /// <summary>
    /// Computes the pause before the next write round: the full maximum when nothing is pending,
    /// shrinking logarithmically to zero as the pending count reaches the threshold.
    /// </summary>
    private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
    {
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentException("The stop-sleeping threshold must be a positive number.", nameof(stopSleepingWhenWriteBufferItemsMoreThan));
        }

        if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
        {
            return 0;
        }

        var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
        return (int)y;
    }

    /// <summary>
    /// Adds objects to the buffer in a non-blocking manner.
    /// This method wakes up the engine to persist the data.
    /// </summary>
    /// <param name="objs">Items to buffer; an empty array is a no-op.</param>
    /// <exception cref="ArgumentNullException"><paramref name="objs"/> is null.</exception>
    public void Add(params T[] objs)
    {
        if (objs == null)
        {
            throw new ArgumentNullException(nameof(objs));
        }

        if (objs.Length == 0)
        {
            return;
        }

        // Held so that a whole Add lands in a single queue even if a swap happens concurrently.
        lock (_bufferWriteSwapLock)
        {
            foreach (var obj in objs)
            {
                _activeBuffer.Enqueue(obj);
            }
        }

        // Check the engine status up front to avoid lock contention.
        // NOTE(review): an engine round that has already seen an empty buffer but has not yet
        // completed still reports hot here, so items added in that window wait for the next
        // Add or SyncAsync call.
        if (!IsCold)
        {
            // In a high-frequency environment the engine is usually still running.
            return;
        }

        // Engine might be sleeping. Wake it up.
        lock (_engineStatusSwitchLock)
        {
            // Avoid multiple threads waking up the engine at the same time.
            if (!IsCold)
            {
                return;
            }

            _engine = Task.Run(WriteBuffered);
        }
    }

    /// <summary>
    /// Persists the buffered data in batches to the Kusto database.
    /// Rounds are serialized by <see cref="_persistGate"/>.
    /// </summary>
    /// <remarks>
    /// If the write throws, the exception is stored in the engine task (and surfaces from
    /// <see cref="SyncAsync"/>); the items of that round are not re-queued.
    /// </remarks>
    private async Task WriteBuffered()
    {
        await _persistGate.WaitAsync();
        try
        {
            ConcurrentQueue<T> bufferToPersist;
            lock (_bufferWriteSwapLock)
            {
                // Detach the filled queue and let producers continue on a fresh one.
                bufferToPersist = _activeBuffer;
                _activeBuffer = new ConcurrentQueue<T>();
            }

            var dataToWrite = bufferToPersist.ToArray();

            // If there is data, batch write to Kusto
            if (dataToWrite.Length > 0)
            {
                await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
            }
        }
        finally
        {
            _persistGate.Release();
        }

        // While we were writing, new data may have been added to the buffer. If so, write it too.
        if (!_activeBuffer.IsEmpty)
        {
            // Before the engine quits it starts the cool-down task, and before the cool-down task
            // quits it starts the engine again. As long as one of the two is running the buffer
            // is hot, and Add does not have to start the engine.
            _coolDownEngine = Task.Run(async () =>
            {
                // Pause a little to let more data arrive: persisting too eagerly produces many
                // tiny writes, persisting too lazily lets the backlog grow. The pause therefore
                // depends on the number of pending items.
                var sleepTime = CalculateSleepTime(
                    _maxSleepMilliSecondsWhenCold,
                    _stopSleepingWhenWriteBufferItemsMoreThan,
                    _activeBuffer.Count);
                await Task.Delay(sleepTime);

                // Wake up the engine to write the newly added data.
                _engine = Task.Run(WriteBuffered);
            });
        }
    }

    /// <summary>
    /// Synchronizes the buffer: returns once the engine is cold and the buffer is empty,
    /// starting another write round itself if items are pending while nothing is running.
    /// </summary>
    /// <remarks>Rethrows the exception of a failed write round.</remarks>
    public async Task SyncAsync()
    {
        while (true)
        {
            // Engine and cool-down hand over to each other; awaiting both covers one full round.
            await _engine;
            await _coolDownEngine;

            if (IsHot)
            {
                // The cool-down task started a new engine round; wait for that one as well.
                continue;
            }

            if (_activeBuffer.IsEmpty)
            {
                return;
            }

            // Items are pending but nothing is running: start a round and wait for it.
            lock (_engineStatusSwitchLock)
            {
                if (IsCold)
                {
                    _engine = Task.Run(WriteBuffered);
                }
            }
        }
    }
}
405 | + |
Newer
Older