using System.Text;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;
using Newtonsoft.Json;
// Microsoft.Identity.Client also declares a LogLevel type; pin the name to the logging one.
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace LabServiceBus;

/// <summary>
/// Console entry point: starts a background listener on the Service Bus subscription,
/// then publishes 1000 test events to the topic.
/// </summary>
public abstract class Program
{
    /// <summary>
    /// Wires up dependency injection, starts the listener, sends the test events and then
    /// keeps the process alive until the listener stops (Ctrl+C) or fails.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        services.AddLogging(logging =>
        {
            logging.AddConsole();
            logging.SetMinimumLevel(LogLevel.Information);
        });
        services.AddHttpClient();
        services.AddSingleton<ServiceBusEventSender>();

        await using var serviceProvider = services.BuildServiceProvider();
        var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>();

        // Ctrl+C requests a graceful shutdown instead of killing the process outright.
        using var shutdown = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            shutdown.Cancel();
        };

        // Task.Run unwraps the async lambda, so listenTask represents the whole listener
        // lifetime. Task.Factory.StartNew would hand back a Task<Task> whose outer task
        // completes at the lambda's first await, leaving listener failures unobserved.
        var listenTask = Task.Run(async () =>
        {
            try
            {
                await entry.Listen(
                    message =>
                    {
                        Console.WriteLine($"==================Received message: {message}==================");
                        return Task.CompletedTask;
                    },
                    shutdown.Token);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        });

        for (var i = 0; i < 1000 && !shutdown.IsCancellationRequested; i++)
        {
            await entry.SendAsync(new TestEvent { Name = $"Test {i}" });
        }

        // Completes when the listener is cancelled; rethrows if the listener failed.
        await listenTask;
    }
}

/// <summary>
/// Sample payload that is serialized to JSON and published to the topic.
/// </summary>
public class TestEvent
{
    // ReSharper disable once UnusedAutoPropertyAccessor.Global
    public string? Name { get; set; }
}

/// <summary>
/// Publishes events to a Service Bus topic over HTTPS and listens to one of the topic's
/// subscriptions through the Service Bus SDK.
/// </summary>
/// <param name="logger">Logger used for diagnostics.</param>
/// <param name="httpClientFactory">Factory that supplies the <see cref="HttpClient"/> used for sending.</param>
public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory)
{
    // NOTE(review): lab values are hard-coded; a real deployment should load these
    // (ClientSecret in particular) from configuration or a secret store.
    private const string TenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838";
    private const string ClientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc";
    private const string ClientSecret = "YOUR_CLIENT_SECRET";
    private const string Scope = "https://servicebus.azure.net/.default";
    private const string ServiceBusNamespace = "anduinlearn";
    private const string TopicName = "mytopic";
    private const string SubscriptionName = "mysubscription";

    private static readonly string[] Scopes = [Scope];

    private readonly HttpClient _httpClient = httpClientFactory.CreateClient();

    // Built once and reused so the client's token cache survives across calls; building a
    // new client per request would discard the cache and hit the token endpoint every time.
    private readonly IConfidentialClientApplication _tokenClient = ConfidentialClientApplicationBuilder
        .Create(ClientId)
        .WithClientSecret(ClientSecret)
        .WithAuthority(new Uri($"https://login.microsoftonline.com/{TenantId}"))
        .Build();

    /// <summary>
    /// Sends an asynchronous request to the service bus endpoint.
    /// </summary>
    /// <typeparam name="T">The type of the request object.</typeparam>
    /// <param name="request">The request object; it is serialized to JSON and used as the message body.</param>
    /// <returns>A task representing the asynchronous operation that returns the response string.</returns>
    /// <exception cref="HttpRequestException">The endpoint answered with a non-success status code.</exception>
    public async Task<string> SendAsync<T>(T request)
    {
        var requestJson = JsonConvert.SerializeObject(request);
        logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName);

        var endpoint = $"https://{ServiceBusNamespace}.servicebus.windows.net/{TopicName}/messages";
        logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint);

        var token = await GetToken();
        // Log only a short prefix of the token; clamp the length so a short token cannot throw.
        logger.LogTrace("Sending request to service bus, with token {0}", token[..Math.Min(20, token.Length)]);

        using var httpRequest = new HttpRequestMessage
        {
            Method = HttpMethod.Post,
            RequestUri = new Uri(endpoint)
        };
        httpRequest.Headers.Add("Authorization", $"Bearer {token}");
        httpRequest.Headers.Add("accept", "application/json");
        httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json");
        logger.LogTrace("Sending request to service bus, with content {0}", requestJson);

        using var response = await _httpClient.SendAsync(httpRequest);
        logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode);

        var responseString = await response.Content.ReadAsStringAsync();
        logger.LogTrace(
            "Received response from service bus, with content {0} and status code {1}",
            responseString,
            response.StatusCode);

        if (!response.IsSuccessStatusCode)
        {
            throw new HttpRequestException(
                $"Failed to send new service bus event. {responseString}",
                inner: null,
                statusCode: response.StatusCode);
        }

        return responseString;
    }

    /// <summary>
    /// Listens to the configured topic subscription and invokes <paramref name="onMessage"/>
    /// with the UTF-8 decoded body of every received message. A message is completed only
    /// after the callback has finished without throwing.
    /// </summary>
    /// <param name="onMessage">Callback invoked with each message body.</param>
    /// <param name="cancellationToken">
    /// Token that stops the listener. With the default token the method never returns.
    /// </param>
    /// <returns>A task that completes once listening has been cancelled and the processor stopped.</returns>
    public async Task Listen(Func<string, Task> onMessage, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(onMessage);

        // Create Service Bus client and processor
        logger.LogWarning("Listening to service bus topic {0} and subscription {1}", TopicName, SubscriptionName);
        var credential = new ClientSecretCredential(TenantId, ClientId, ClientSecret);
        await using var serviceBusClient = new ServiceBusClient(ServiceBusNamespace + ".servicebus.windows.net", credential);
        await using var processor = serviceBusClient.CreateProcessor(TopicName, SubscriptionName, new ServiceBusProcessorOptions());

        processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;
            var body = Encoding.UTF8.GetString(message.Body.ToArray());
            await onMessage(body);
            await args.CompleteMessageAsync(message);
        };
        processor.ProcessErrorAsync += ErrorHandler;

        logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", TopicName, SubscriptionName);

        // Start processing
        await processor.StartProcessingAsync();

        try
        {
            // Messages are handled on the processor's own tasks; just park here until cancelled.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation is the normal way to leave the wait.
        }

        await processor.StopProcessingAsync();
    }

    /// <summary>
    /// Logs errors reported by the Service Bus processor.
    /// </summary>
    /// <param name="args">Details of the failure, including the exception.</param>
    /// <returns>A completed task.</returns>
    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Acquires an access token for the Service Bus scope using the client-credentials flow.
    /// </summary>
    /// <returns>The access token string.</returns>
    private async Task<string> GetToken()
    {
        var authResult = await _tokenClient.AcquireTokenForClient(Scopes).ExecuteAsync();
        return authResult.AccessToken;
    }
}