Program.cs
· 15 KiB · C#
Raw
using System.Collections.Concurrent;
using System.Data;
using Kusto.Cloud.Platform.Data;
using Kusto.Cloud.Platform.Utils;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
/// <summary>
/// Thin wrapper around the Kusto SDK that ingests <see cref="DataTable"/>s into one database
/// and creates/merges table schemas derived from CLR types via reflection.
/// </summary>
public class KustoIngestService : IDisposable
{
    private readonly IKustoIngestClient _ingestClient;
    private readonly ICslAdminProvider _adminClient;
    private readonly string _database;
    private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;

    /// <summary>
    /// Creates the ingest and admin clients for the cluster at <paramref name="kustoUri"/>,
    /// authenticating with an AAD application key.
    /// </summary>
    /// <param name="kustoUri">Cluster URI used for both ingestion and control commands.</param>
    /// <param name="database">Database all operations target.</param>
    /// <param name="appId">AAD application (client) id.</param>
    /// <param name="appKey">AAD application secret; load it from configuration, never from source code.</param>
    /// <param name="tenantId">AAD tenant id.</param>
    /// <param name="logger">Logger for progress and error messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="database"/> or <paramref name="logger"/> is null.</exception>
    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
    {
        // Validate first so no client is created (and leaked) when the arguments are unusable.
        _database = database ?? throw new ArgumentNullException(nameof(database));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);

        _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
        _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
    }

    /// <summary>
    /// Ingests <paramref name="dataToWrite"/> into <paramref name="table"/>, one ingestion call per
    /// batch of at most <paramref name="batchSize"/> rows. Each row is ingested exactly once; an empty
    /// array results in no ingestion call.
    /// </summary>
    /// <param name="dataToWrite">Rows to ingest.</param>
    /// <param name="table">Target table name.</param>
    /// <param name="batchSize">Maximum rows per ingestion call (default 150).</param>
    /// <exception cref="ArgumentNullException"><paramref name="dataToWrite"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is not positive.</exception>
    public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table, int batchSize = 150)
    {
        if (dataToWrite == null)
        {
            throw new ArgumentNullException(nameof(dataToWrite));
        }
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be greater than 0.");
        }

        // The batches already cover every row; the previous trailing ingestion of the whole array
        // wrote every row a second time.
        foreach (var partition in Program.Partition(dataToWrite, batchSize))
        {
            await IngestDataAsync(partition.ToDataTable(), table);
        }
    }

    /// <summary>
    /// Ingests all rows of <paramref name="dataTable"/> into <paramref name="tableName"/>.
    /// Failures are logged and rethrown.
    /// </summary>
    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        try
        {
            _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
            var ingestionProperties = new KustoIngestionProperties(_database, tableName);

            // DataTableReader is IDisposable; dispose it even if ingestion throws.
            using var reader = dataTable.CreateDataReader();
            await _ingestClient.IngestFromDataReaderAsync(reader, ingestionProperties);

            _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
            throw;
        }
    }

    /// <summary>
    /// Makes sure <paramref name="tableName"/> exists and has a column for every public property of
    /// <typeparamref name="T"/>, issuing a create-merge command when the table or any column is missing.
    /// </summary>
    public async Task EnsureTableExistsAsync<T>(string tableName)
    {
        _logger.LogInformation("Ensuring table {TableName} exists.", tableName);

        var schema = await GetDatabaseSchemaAsync();
        if (schema == null)
        {
            _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
        {
            _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else
        {
            _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
            var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
            var newColumns = typeof(T).GetProperties()
                .Select(p => p.Name)
                .Where(c => !existingColumns.Contains(c))
                .ToList();

            if (newColumns.Count > 0)
            {
                _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
                await CreateMergeTable<T>(tableName);
            }
        }
    }

    /// <summary>
    /// Fetches the schema of the configured database, or <c>null</c> when the command returns no
    /// schema or the database is not present in it.
    /// </summary>
    private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
    {
        _logger.LogInformation("Fetching database schema for {Database}", _database);
        try
        {
            var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
            using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
            var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();

            // JsonConvert.DeserializeObject throws on a null string, so treat "no rows" as "no schema".
            if (string.IsNullOrWhiteSpace(schemaJson))
            {
                _logger.LogWarning("Schema command returned no data for {Database}", _database);
                return null;
            }

            var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
            return clusterSchema?.Databases.GetValueOrDefault(_database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
            throw;
        }
    }

    /// <summary>
    /// Issues a create-merge command for <paramref name="tableName"/> with one column per public
    /// property of <typeparamref name="T"/>, typed by <see cref="ConvertTypeToCslType"/>.
    /// </summary>
    private async Task CreateMergeTable<T>(string tableName)
    {
        _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
        var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
        var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));

        // The command's result reader is not needed, but it must still be disposed.
        using var result = await _adminClient.ExecuteControlCommandAsync(_database, command);
        _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
    }

    /// <summary>Maps a CLR property type to the Kusto column type used when creating tables.</summary>
    // NOTE(review): Nullable<T> types (e.g. int?) do not match any exact case below and therefore map
    // to "dynamic"; changing that would alter the schema of tables created so far, so confirm first.
    private static string ConvertTypeToCslType(Type type) => type switch
    {
        _ when type == typeof(bool) => "bool",
        _ when type == typeof(DateTime) => "datetime",
        _ when type == typeof(Guid) => "guid",
        _ when type == typeof(int) => "int",
        _ when type == typeof(long) => "long",
        _ when type == typeof(double) => "real",
        _ when type == typeof(string) => "string",
        _ when type == typeof(TimeSpan) => "timespan",
        _ when type.IsEnum => "string",
        _ when type == typeof(byte[]) => "string",
        _ => "dynamic"
    };

    /// <summary>Disposes the underlying ingest and admin clients.</summary>
    public void Dispose()
    {
        _ingestClient.Dispose();
        _adminClient.Dispose();
        GC.SuppressFinalize(this);
    }
}
// Data model: one row of the sample table.
// typeof(MyData).GetProperties() is used to derive the Kusto table columns, and in practice it
// returns properties in declaration order — keep the order below stable.
public class MyData
{
    /// <summary>Record identifier (an <c>int</c> property).</summary>
    public int Id { get; set; }

    /// <summary>Record name; null unless assigned.</summary>
    public string Name { get; set; }

    /// <summary>Timestamp associated with the record.</summary>
    public DateTime Timestamp { get; set; }
}
// Entry point
public class Program
{
    /// <summary>
    /// Demo: ensures the target table exists, generates sample rows, pushes them through a
    /// <c>KustoBuffer</c> and waits until the buffer has flushed.
    /// Connection settings are read from the environment variables KUSTO_URI, KUSTO_DATABASE,
    /// KUSTO_APP_ID, KUSTO_APP_KEY and KUSTO_TENANT_ID.
    /// </summary>
    public static async Task Main()
    {
        // Disposing the factory flushes the console logger before the process exits.
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<KustoIngestService>();

        // Credentials must not live in source code. The application key previously hard-coded here
        // has been published and should be rotated.
        var kustoUri = GetSetting("KUSTO_URI", "https://testkustoanduin.eastasia.kusto.windows.net");
        var database = GetSetting("KUSTO_DATABASE", "yourdatabase");
        var appId = GetRequiredSetting("KUSTO_APP_ID");
        var appKey = GetRequiredSetting("KUSTO_APP_KEY");
        var tenantId = GetRequiredSetting("KUSTO_TENANT_ID");

        var ingestService = new KustoIngestService(
            kustoUri: kustoUri,
            database: database,
            appId: appId,
            appKey: appKey,
            tenantId: tenantId,
            logger: logger
        );

        const string tableName = "your-kusto-table";
        await ingestService.EnsureTableExistsAsync<MyData>(tableName);

        Console.WriteLine("Generating records...");
        var data = GenerateTestData(100_000);

        Console.WriteLine("Ingesting data into Kusto...");
        var kustoBuffer = new KustoBuffer<MyData>(ingestService, tableName);
        foreach (var partition in Partition(data, 2000))
        {
            kustoBuffer.Add(partition.ToArray());
        }

        // Wait for the background writer instead of relying on the user to keep the process alive;
        // this also surfaces any ingestion failure.
        await kustoBuffer.SyncAsync();
        Console.WriteLine("Ingestion finished.");
    }

    // Returns the environment variable's value, or the fallback when it is unset or blank.
    private static string GetSetting(string name, string fallback)
    {
        var value = Environment.GetEnvironmentVariable(name);
        return string.IsNullOrWhiteSpace(value) ? fallback : value;
    }

    // Returns the environment variable's value; throws when it is unset or blank.
    private static string GetRequiredSetting(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable {name} must be set.");
        }
        return value;
    }

    /// <summary>
    /// Builds <paramref name="count"/> sample rows with ids 1..count, names "Item_0".."Item_{count-1}"
    /// and timestamps one second apart starting now (UTC), in id order.
    /// </summary>
    private static List<MyData> GenerateTestData(int count)
    {
        // A plain loop: the work per item is trivial, so Parallel.For plus a lock only added overhead
        // and a nondeterministic order.
        var now = DateTime.UtcNow;
        var data = new List<MyData>(count);
        for (var i = 0; i < count; i++)
        {
            data.Add(new MyData
            {
                Id = i + 1,
                Name = $"Item_{i}",
                Timestamp = now.AddSeconds(i)
            });
        }
        return data;
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive lists of at most <paramref name="maxSize"/>
    /// items (only the last list may be shorter). Enumeration is lazy, but arguments are validated
    /// immediately when the method is called.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSize"/> is not positive.</exception>
    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), "maxSize must be greater than 0.");
        }

        // Validation lives outside the iterator; otherwise it would only run on first MoveNext().
        return PartitionIterator(source, maxSize);
    }

    // Lazy part of Partition; assumes the arguments were already validated.
    private static IEnumerable<List<T>> PartitionIterator<T>(IEnumerable<T> source, int maxSize)
    {
        var buffer = new List<T>();
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == maxSize)
            {
                yield return buffer;
                buffer = new List<T>();
            }
        }
        if (buffer.Count > 0)
        {
            yield return buffer;
        }
    }
}
/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
/// This design is inspired by the proven ArrayDb implementation:
///
/// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
/// </summary>
/// <remarks>
/// <see cref="Add"/> only enqueues and, if nothing is running, schedules an "engine" round.
/// Each engine round takes all pending items and persists them with
/// <c>KustoIngestService.SaveToKustoInBatchAsync</c>. If more items arrived meanwhile, it schedules a
/// "cool down" round that sleeps briefly (less when more data is waiting) and then starts the next
/// engine round. The chain stops when an engine round finds the buffer empty or a write fails.
/// </remarks>
public class KustoBuffer<T>
{
    // Dependency: Kusto service for persisting data
    private readonly KustoIngestService _kustoService;
    private readonly string _tableName;

    // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
    private readonly int _maxSleepMilliSecondsWhenCold;
    private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;

    // Guards _activeBuffer, _isRunning, _engine and _coolDownEngine. Enqueueing and the engine's
    // "is there more work?" decision happen under this one lock, so an item added just as the
    // engine stops always either gets picked up or starts a new engine (no lost wake-up).
    private readonly object _stateLock = new object();

    // Serializes persistence. Unlike ReaderWriterLockSlim, SemaphoreSlim has no thread affinity,
    // so it can be released after an await that resumed on a different thread.
    private readonly SemaphoreSlim _persistLock = new SemaphoreSlim(1, 1);

    // Items waiting to be persisted; swapped for a fresh queue whenever an engine round takes a batch.
    private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();

    // True from the moment an engine round is scheduled until the engine/cool-down chain finds the
    // buffer empty or a write fails. Written under _stateLock; volatile for the lock-free IsCold read.
    private volatile bool _isRunning;

    // Most recently scheduled engine and cool-down rounds (awaited by SyncAsync).
    private Task _engine = Task.CompletedTask;
    private Task _coolDownEngine = Task.CompletedTask;

    /// <summary>True when no engine or cool-down round is scheduled or running.</summary>
    public bool IsCold => !_isRunning;

    /// <summary>True while an engine or cool-down round is scheduled or running.</summary>
    public bool IsHot => !IsCold;

    /// <summary>Creates a buffer that persists into <paramref name="tableName"/> through <paramref name="kustoService"/>.</summary>
    /// <param name="kustoService">Service used to persist batches.</param>
    /// <param name="tableName">Target table.</param>
    /// <param name="maxSleepMilliSecondsWhenCold">Longest pause between rounds when very little data is pending; must be &gt;= 0.</param>
    /// <param name="stopSleepingWhenWriteBufferItemsMoreThan">Pending-item count above which no pause is taken; must be &gt; 0.</param>
    /// <exception cref="ArgumentNullException"><paramref name="kustoService"/> or <paramref name="tableName"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">A timing parameter is out of range.</exception>
    public KustoBuffer(
        KustoIngestService kustoService,
        string tableName,
        int maxSleepMilliSecondsWhenCold = 2000,
        int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
    {
        _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
        _tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));

        // Validate here: these values are otherwise only used inside the background cool-down task,
        // where an exception would silently stall the buffer.
        if (maxSleepMilliSecondsWhenCold < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSleepMilliSecondsWhenCold), "maxSleepMilliSecondsWhenCold must not be negative.");
        }
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(stopSleepingWhenWriteBufferItemsMoreThan), "stopSleepingWhenWriteBufferItemsMoreThan must be a positive number.");
        }

        _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
        _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
    }

    /// <summary>
    /// Pause before the next engine round: maxSleep * (1 - ln(1 + count) / ln(1 + threshold)),
    /// and 0 once count exceeds the threshold. Never negative.
    /// </summary>
    private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
    {
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentException("stopSleepingWhenWriteBufferItemsMoreThan must be a positive number.", nameof(stopSleepingWhenWriteBufferItemsMoreThan));
        }

        if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
        {
            return 0;
        }

        var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
        return Math.Max(0, (int)y);
    }

    /// <summary>
    /// Adds objects to the buffer without waiting for them to be persisted.
    /// Starts the engine if it is not already running.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="objs"/> is null.</exception>
    public void Add(params T[] objs)
    {
        if (objs is null)
        {
            throw new ArgumentNullException(nameof(objs));
        }
        if (objs.Length == 0)
        {
            return;
        }

        lock (_stateLock)
        {
            foreach (var obj in objs)
            {
                _activeBuffer.Enqueue(obj);
            }

            // A running chain is guaranteed to see these items: the engine decides whether to stop
            // under this same lock.
            if (_isRunning)
            {
                return;
            }

            StartEngineLocked();
        }
    }

    // Marks the chain as running and schedules an engine round. Caller must hold _stateLock.
    private void StartEngineLocked()
    {
        _isRunning = true;
        _engine = Task.Run(WriteBuffered);
    }

    /// <summary>
    /// One engine round: takes every pending item and persists it. If more items arrived during the
    /// write, it schedules a cool-down round; otherwise it marks the buffer cold.
    /// </summary>
    private async Task WriteBuffered()
    {
        await _persistLock.WaitAsync();
        try
        {
            T[] dataToWrite;
            lock (_stateLock)
            {
                // Take everything queued so far and give producers a fresh queue.
                dataToWrite = _activeBuffer.ToArray();
                _activeBuffer = new ConcurrentQueue<T>();
            }

            if (dataToWrite.Length > 0)
            {
                await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
            }
        }
        catch
        {
            // The failed batch is dropped (as before), but the chain must not stay marked as running,
            // or no later Add/SyncAsync could ever restart it. The exception faults this task.
            lock (_stateLock)
            {
                _isRunning = false;
            }
            throw;
        }
        finally
        {
            _persistLock.Release();
        }

        lock (_stateLock)
        {
            if (_activeBuffer.IsEmpty)
            {
                _isRunning = false;
                return;
            }

            // More data arrived while writing. Assigned before this task completes, so anyone who
            // awaited this engine will see the follow-up cool-down round.
            _coolDownEngine = Task.Run(CoolDownAsync);
        }
    }

    /// <summary>
    /// Sleeps for a time that shrinks as more items are pending, so small trickles are batched and
    /// large backlogs are written promptly. Then it starts the next engine round.
    /// </summary>
    private async Task CoolDownAsync()
    {
        int pending;
        lock (_stateLock)
        {
            pending = _activeBuffer.Count;
        }

        var sleepTime = CalculateSleepTime(
            _maxSleepMilliSecondsWhenCold,
            _stopSleepingWhenWriteBufferItemsMoreThan,
            pending);
        await Task.Delay(sleepTime);

        lock (_stateLock)
        {
            // _isRunning is still true; hand over to the next engine round.
            _engine = Task.Run(WriteBuffered);
        }
    }

    private Task CurrentEngine
    {
        get { lock (_stateLock) { return _engine; } }
    }

    private Task CurrentCoolDownEngine
    {
        get { lock (_stateLock) { return _coolDownEngine; } }
    }

    /// <summary>
    /// Synchronizes the buffer, ensuring that all data added before this call is persisted.
    /// Rethrows the exception of a failed engine round.
    /// </summary>
    public async Task SyncAsync()
    {
        lock (_stateLock)
        {
            // Data can be pending with no engine running, e.g. after a failed write. Restart the engine.
            if (!_isRunning && !_activeBuffer.IsEmpty)
            {
                StartEngineLocked();
            }
        }

        // The running engine may have swapped its batch before some items arrived. In that case it
        // schedules a cool-down round before completing, and that round starts a final engine round
        // which writes the remaining items. Awaiting engine -> cool-down -> engine covers both.
        await CurrentEngine;
        await CurrentCoolDownEngine;
        await CurrentEngine;
    }
}
1 | using System.Collections.Concurrent; |
2 | using System.Data; |
3 | using Kusto.Cloud.Platform.Data; |
4 | using Kusto.Cloud.Platform.Utils; |
5 | using Kusto.Data; |
6 | using Kusto.Data.Common; |
7 | using Kusto.Data.Net.Client; |
8 | using Kusto.Ingest; |
9 | using Microsoft.Extensions.Logging; |
10 | using Newtonsoft.Json; |
11 | |
12 | public class KustoIngestService |
13 | { |
14 | private readonly IKustoIngestClient _ingestClient; |
15 | private readonly ICslAdminProvider _adminClient; |
16 | private readonly string _database; |
17 | private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger; |
18 | |
19 | public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger) |
20 | { |
21 | var kustoCsb = new KustoConnectionStringBuilder(kustoUri) |
22 | .WithAadApplicationKeyAuthentication(appId, appKey, tenantId); |
23 | |
24 | _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb); |
25 | _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb); |
26 | _database = database; |
27 | _logger = logger; |
28 | } |
29 | |
30 | public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table) |
31 | { |
32 | foreach (var parition in Program.Partition(dataToWrite, 150)) |
33 | { |
34 | var dataTable = parition.ToDataTable(); |
35 | await IngestDataAsync(dataTable, table); |
36 | } |
37 | |
38 | await IngestDataAsync(dataToWrite.ToDataTable(), table); |
39 | } |
40 | |
41 | public async Task IngestDataAsync(DataTable dataTable, string tableName) |
42 | { |
43 | try |
44 | { |
45 | _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName); |
46 | var ingestionProperties = new KustoIngestionProperties(_database, tableName); |
47 | await _ingestClient.IngestFromDataReaderAsync(dataTable.CreateDataReader(), ingestionProperties); |
48 | _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName); |
49 | } |
50 | catch (Exception ex) |
51 | { |
52 | _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName); |
53 | throw; |
54 | } |
55 | } |
56 | |
57 | public async Task EnsureTableExistsAsync<T>(string tableName) |
58 | { |
59 | _logger.LogInformation("Ensuring table {TableName} exists.", tableName); |
60 | |
61 | var schema = await GetDatabaseSchemaAsync(); |
62 | if (schema == null) |
63 | { |
64 | _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName); |
65 | await CreateMergeTable<T>(tableName); |
66 | } |
67 | else if (!schema.Tables.TryGetValue(tableName, out var tableSchema)) |
68 | { |
69 | _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName); |
70 | await CreateMergeTable<T>(tableName); |
71 | } |
72 | else |
73 | { |
74 | _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName); |
75 | var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name)); |
76 | var newColumns = typeof(T).GetProperties().Select(p => p.Name).Where(c => !existingColumns.Contains(c)); |
77 | |
78 | if (newColumns.Any()) |
79 | { |
80 | _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName); |
81 | await CreateMergeTable<T>(tableName); |
82 | } |
83 | } |
84 | } |
85 | |
86 | private async Task<DatabaseSchema> GetDatabaseSchemaAsync() |
87 | { |
88 | _logger.LogInformation("Fetching database schema for {Database}", _database); |
89 | try |
90 | { |
91 | var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database); |
92 | using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command); |
93 | var schemaJson = reader.ToEnumerable<string>().FirstOrDefault(); |
94 | var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson); |
95 | return clusterSchema?.Databases.GetValueOrDefault(_database); |
96 | } |
97 | catch (Exception ex) |
98 | { |
99 | _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database); |
100 | throw; |
101 | } |
102 | } |
103 | |
104 | private async Task CreateMergeTable<T>(string tableName) |
105 | { |
106 | _logger.LogInformation("Creating or merging table schema for {TableName}", tableName); |
107 | var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType))); |
108 | var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns)); |
109 | await _adminClient.ExecuteControlCommandAsync(_database, command); |
110 | _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName); |
111 | } |
112 | |
113 | private static string ConvertTypeToCslType(Type type) => type switch |
114 | { |
115 | _ when type == typeof(bool) => "bool", |
116 | _ when type == typeof(DateTime) => "datetime", |
117 | _ when type == typeof(Guid) => "guid", |
118 | _ when type == typeof(int) => "int", |
119 | _ when type == typeof(long) => "long", |
120 | _ when type == typeof(double) => "real", |
121 | _ when type == typeof(string) => "string", |
122 | _ when type == typeof(TimeSpan) => "timespan", |
123 | _ when type.IsEnum => "string", |
124 | _ when type == typeof(byte[]) => "string", |
125 | _ => "dynamic" |
126 | }; |
127 | } |
128 | |
129 | // 数据模型 |
130 | public class MyData |
131 | { |
132 | public int Id { get; set; } |
133 | public string Name { get; set; } |
134 | public DateTime Timestamp { get; set; } |
135 | } |
136 | |
137 | // 启动入口 |
138 | public class Program |
139 | { |
140 | public static async Task Main() |
141 | { |
142 | var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole()); |
143 | var logger = loggerFactory.CreateLogger<KustoIngestService>(); |
144 | |
145 | var kustoUri = "https://testkustoanduin.eastasia.kusto.windows.net"; |
146 | var database = "yourdatabase"; |
147 | var appId = "1c4b1053-5aef-45dc-9d42-a69ea84b422d"; |
148 | var appKey = "7fm8Q~CxycQfV3qW5v6G71vRGVCuYM2eLfZ0obC5"; |
149 | var tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; |
150 | |
151 | var ingestService = new KustoIngestService( |
152 | kustoUri: kustoUri, |
153 | database: database, |
154 | appId: appId, |
155 | appKey: appKey, |
156 | tenantId: tenantId, |
157 | logger: logger |
158 | ); |
159 | |
160 | await ingestService.EnsureTableExistsAsync<MyData>("your-kusto-table"); |
161 | |
162 | Console.WriteLine("Generating records..."); |
163 | var data = GenerateTestData(10_0000); |
164 | |
165 | Console.WriteLine("Ingesting data into Kusto..."); |
166 | var kustoBuffer = new KustoBuffer<MyData>(ingestService, "your-kusto-table"); |
167 | foreach (var parition in Partition(data, 2000)) |
168 | { |
169 | kustoBuffer.Add(parition.ToArray()); |
170 | } |
171 | Console.ReadLine(); |
172 | } |
173 | |
174 | private static List<MyData> GenerateTestData(int count) |
175 | { |
176 | var data = new List<MyData>(count); |
177 | var now = DateTime.UtcNow; |
178 | |
179 | Parallel.For(0, count, i => |
180 | { |
181 | var item = new MyData |
182 | { |
183 | Id = i + 1, |
184 | Name = $"Item_{i}", |
185 | Timestamp = now.AddSeconds(i) |
186 | }; |
187 | |
188 | lock (data) { data.Add(item); } |
189 | }); |
190 | |
191 | return data; |
192 | } |
193 | |
194 | public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize) |
195 | { |
196 | if (source == null) |
197 | { |
198 | throw new ArgumentNullException(nameof(source)); |
199 | } |
200 | if (maxSize <= 0) |
201 | { |
202 | throw new ArgumentOutOfRangeException(nameof(maxSize), "MaxSize must larger than 0"); |
203 | } |
204 | |
205 | var buffer = new List<T>(); |
206 | foreach (var item in source) |
207 | { |
208 | buffer.Add(item); |
209 | if (buffer.Count == maxSize) |
210 | { |
211 | yield return buffer; |
212 | buffer = new List<T>(); |
213 | } |
214 | } |
215 | if (buffer.Count > 0) |
216 | { |
217 | yield return buffer; |
218 | } |
219 | } |
220 | } |
221 | |
222 | /// <summary> |
223 | /// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto. |
224 | /// This design is inspired by the proven ArrayDb implementation: |
225 | /// |
226 | /// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs |
227 | /// </summary> |
228 | public class KustoBuffer<T> |
229 | { |
230 | // Dependency: Kusto service for persisting data |
231 | private readonly KustoIngestService _kustoService; |
232 | private readonly string _tableName; |
233 | |
234 | // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping |
235 | private readonly int _maxSleepMilliSecondsWhenCold; |
236 | private readonly int _stopSleepingWhenWriteBufferItemsMoreThan; |
237 | |
238 | // Engine tasks controlling the write process |
239 | private Task _engine = Task.CompletedTask; |
240 | private Task _coolDownEngine = Task.CompletedTask; |
241 | |
242 | // Locks for protecting buffer swapping and engine status switching |
243 | private readonly ReaderWriterLockSlim _bufferLock = new ReaderWriterLockSlim(); |
244 | private readonly object _bufferWriteSwapLock = new object(); |
245 | private readonly object _engineStatusSwitchLock = new object(); |
246 | |
247 | // Double buffering: _activeBuffer is used for enqueuing new data, |
248 | // while _secondaryBuffer is used for swapping and persisting data. |
249 | private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>(); |
250 | private ConcurrentQueue<T> _secondaryBuffer = new ConcurrentQueue<T>(); |
251 | |
252 | public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted; |
253 | |
254 | public bool IsHot => !IsCold; |
255 | |
256 | public KustoBuffer( |
257 | KustoIngestService kustoService, |
258 | string tableName, |
259 | int maxSleepMilliSecondsWhenCold = 2000, |
260 | int stopSleepingWhenWriteBufferItemsMoreThan = 1000) |
261 | { |
262 | _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService)); |
263 | _tableName = tableName; |
264 | _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold; |
265 | _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan; |
266 | } |
267 | |
268 | private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount) |
269 | { |
270 | if (stopSleepingWhenWriteBufferItemsMoreThan <= 0) |
271 | { |
272 | throw new ArgumentException("B must be a positive number."); |
273 | } |
274 | |
275 | if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan) |
276 | { |
277 | return 0; |
278 | } |
279 | |
280 | var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan))); |
281 | return (int)y; |
282 | } |
283 | |
284 | /// <summary> |
285 | /// Adds objects to the buffer in a non-blocking manner. |
286 | /// This method wakes up the engine to persist the data. |
287 | /// </summary> |
288 | public void Add(params T[] objs) |
289 | { |
290 | lock (_bufferWriteSwapLock) |
291 | { |
292 | foreach (var obj in objs) |
293 | { |
294 | _activeBuffer.Enqueue(obj); |
295 | } |
296 | } |
297 | |
298 | if (objs.Length == 0) |
299 | { |
300 | return; |
301 | } |
302 | |
303 | // Get the engine status in advanced to avoid lock contention. |
304 | if (!IsCold) |
305 | { |
306 | // Most of the cases in a high-frequency environment, the engine is still running. |
307 | return; |
308 | } |
309 | |
310 | // Engine might be sleeping. Wake it up. |
311 | lock (_engineStatusSwitchLock) |
312 | { |
313 | // Avoid multiple threads to wake up the engine at the same time. |
314 | if (!IsCold) |
315 | { |
316 | return; |
317 | } |
318 | |
319 | _engine = Task.Run(WriteBuffered); |
320 | } |
321 | } |
322 | |
323 | /// <summary> |
324 | /// Persists the buffered data in batches to the Kusto database. |
325 | /// This method is guaranteed to be executed by a single thread at a time. |
326 | /// </summary> |
327 | private async Task WriteBuffered() |
328 | { |
329 | _bufferLock.EnterWriteLock(); |
330 | try |
331 | { |
332 | ConcurrentQueue<T> bufferToPersist; |
333 | lock (_bufferWriteSwapLock) |
334 | { |
335 | // Swap active and secondary buffers |
336 | bufferToPersist = _activeBuffer; |
337 | _activeBuffer = _secondaryBuffer; |
338 | _secondaryBuffer = new ConcurrentQueue<T>(); |
339 | } |
340 | |
341 | var dataToWrite = bufferToPersist.ToArray(); |
342 | |
343 | // Release the buffer to avoid memory leak. |
344 | bufferToPersist.Clear(); |
345 | |
346 | // If there is data, batch write to Kusto |
347 | if (dataToWrite.Length > 0) |
348 | { |
349 | // Process the buffer to persist |
350 | await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName); |
351 | } |
352 | } |
353 | finally |
354 | { |
355 | _bufferLock.ExitWriteLock(); |
356 | } |
357 | |
358 | // While we are writing, new data may be added to the buffer. If so, we need to write it too. |
359 | if (!_activeBuffer.IsEmpty) |
360 | { |
361 | // Restart the engine to write the new added data. |
362 | // Before engine quits, it wakes up cool down engine to ensure the engine will be restarted. |
363 | // Before cool down quits, it wakes up the engine to ensure the engine will be restarted. |
364 | // So if one of the two tasks is running, the engine will be restarted. And this buffer is in a hot state. |
365 | // In a hot state, you don't have to start the engine again. |
366 | _coolDownEngine = Task.Run(async () => |
367 | { |
368 | // Slow down a little bit to wait for more data to come. |
369 | // If we persist too fast, and the buffer is almost empty, frequent write will cause data fragmentation. |
370 | // If we persist too slow, and a lot of new data has been added to the buffer, and the engine wasted time in sleeping. |
371 | // So the time to sleep is calculated by the number of items in the buffer. |
372 | var sleepTime = CalculateSleepTime( |
373 | _maxSleepMilliSecondsWhenCold, |
374 | _stopSleepingWhenWriteBufferItemsMoreThan, |
375 | _activeBuffer.Count); |
376 | await Task.Delay(sleepTime); |
377 | |
378 | // Wake up the engine to write the new added data. |
379 | _engine = Task.Run(WriteBuffered); |
380 | }); |
381 | } |
382 | } |
383 | |
384 | /// <summary> |
385 | /// Synchronizes the buffer, ensuring that all pending data is persisted. |
386 | /// </summary> |
387 | public async Task SyncAsync() |
388 | { |
389 | // Case 1: |
390 | // Engine is working. However, the buffer may still have data that after this phase, the data is still not written. |
391 | // Wait two rounds of engine to finish to ensure all data is written. |
392 | // Cool down engine will ensure restart the engine to write the remaining data. |
393 | // Wait for the engine to finish. |
394 | |
395 | // Case 2: |
396 | // The engine is not working. In this case, it might be in the cool down phase. |
397 | // The first wait is just await a completed task. |
398 | // Cool down engine will ensure restart the engine to write the remaining data. |
399 | // Then wait for the engine to finish. |
400 | await _engine; |
401 | await _coolDownEngine; |
402 | await _engine; |
403 | } |
404 | } |
405 | |
406 |