anduin zrewidował ten Gist . Przejdź do rewizji
Brak zmian
anduin zrewidował ten Gist . Przejdź do rewizji
1 file changed, 19 insertions
Service.csproj(stworzono plik)
@@ -0,0 +1,19 @@ | |||
1 | + | <Project Sdk="Microsoft.NET.Sdk"> | |
2 | + | <PropertyGroup> | |
3 | + | <OutputType>Exe</OutputType> | |
4 | + | <TargetFramework>net8.0</TargetFramework> | |
5 | + | <AssemblyName>LearnServiceBus</AssemblyName> | |
6 | + | <RootNamespace>LearnServiceBus</RootNamespace> | |
7 | + | <IsTestProject>false</IsTestProject> | |
8 | + | <IsPackable>false</IsPackable> | |
9 | + | <ImplicitUsings>enable</ImplicitUsings> | |
10 | + | <Nullable>enable</Nullable> | |
11 | + | </PropertyGroup> | |
12 | + | <ItemGroup> | |
13 | + | <PackageReference Include="Aiursoft.Canon" Version="8.0.4" /> | |
14 | + | <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" /> | |
15 | + | <PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="12.2.3" /> | |
16 | + | <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> | |
17 | + | <PackageReference Include="Microsoft.Extensions.Logging.console" Version="8.0.0" /> | |
18 | + | </ItemGroup> | |
19 | + | </Project> |
anduin zrewidował ten Gist . Przejdź do rewizji
1 file changed, 99 insertions, 28 deletions
Program.cs
@@ -1,6 +1,11 @@ | |||
1 | + | using System.ComponentModel; | |
2 | + | using System.Data; | |
1 | 3 | using System.Text; | |
4 | + | using Aiursoft.Canon; | |
2 | 5 | using Azure.Identity; | |
3 | 6 | using Azure.Messaging.ServiceBus; | |
7 | + | using Kusto.Data; | |
8 | + | using Kusto.Ingest; | |
4 | 9 | using Microsoft.Extensions.DependencyInjection; | |
5 | 10 | using Microsoft.Extensions.Logging; | |
6 | 11 | using Microsoft.Identity.Client; | |
@@ -20,20 +25,23 @@ public abstract class Program | |||
20 | 25 | logging.SetMinimumLevel(LogLevel.Information); | |
21 | 26 | }); | |
22 | 27 | ||
28 | + | services.AddMemoryCache(); | |
29 | + | services.AddTaskCanon(); | |
23 | 30 | services.AddHttpClient(); | |
24 | - | services.AddSingleton<ServiceBusEventSender>(); | |
31 | + | services.AddSingleton<DemoWorker>(); | |
25 | 32 | ||
26 | 33 | var serviceProvider = services.BuildServiceProvider(); | |
27 | - | var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>(); | |
34 | + | var entry = serviceProvider.GetRequiredService<DemoWorker>(); | |
28 | 35 | ||
29 | 36 | await Task.Factory.StartNew(async () => | |
30 | 37 | { | |
31 | 38 | try | |
32 | 39 | { | |
33 | - | await entry.Listen(async message => | |
40 | + | await entry.ListenFromServiceBusAsync(async message => | |
34 | 41 | { | |
35 | - | await Task.CompletedTask; | |
36 | - | Console.WriteLine($"==================Received message: {message}=================="); | |
42 | + | var messageObject = JsonConvert.DeserializeObject<TestEvent>(message); | |
43 | + | Console.WriteLine($"================Received message: {messageObject?.Name}================"); | |
44 | + | await entry.SendToKustoAsync(messageObject); | |
37 | 45 | }); | |
38 | 46 | } | |
39 | 47 | catch (Exception e) | |
@@ -45,9 +53,9 @@ public abstract class Program | |||
45 | 53 | ||
46 | 54 | for (int i = 0; i < 1000; i++) | |
47 | 55 | { | |
48 | - | await entry.SendAsync(new TestEvent { Name = $"Test {i}" }); | |
56 | + | await entry.SendToServiceBusAsync(new TestEvent { Name = $"Test {i}" }); | |
49 | 57 | } | |
50 | - | ||
58 | + | ||
51 | 59 | await Task.Delay(int.MaxValue); | |
52 | 60 | } | |
53 | 61 | } | |
@@ -58,17 +66,25 @@ public class TestEvent | |||
58 | 66 | public string? Name { get; set; } | |
59 | 67 | } | |
60 | 68 | ||
61 | - | public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory) | |
69 | + | public class DemoWorker(ILogger<DemoWorker> logger, IHttpClientFactory httpClientFactory, CacheService cache) | |
62 | 70 | { | |
63 | 71 | private readonly HttpClient _httpClient = httpClientFactory.CreateClient(); | |
72 | + | ||
73 | + | // Authentication. | |
64 | 74 | private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; | |
65 | 75 | private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc"; | |
66 | - | private readonly string _clientSecret = "YOUR_CLIENT_SECRET"; | |
76 | + | private readonly string _clientSecret = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; | |
67 | 77 | private readonly string _scope = "https://servicebus.azure.net/.default"; | |
68 | 78 | ||
79 | + | // Service Bus | |
69 | 80 | private readonly string _serviceBusNamespace = "anduinlearn"; | |
70 | - | private readonly string _topicName = "mytopic"; | |
71 | - | private readonly string _subscriptionName = "mysubscription"; | |
81 | + | private readonly string _serviceBusTopicName = "mytopic"; | |
82 | + | private readonly string _serviceBusTopicSubscriptionName = "mysubscription"; | |
83 | + | ||
84 | + | // Kusto database. | |
85 | + | private readonly string _kustoIngestionUri = "https://ingest-learnkustoanduin.eastasia.kusto.windows.net"; | |
86 | + | private readonly string _kustoDatabase = "MyDatabase"; | |
87 | + | private readonly string _kustoTable = "MyTable"; | |
72 | 88 | ||
73 | 89 | /// <summary> | |
74 | 90 | /// Sends an asynchronous request to the service bus endpoint. | |
@@ -76,12 +92,12 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
76 | 92 | /// <typeparam name="T">The type of the request object.</typeparam> | |
77 | 93 | /// <param name="request">The request object.</param> | |
78 | 94 | /// <returns>A task representing the asynchronous operation that returns the response string.</returns> | |
79 | - | public async Task<string> SendAsync<T>(T request) | |
95 | + | public async Task<string> SendToServiceBusAsync<T>(T request) | |
80 | 96 | { | |
81 | 97 | var requestJson = JsonConvert.SerializeObject(request); | |
82 | 98 | logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName); | |
83 | 99 | ||
84 | - | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages"; | |
100 | + | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_serviceBusTopicName}/messages"; | |
85 | 101 | logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint); | |
86 | 102 | ||
87 | 103 | var token = await GetToken(); | |
@@ -112,14 +128,14 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
112 | 128 | return responseString; | |
113 | 129 | } | |
114 | 130 | ||
115 | - | public async Task Listen(Func<string, Task> onMessage) | |
131 | + | public async Task ListenFromServiceBusAsync(Func<string, Task> onMessage) | |
116 | 132 | { | |
117 | 133 | // Create Service Bus client and processor | |
118 | - | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
134 | + | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); | |
119 | 135 | var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret); | |
120 | 136 | var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net" | |
121 | 137 | , credential); | |
122 | - | var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions()); | |
138 | + | var processor = serviceBusClient.CreateProcessor(_serviceBusTopicName, _serviceBusTopicSubscriptionName, new ServiceBusProcessorOptions()); | |
123 | 139 | ||
124 | 140 | processor.ProcessMessageAsync += async args => | |
125 | 141 | { | |
@@ -129,28 +145,83 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
129 | 145 | await args.CompleteMessageAsync(message); | |
130 | 146 | }; | |
131 | 147 | ||
132 | - | processor.ProcessErrorAsync += ErrorHandler; | |
148 | + | processor.ProcessErrorAsync += args => | |
149 | + | { | |
150 | + | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
151 | + | return Task.CompletedTask; | |
152 | + | }; | |
133 | 153 | ||
134 | - | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
154 | + | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); | |
135 | 155 | // Start processing | |
136 | 156 | await processor.StartProcessingAsync(); | |
137 | 157 | await Task.Delay(int.MaxValue); | |
138 | 158 | } | |
139 | 159 | ||
140 | - | private Task ErrorHandler(ProcessErrorEventArgs args) | |
160 | + | public async Task SendToKustoAsync<T>(T request) | |
141 | 161 | { | |
142 | - | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
143 | - | return Task.CompletedTask; | |
162 | + | var dataTable = new List<T> { request }.ToDataTable(); | |
163 | + | var kustoIngestService = new KustoIngestService(_kustoIngestionUri, _kustoDatabase, _clientId, _clientSecret, _tenantId); | |
164 | + | await kustoIngestService.IngestDataAsync(dataTable, _kustoTable); | |
144 | 165 | } | |
145 | 166 | ||
146 | - | private async Task<string> GetToken() | |
167 | + | private Task<string?> GetToken() | |
147 | 168 | { | |
148 | - | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
149 | - | .WithClientSecret(_clientSecret) | |
150 | - | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
151 | - | .Build(); | |
169 | + | return cache.RunWithCache("token", async () => | |
170 | + | { | |
171 | + | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
172 | + | .WithClientSecret(_clientSecret) | |
173 | + | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
174 | + | .Build(); | |
152 | 175 | ||
153 | - | var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync(); | |
154 | - | return authResult.AccessToken; | |
176 | + | var authResult = await app.AcquireTokenForClient([_scope]).ExecuteAsync(); | |
177 | + | return authResult.AccessToken; | |
178 | + | }); | |
155 | 179 | } | |
156 | 180 | } | |
181 | + | ||
182 | + | public static class DataTableExtensions | |
183 | + | { | |
184 | + | public static DataTable ToDataTable<T>(this IList<T> data) | |
185 | + | { | |
186 | + | var properties = TypeDescriptor.GetProperties(typeof(T)); | |
187 | + | var table = new DataTable(); | |
188 | + | ||
189 | + | foreach (PropertyDescriptor prop in properties) | |
190 | + | { | |
191 | + | table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType); | |
192 | + | } | |
193 | + | ||
194 | + | foreach (T item in data) | |
195 | + | { | |
196 | + | var row = table.NewRow(); | |
197 | + | foreach (PropertyDescriptor prop in properties) | |
198 | + | { | |
199 | + | row[prop.Name] = prop.GetValue(item) ?? DBNull.Value; | |
200 | + | } | |
201 | + | table.Rows.Add(row); | |
202 | + | } | |
203 | + | return table; | |
204 | + | } | |
205 | + | } | |
206 | + | ||
207 | + | public class KustoIngestService | |
208 | + | { | |
209 | + | private IKustoIngestClient _kustoIngestClient; | |
210 | + | private string _database; | |
211 | + | ||
212 | + | public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId) | |
213 | + | { | |
214 | + | var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri) | |
215 | + | .WithAadApplicationKeyAuthentication(appId, appKey, tenantId); | |
216 | + | _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder); | |
217 | + | _database = database; | |
218 | + | } | |
219 | + | ||
220 | + | public async Task IngestDataAsync(DataTable dataTable, string tableName) | |
221 | + | { | |
222 | + | var ingestionProperties = new KustoIngestionProperties(_database, tableName); | |
223 | + | var dataStream = dataTable.CreateDataReader(); | |
224 | + | ||
225 | + | await _kustoIngestClient.IngestFromDataReaderAsync(dataStream, ingestionProperties); | |
226 | + | } | |
227 | + | } |
anduin zrewidował ten Gist . Przejdź do rewizji
1 file changed, 156 insertions
Program.cs(stworzono plik)
@@ -0,0 +1,156 @@ | |||
1 | + | using System.Text; | |
2 | + | using Azure.Identity; | |
3 | + | using Azure.Messaging.ServiceBus; | |
4 | + | using Microsoft.Extensions.DependencyInjection; | |
5 | + | using Microsoft.Extensions.Logging; | |
6 | + | using Microsoft.Identity.Client; | |
7 | + | using Newtonsoft.Json; | |
8 | + | using LogLevel = Microsoft.Extensions.Logging.LogLevel; | |
9 | + | ||
10 | + | namespace LabServiceBus; | |
11 | + | ||
12 | + | public abstract class Program | |
13 | + | { | |
14 | + | public static async Task Main(string[] args) | |
15 | + | { | |
16 | + | var services = new ServiceCollection(); | |
17 | + | services.AddLogging(logging => | |
18 | + | { | |
19 | + | logging.AddConsole(); | |
20 | + | logging.SetMinimumLevel(LogLevel.Information); | |
21 | + | }); | |
22 | + | ||
23 | + | services.AddHttpClient(); | |
24 | + | services.AddSingleton<ServiceBusEventSender>(); | |
25 | + | ||
26 | + | var serviceProvider = services.BuildServiceProvider(); | |
27 | + | var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>(); | |
28 | + | ||
29 | + | await Task.Factory.StartNew(async () => | |
30 | + | { | |
31 | + | try | |
32 | + | { | |
33 | + | await entry.Listen(async message => | |
34 | + | { | |
35 | + | await Task.CompletedTask; | |
36 | + | Console.WriteLine($"==================Received message: {message}=================="); | |
37 | + | }); | |
38 | + | } | |
39 | + | catch (Exception e) | |
40 | + | { | |
41 | + | Console.WriteLine(e); | |
42 | + | throw; | |
43 | + | } | |
44 | + | }); | |
45 | + | ||
46 | + | for (int i = 0; i < 1000; i++) | |
47 | + | { | |
48 | + | await entry.SendAsync(new TestEvent { Name = $"Test {i}" }); | |
49 | + | } | |
50 | + | ||
51 | + | await Task.Delay(int.MaxValue); | |
52 | + | } | |
53 | + | } | |
54 | + | ||
55 | + | public class TestEvent | |
56 | + | { | |
57 | + | // ReSharper disable once UnusedAutoPropertyAccessor.Global | |
58 | + | public string? Name { get; set; } | |
59 | + | } | |
60 | + | ||
61 | + | public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory) | |
62 | + | { | |
63 | + | private readonly HttpClient _httpClient = httpClientFactory.CreateClient(); | |
64 | + | private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; | |
65 | + | private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc"; | |
66 | + | private readonly string _clientSecret = "YOUR_CLIENT_SECRET"; | |
67 | + | private readonly string _scope = "https://servicebus.azure.net/.default"; | |
68 | + | ||
69 | + | private readonly string _serviceBusNamespace = "anduinlearn"; | |
70 | + | private readonly string _topicName = "mytopic"; | |
71 | + | private readonly string _subscriptionName = "mysubscription"; | |
72 | + | ||
73 | + | /// <summary> | |
74 | + | /// Sends an asynchronous request to the service bus endpoint. | |
75 | + | /// </summary> | |
76 | + | /// <typeparam name="T">The type of the request object.</typeparam> | |
77 | + | /// <param name="request">The request object.</param> | |
78 | + | /// <returns>A task representing the asynchronous operation that returns the response string.</returns> | |
79 | + | public async Task<string> SendAsync<T>(T request) | |
80 | + | { | |
81 | + | var requestJson = JsonConvert.SerializeObject(request); | |
82 | + | logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName); | |
83 | + | ||
84 | + | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages"; | |
85 | + | logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint); | |
86 | + | ||
87 | + | var token = await GetToken(); | |
88 | + | logger.LogTrace("Sending request to service bus, with token {0}", token.Substring(0, 20)); | |
89 | + | ||
90 | + | var httpRequest = new HttpRequestMessage | |
91 | + | { | |
92 | + | Method = HttpMethod.Post, | |
93 | + | RequestUri = new Uri(endpoint) | |
94 | + | }; | |
95 | + | httpRequest.Headers.Add("Authorization", $"Bearer {token}"); | |
96 | + | httpRequest.Headers.Add("accept", "application/json"); | |
97 | + | httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json"); | |
98 | + | ||
99 | + | logger.LogTrace("Sending request to service bus, with content {0}", requestJson); | |
100 | + | var response = await _httpClient.SendAsync(httpRequest); | |
101 | + | ||
102 | + | logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode); | |
103 | + | var responseString = await response.Content.ReadAsStringAsync(); | |
104 | + | ||
105 | + | logger.LogTrace("Received response from service bus, with content {0} and status code {1}", responseString, | |
106 | + | response.StatusCode); | |
107 | + | if (!response.IsSuccessStatusCode) | |
108 | + | { | |
109 | + | throw new Exception($"Failed to send new service bus event. {responseString}"); | |
110 | + | } | |
111 | + | ||
112 | + | return responseString; | |
113 | + | } | |
114 | + | ||
115 | + | public async Task Listen(Func<string, Task> onMessage) | |
116 | + | { | |
117 | + | // Create Service Bus client and processor | |
118 | + | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
119 | + | var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret); | |
120 | + | var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net" | |
121 | + | , credential); | |
122 | + | var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions()); | |
123 | + | ||
124 | + | processor.ProcessMessageAsync += async args => | |
125 | + | { | |
126 | + | var message = args.Message; | |
127 | + | var body = Encoding.UTF8.GetString(message.Body.ToArray()); | |
128 | + | await onMessage(body); | |
129 | + | await args.CompleteMessageAsync(message); | |
130 | + | }; | |
131 | + | ||
132 | + | processor.ProcessErrorAsync += ErrorHandler; | |
133 | + | ||
134 | + | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
135 | + | // Start processing | |
136 | + | await processor.StartProcessingAsync(); | |
137 | + | await Task.Delay(int.MaxValue); | |
138 | + | } | |
139 | + | ||
140 | + | private Task ErrorHandler(ProcessErrorEventArgs args) | |
141 | + | { | |
142 | + | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
143 | + | return Task.CompletedTask; | |
144 | + | } | |
145 | + | ||
146 | + | private async Task<string> GetToken() | |
147 | + | { | |
148 | + | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
149 | + | .WithClientSecret(_clientSecret) | |
150 | + | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
151 | + | .Build(); | |
152 | + | ||
153 | + | var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync(); | |
154 | + | return authResult.AccessToken; | |
155 | + | } | |
156 | + | } |