Last active 1725699790

Program.cs Raw
1using System.ComponentModel;
2using System.Data;
3using System.Text;
4using Aiursoft.Canon;
5using Azure.Identity;
6using Azure.Messaging.ServiceBus;
7using Kusto.Data;
8using Kusto.Ingest;
9using Microsoft.Extensions.DependencyInjection;
10using Microsoft.Extensions.Logging;
11using Microsoft.Identity.Client;
12using Newtonsoft.Json;
13using LogLevel = Microsoft.Extensions.Logging.LogLevel;
14
15namespace LabServiceBus;
16
/// <summary>
/// Console entry point: wires up dependency injection, starts a background listener on the
/// service bus subscription, and then publishes a batch of test events to the topic.
/// </summary>
public abstract class Program
{
    /// <summary>
    /// Builds the service container, starts the listener, sends 1000 test events, and then keeps
    /// the process alive for as long as the listener is running.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes when the listener stops; it faults if the listener fails.</returns>
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        services.AddLogging(logging =>
        {
            logging.AddConsole();
            logging.SetMinimumLevel(LogLevel.Information);
        });

        services.AddMemoryCache();
        services.AddTaskCanon();
        services.AddHttpClient();
        services.AddSingleton<DemoWorker>();

        // Dispose the container on exit so that registered services and logging providers are released.
        await using var serviceProvider = services.BuildServiceProvider();
        var entry = serviceProvider.GetRequiredService<DemoWorker>();

        // Task.Run unwraps the async delegate, so listenerTask represents the whole listener
        // lifetime. Task.Factory.StartNew would yield a Task<Task> whose outer task completes at
        // the delegate's first await, leaving any later failure unobserved.
        var listenerTask = Task.Run(async () =>
        {
            try
            {
                await entry.ListenFromServiceBusAsync(async message =>
                {
                    var messageObject = JsonConvert.DeserializeObject<TestEvent>(message);
                    Console.WriteLine($"================Received message: {messageObject?.Name}================");
                    await entry.SendToKustoAsync(messageObject);
                });
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        });

        for (var i = 0; i < 1000; i++)
        {
            await entry.SendToServiceBusAsync(new TestEvent { Name = $"Test {i}" });
        }

        // Stay alive while the listener runs; a listener failure now surfaces here instead of
        // being silently dropped.
        await listenerTask;
    }
}
62
/// <summary>
/// Sample event payload. It is serialized to JSON when published to the service bus and
/// deserialized again by the listener.
/// </summary>
public class TestEvent
{
    /// <summary>
    /// Name carried by the event; may be <c>null</c> when it was never assigned.
    /// </summary>
    // ReSharper disable once UnusedAutoPropertyAccessor.Global
    public string? Name
    {
        get;
        set;
    }
}
68
/// <summary>
/// Demo worker that publishes events to a service bus topic over HTTP, listens on a topic
/// subscription with the Service Bus SDK, and forwards payloads to a Kusto table.
/// </summary>
/// <param name="logger">Logger used for diagnostics.</param>
/// <param name="httpClientFactory">Factory used to create the HTTP client for the publish call.</param>
/// <param name="cache">Cache used to keep the acquired access token between publish calls.</param>
public class DemoWorker(ILogger<DemoWorker> logger, IHttpClientFactory httpClientFactory, CacheService cache)
{
    private readonly HttpClient _httpClient = httpClientFactory.CreateClient();

    // Authentication.
    // NOTE(review): credentials are hard-coded in source; move them to configuration or a secret
    // store before using this outside of a demo.
    private const string TenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838";
    private const string ClientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc";
    private const string ClientSecret = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    private const string Scope = "https://servicebus.azure.net/.default";

    // Service Bus
    private const string ServiceBusNamespace = "anduinlearn";
    private const string ServiceBusTopicName = "mytopic";
    private const string ServiceBusTopicSubscriptionName = "mysubscription";

    // Kusto database.
    private const string KustoIngestionUri = "https://ingest-learnkustoanduin.eastasia.kusto.windows.net";
    private const string KustoDatabase = "MyDatabase";
    private const string KustoTable = "MyTable";

    // One ingest service is created on first use and reused, instead of building a new Kusto
    // client for every received message.
    private readonly Lazy<KustoIngestService> _kustoIngestService = new(() =>
        new KustoIngestService(KustoIngestionUri, KustoDatabase, ClientId, ClientSecret, TenantId));

    /// <summary>
    /// Serializes <paramref name="request"/> to JSON and posts it to the topic's messages endpoint.
    /// </summary>
    /// <typeparam name="T">The type of the request object.</typeparam>
    /// <param name="request">The request object.</param>
    /// <returns>The response body returned by the endpoint.</returns>
    /// <exception cref="InvalidOperationException">No access token could be acquired.</exception>
    /// <exception cref="HttpRequestException">The endpoint answered with a non-success status code.</exception>
    public async Task<string> SendToServiceBusAsync<T>(T request)
    {
        var requestJson = JsonConvert.SerializeObject(request);
        logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName);

        var endpoint = $"https://{ServiceBusNamespace}.servicebus.windows.net/{ServiceBusTopicName}/messages";
        logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint);

        var token = await GetToken();
        if (string.IsNullOrEmpty(token))
        {
            throw new InvalidOperationException("Failed to acquire an access token for the service bus.");
        }

        // Only a short prefix is logged; the length is clamped so a short token cannot throw.
        logger.LogTrace("Sending request to service bus, with token {0}", token[..Math.Min(20, token.Length)]);

        // Disposing the request also disposes its content.
        using var httpRequest = new HttpRequestMessage
        {
            Method = HttpMethod.Post,
            RequestUri = new Uri(endpoint)
        };
        httpRequest.Headers.Add("Authorization", $"Bearer {token}");
        httpRequest.Headers.Add("accept", "application/json");
        httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json");

        logger.LogTrace("Sending request to service bus, with content {0}", requestJson);
        using var response = await _httpClient.SendAsync(httpRequest);

        logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode);
        var responseString = await response.Content.ReadAsStringAsync();

        logger.LogTrace("Received response from service bus, with content {0} and status code {1}", responseString,
            response.StatusCode);
        if (!response.IsSuccessStatusCode)
        {
            throw new HttpRequestException(
                $"Failed to send new service bus event. {responseString}", null, response.StatusCode);
        }

        return responseString;
    }

    /// <summary>
    /// Starts a processor on the topic subscription and invokes <paramref name="onMessage"/> with
    /// the UTF-8 decoded body of every received message. A message is completed only after the
    /// callback finishes without throwing.
    /// </summary>
    /// <param name="onMessage">Callback invoked with each message body.</param>
    /// <param name="cancellationToken">
    /// Optional token that stops the listener. When it is not supplied the method listens indefinitely.
    /// </param>
    /// <returns>A task that completes once listening has been cancelled.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="onMessage"/> is <c>null</c>.</exception>
    public async Task ListenFromServiceBusAsync(Func<string, Task> onMessage, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(onMessage);

        // Create Service Bus client and processor
        logger.LogWarning("Listening to service bus topic {0} and subscription {1}", ServiceBusTopicName, ServiceBusTopicSubscriptionName);
        var credential = new ClientSecretCredential(TenantId, ClientId, ClientSecret);
        await using var serviceBusClient = new ServiceBusClient(ServiceBusNamespace + ".servicebus.windows.net", credential);
        await using var processor = serviceBusClient.CreateProcessor(ServiceBusTopicName, ServiceBusTopicSubscriptionName, new ServiceBusProcessorOptions());

        processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;
            var body = Encoding.UTF8.GetString(message.Body.ToArray());
            await onMessage(body);
            await args.CompleteMessageAsync(message);
        };

        processor.ProcessErrorAsync += args =>
        {
            logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message);
            return Task.CompletedTask;
        };

        logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", ServiceBusTopicName, ServiceBusTopicSubscriptionName);
        // Start processing
        await processor.StartProcessingAsync(cancellationToken);

        try
        {
            // Timeout.Infinite really waits forever; int.MaxValue milliseconds is a finite delay
            // of roughly 24.8 days, after which the method used to return.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Requested shutdown: fall through so the processor and client are disposed.
        }
    }

    /// <summary>
    /// Converts <paramref name="request"/> into a single-row data table and ingests it into the
    /// configured Kusto table.
    /// </summary>
    /// <typeparam name="T">The type whose public properties become the table columns.</typeparam>
    /// <param name="request">The object to ingest.</param>
    /// <returns>A task representing the asynchronous ingestion.</returns>
    public async Task SendToKustoAsync<T>(T request)
    {
        using var dataTable = new List<T> { request }.ToDataTable();
        await _kustoIngestService.Value.IngestDataAsync(dataTable, KustoTable);
    }

    /// <summary>
    /// Gets the service bus access token through the cache under the key "token", acquiring a new
    /// one with the client-credentials flow when the cache runs the fallback.
    /// </summary>
    /// <returns>The access token.</returns>
    private Task<string?> GetToken()
    {
        return cache.RunWithCache("token", async () =>
        {
            var app = ConfidentialClientApplicationBuilder.Create(ClientId)
                .WithClientSecret(ClientSecret)
                .WithAuthority(new Uri($"https://login.microsoftonline.com/{TenantId}"))
                .Build();

            var authResult = await app.AcquireTokenForClient([Scope]).ExecuteAsync();
            return authResult.AccessToken;
        });
    }
}
181
/// <summary>
/// Helpers for turning object lists into <see cref="DataTable"/> instances.
/// </summary>
public static class DataTableExtensions
{
    /// <summary>
    /// Builds a table with one column per property of <typeparamref name="T"/> and one row per
    /// element of <paramref name="data"/>.
    /// </summary>
    /// <typeparam name="T">The element type whose properties define the columns.</typeparam>
    /// <param name="data">The elements to copy into rows.</param>
    /// <returns>The populated table.</returns>
    public static DataTable ToDataTable<T>(this IList<T> data)
    {
        PropertyDescriptorCollection descriptors = TypeDescriptor.GetProperties(typeof(T));
        var result = new DataTable();

        for (var index = 0; index < descriptors.Count; index++)
        {
            var descriptor = descriptors[index];
            // Nullable<X> properties get a column of type X; null values are stored as DBNull below.
            var columnType = Nullable.GetUnderlyingType(descriptor.PropertyType) ?? descriptor.PropertyType;
            result.Columns.Add(descriptor.Name, columnType);
        }

        foreach (var element in data)
        {
            var record = result.NewRow();
            for (var index = 0; index < descriptors.Count; index++)
            {
                var descriptor = descriptors[index];
                var value = descriptor.GetValue(element);
                record[descriptor.Name] = value ?? DBNull.Value;
            }

            result.Rows.Add(record);
        }

        return result;
    }
}
206
/// <summary>
/// Thin wrapper around a Kusto direct-ingest client bound to a single database.
/// </summary>
public class KustoIngestService : IDisposable
{
    private readonly IKustoIngestClient _kustoIngestClient;
    private readonly string _database;

    /// <summary>
    /// Creates the ingest client using AAD application-key authentication.
    /// </summary>
    /// <param name="kustoUri">The Kusto endpoint to connect to.</param>
    /// <param name="database">The database that receives the ingested data.</param>
    /// <param name="appId">The AAD application (client) id.</param>
    /// <param name="appKey">The AAD application key (client secret).</param>
    /// <param name="tenantId">The AAD tenant id.</param>
    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId)
    {
        // NOTE(review): a direct-ingest client is created here; confirm that the URI passed by
        // callers is the endpoint type this client kind expects.
        var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
        _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder);
        _database = database;
    }

    /// <summary>
    /// Ingests every row of <paramref name="dataTable"/> into <paramref name="tableName"/>.
    /// </summary>
    /// <param name="dataTable">The rows to ingest.</param>
    /// <param name="tableName">The destination table inside the configured database.</param>
    /// <returns>A task representing the asynchronous ingestion.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="dataTable"/> is <c>null</c>.</exception>
    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        ArgumentNullException.ThrowIfNull(dataTable);

        var ingestionProperties = new KustoIngestionProperties(_database, tableName);

        // The reader is disposable; release it once ingestion has finished.
        using var dataReader = dataTable.CreateDataReader();

        await _kustoIngestClient.IngestFromDataReaderAsync(dataReader, ingestionProperties);
    }

    /// <summary>
    /// Releases the underlying ingest client.
    /// </summary>
    public void Dispose()
    {
        _kustoIngestClient.Dispose();
        GC.SuppressFinalize(this);
    }
}
Service.csproj Raw
1<Project Sdk="Microsoft.NET.Sdk">
2 <PropertyGroup>
3 <OutputType>Exe</OutputType>
4 <TargetFramework>net8.0</TargetFramework>
5 <AssemblyName>LearnServiceBus</AssemblyName>
6 <RootNamespace>LearnServiceBus</RootNamespace>
7 <IsTestProject>false</IsTestProject>
8 <IsPackable>false</IsPackable>
9 <ImplicitUsings>enable</ImplicitUsings>
10 <Nullable>enable</Nullable>
11 </PropertyGroup>
12 <ItemGroup>
13 <PackageReference Include="Aiursoft.Canon" Version="8.0.4" />
14 <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
15 <PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="12.2.3" />
16 <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
17 <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
18 </ItemGroup>
19</Project>