using System.ComponentModel; using System.Data; using System.Text; using Aiursoft.Canon; using Azure.Identity; using Azure.Messaging.ServiceBus; using Kusto.Data; using Kusto.Ingest; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Identity.Client; using Newtonsoft.Json; using LogLevel = Microsoft.Extensions.Logging.LogLevel; namespace LabServiceBus; public abstract class Program { public static async Task Main(string[] args) { var services = new ServiceCollection(); services.AddLogging(logging => { logging.AddConsole(); logging.SetMinimumLevel(LogLevel.Information); }); services.AddMemoryCache(); services.AddTaskCanon(); services.AddHttpClient(); services.AddSingleton(); var serviceProvider = services.BuildServiceProvider(); var entry = serviceProvider.GetRequiredService(); await Task.Factory.StartNew(async () => { try { await entry.ListenFromServiceBusAsync(async message => { var messageObject = JsonConvert.DeserializeObject(message); Console.WriteLine($"================Received message: {messageObject?.Name}================"); await entry.SendToKustoAsync(messageObject); }); } catch (Exception e) { Console.WriteLine(e); throw; } }); for (int i = 0; i < 1000; i++) { await entry.SendToServiceBusAsync(new TestEvent { Name = $"Test {i}" }); } await Task.Delay(int.MaxValue); } } public class TestEvent { // ReSharper disable once UnusedAutoPropertyAccessor.Global public string? Name { get; set; } } public class DemoWorker(ILogger logger, IHttpClientFactory httpClientFactory, CacheService cache) { private readonly HttpClient _httpClient = httpClientFactory.CreateClient(); // Authentication. private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc"; private readonly string _clientSecret = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; private readonly string _scope = "https://servicebus.azure.net/.default"; // Service Bus private readonly string _serviceBusNamespace = "anduinlearn"; private readonly string _serviceBusTopicName = "mytopic"; private readonly string _serviceBusTopicSubscriptionName = "mysubscription"; // Kusto database. private readonly string _kustoIngestionUri = "https://ingest-learnkustoanduin.eastasia.kusto.windows.net"; private readonly string _kustoDatabase = "MyDatabase"; private readonly string _kustoTable = "MyTable"; /// /// Sends an asynchronous request to the service bus endpoint. /// /// The type of the request object. /// The request object. /// A task representing the asynchronous operation that returns the response string. public async Task SendToServiceBusAsync(T request) { var requestJson = JsonConvert.SerializeObject(request); logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName); var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_serviceBusTopicName}/messages"; logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint); var token = await GetToken(); logger.LogTrace("Sending request to service bus, with token {0}", token.Substring(0, 20)); var httpRequest = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(endpoint) }; httpRequest.Headers.Add("Authorization", $"Bearer {token}"); httpRequest.Headers.Add("accept", "application/json"); httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json"); logger.LogTrace("Sending request to service bus, with content {0}", requestJson); var response = await _httpClient.SendAsync(httpRequest); logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode); var responseString = await response.Content.ReadAsStringAsync(); logger.LogTrace("Received response from service bus, with content {0} and status code {1}", responseString, response.StatusCode); if (!response.IsSuccessStatusCode) { throw new Exception($"Failed to send new service bus event. {responseString}"); } return responseString; } public async Task ListenFromServiceBusAsync(Func onMessage) { // Create Service Bus client and processor logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret); var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net" , credential); var processor = serviceBusClient.CreateProcessor(_serviceBusTopicName, _serviceBusTopicSubscriptionName, new ServiceBusProcessorOptions()); processor.ProcessMessageAsync += async args => { var message = args.Message; var body = Encoding.UTF8.GetString(message.Body.ToArray()); await onMessage(body); await args.CompleteMessageAsync(message); }; processor.ProcessErrorAsync += args => { logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); return Task.CompletedTask; }; logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); // Start processing await processor.StartProcessingAsync(); await Task.Delay(int.MaxValue); } public async Task SendToKustoAsync(T request) { var dataTable = new List { request }.ToDataTable(); var kustoIngestService = new KustoIngestService(_kustoIngestionUri, _kustoDatabase, _clientId, _clientSecret, _tenantId); await kustoIngestService.IngestDataAsync(dataTable, _kustoTable); } private Task GetToken() { return cache.RunWithCache("token", async () => { var app = ConfidentialClientApplicationBuilder.Create(_clientId) .WithClientSecret(_clientSecret) .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) .Build(); var authResult = await app.AcquireTokenForClient([_scope]).ExecuteAsync(); return authResult.AccessToken; }); } } public static class DataTableExtensions { public static DataTable ToDataTable(this IList data) { var properties = TypeDescriptor.GetProperties(typeof(T)); var table = new DataTable(); foreach (PropertyDescriptor prop in properties) { table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType); } foreach (T item in data) { var row = table.NewRow(); foreach (PropertyDescriptor prop in properties) { row[prop.Name] = prop.GetValue(item) ?? DBNull.Value; } table.Rows.Add(row); } return table; } } public class KustoIngestService { private IKustoIngestClient _kustoIngestClient; private string _database; public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId) { var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri) .WithAadApplicationKeyAuthentication(appId, appKey, tenantId); _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder); _database = database; } public async Task IngestDataAsync(DataTable dataTable, string tableName) { var ingestionProperties = new KustoIngestionProperties(_database, tableName); var dataStream = dataTable.CreateDataReader(); await _kustoIngestClient.IngestFromDataReaderAsync(dataStream, ingestionProperties); } }