Ostatnio aktywny 1725699790

anduin's Avatar anduin zrewidował ten Gist 1725699790. Przejdź do rewizji

Brak zmian

anduin's Avatar anduin zrewidował ten Gist 1719243981. Przejdź do rewizji

1 file changed, 19 insertions

Service.csproj(stworzono plik)

@@ -0,0 +1,19 @@
1 + <Project Sdk="Microsoft.NET.Sdk">
2 + <PropertyGroup>
3 + <OutputType>Exe</OutputType>
4 + <TargetFramework>net8.0</TargetFramework>
5 + <AssemblyName>LearnServiceBus</AssemblyName>
6 + <RootNamespace>LearnServiceBus</RootNamespace>
7 + <IsTestProject>false</IsTestProject>
8 + <IsPackable>false</IsPackable>
9 + <ImplicitUsings>enable</ImplicitUsings>
10 + <Nullable>enable</Nullable>
11 + </PropertyGroup>
12 + <ItemGroup>
13 + <PackageReference Include="Aiursoft.Canon" Version="8.0.4" />
14 + <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
15 + <PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="12.2.3" />
16 + <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
17 + <PackageReference Include="Microsoft.Extensions.Logging.console" Version="8.0.0" />
18 + </ItemGroup>
19 + </Project>

anduin's Avatar anduin zrewidował ten Gist 1719228667. Przejdź do rewizji

1 file changed, 99 insertions, 28 deletions

Program.cs

@@ -1,6 +1,11 @@
1 + using System.ComponentModel;
2 + using System.Data;
1 3 using System.Text;
4 + using Aiursoft.Canon;
2 5 using Azure.Identity;
3 6 using Azure.Messaging.ServiceBus;
7 + using Kusto.Data;
8 + using Kusto.Ingest;
4 9 using Microsoft.Extensions.DependencyInjection;
5 10 using Microsoft.Extensions.Logging;
6 11 using Microsoft.Identity.Client;
@@ -20,20 +25,23 @@ public abstract class Program
20 25 logging.SetMinimumLevel(LogLevel.Information);
21 26 });
22 27
28 + services.AddMemoryCache();
29 + services.AddTaskCanon();
23 30 services.AddHttpClient();
24 - services.AddSingleton<ServiceBusEventSender>();
31 + services.AddSingleton<DemoWorker>();
25 32
26 33 var serviceProvider = services.BuildServiceProvider();
27 - var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>();
34 + var entry = serviceProvider.GetRequiredService<DemoWorker>();
28 35
29 36 await Task.Factory.StartNew(async () =>
30 37 {
31 38 try
32 39 {
33 - await entry.Listen(async message =>
40 + await entry.ListenFromServiceBusAsync(async message =>
34 41 {
35 - await Task.CompletedTask;
36 - Console.WriteLine($"==================Received message: {message}==================");
42 + var messageObject = JsonConvert.DeserializeObject<TestEvent>(message);
43 + Console.WriteLine($"================Received message: {messageObject?.Name}================");
44 + await entry.SendToKustoAsync(messageObject);
37 45 });
38 46 }
39 47 catch (Exception e)
@@ -45,9 +53,9 @@ public abstract class Program
45 53
46 54 for (int i = 0; i < 1000; i++)
47 55 {
48 - await entry.SendAsync(new TestEvent { Name = $"Test {i}" });
56 + await entry.SendToServiceBusAsync(new TestEvent { Name = $"Test {i}" });
49 57 }
50 -
58 +
51 59 await Task.Delay(int.MaxValue);
52 60 }
53 61 }
@@ -58,17 +66,25 @@ public class TestEvent
58 66 public string? Name { get; set; }
59 67 }
60 68
61 - public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory)
69 + public class DemoWorker(ILogger<DemoWorker> logger, IHttpClientFactory httpClientFactory, CacheService cache)
62 70 {
63 71 private readonly HttpClient _httpClient = httpClientFactory.CreateClient();
72 +
73 + // Authentication.
64 74 private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838";
65 75 private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc";
66 - private readonly string _clientSecret = "YOUR_CLIENT_SECRET";
76 + private readonly string _clientSecret = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
67 77 private readonly string _scope = "https://servicebus.azure.net/.default";
68 78
79 + // Service Bus
69 80 private readonly string _serviceBusNamespace = "anduinlearn";
70 - private readonly string _topicName = "mytopic";
71 - private readonly string _subscriptionName = "mysubscription";
81 + private readonly string _serviceBusTopicName = "mytopic";
82 + private readonly string _serviceBusTopicSubscriptionName = "mysubscription";
83 +
84 + // Kusto database.
85 + private readonly string _kustoIngestionUri = "https://ingest-learnkustoanduin.eastasia.kusto.windows.net";
86 + private readonly string _kustoDatabase = "MyDatabase";
87 + private readonly string _kustoTable = "MyTable";
72 88
73 89 /// <summary>
74 90 /// Sends an asynchronous request to the service bus endpoint.
@@ -76,12 +92,12 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC
76 92 /// <typeparam name="T">The type of the request object.</typeparam>
77 93 /// <param name="request">The request object.</param>
78 94 /// <returns>A task representing the asynchronous operation that returns the response string.</returns>
79 - public async Task<string> SendAsync<T>(T request)
95 + public async Task<string> SendToServiceBusAsync<T>(T request)
80 96 {
81 97 var requestJson = JsonConvert.SerializeObject(request);
82 98 logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName);
83 99
84 - var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages";
100 + var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_serviceBusTopicName}/messages";
85 101 logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint);
86 102
87 103 var token = await GetToken();
@@ -112,14 +128,14 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC
112 128 return responseString;
113 129 }
114 130
115 - public async Task Listen(Func<string, Task> onMessage)
131 + public async Task ListenFromServiceBusAsync(Func<string, Task> onMessage)
116 132 {
117 133 // Create Service Bus client and processor
118 - logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName);
134 + logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName);
119 135 var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret);
120 136 var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net"
121 137 , credential);
122 - var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions());
138 + var processor = serviceBusClient.CreateProcessor(_serviceBusTopicName, _serviceBusTopicSubscriptionName, new ServiceBusProcessorOptions());
123 139
124 140 processor.ProcessMessageAsync += async args =>
125 141 {
@@ -129,28 +145,83 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC
129 145 await args.CompleteMessageAsync(message);
130 146 };
131 147
132 - processor.ProcessErrorAsync += ErrorHandler;
148 + processor.ProcessErrorAsync += args =>
149 + {
150 + logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message);
151 + return Task.CompletedTask;
152 + };
133 153
134 - logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName);
154 + logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName);
135 155 // Start processing
136 156 await processor.StartProcessingAsync();
137 157 await Task.Delay(int.MaxValue);
138 158 }
139 159
140 - private Task ErrorHandler(ProcessErrorEventArgs args)
160 + public async Task SendToKustoAsync<T>(T request)
141 161 {
142 - logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message);
143 - return Task.CompletedTask;
162 + var dataTable = new List<T> { request }.ToDataTable();
163 + var kustoIngestService = new KustoIngestService(_kustoIngestionUri, _kustoDatabase, _clientId, _clientSecret, _tenantId);
164 + await kustoIngestService.IngestDataAsync(dataTable, _kustoTable);
144 165 }
145 166
146 - private async Task<string> GetToken()
167 + private Task<string?> GetToken()
147 168 {
148 - var app = ConfidentialClientApplicationBuilder.Create(_clientId)
149 - .WithClientSecret(_clientSecret)
150 - .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}"))
151 - .Build();
169 + return cache.RunWithCache("token", async () =>
170 + {
171 + var app = ConfidentialClientApplicationBuilder.Create(_clientId)
172 + .WithClientSecret(_clientSecret)
173 + .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}"))
174 + .Build();
152 175
153 - var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync();
154 - return authResult.AccessToken;
176 + var authResult = await app.AcquireTokenForClient([_scope]).ExecuteAsync();
177 + return authResult.AccessToken;
178 + });
155 179 }
156 180 }
181 +
182 + public static class DataTableExtensions
183 + {
184 + public static DataTable ToDataTable<T>(this IList<T> data)
185 + {
186 + var properties = TypeDescriptor.GetProperties(typeof(T));
187 + var table = new DataTable();
188 +
189 + foreach (PropertyDescriptor prop in properties)
190 + {
191 + table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
192 + }
193 +
194 + foreach (T item in data)
195 + {
196 + var row = table.NewRow();
197 + foreach (PropertyDescriptor prop in properties)
198 + {
199 + row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
200 + }
201 + table.Rows.Add(row);
202 + }
203 + return table;
204 + }
205 + }
206 +
207 + public class KustoIngestService
208 + {
209 + private IKustoIngestClient _kustoIngestClient;
210 + private string _database;
211 +
212 + public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId)
213 + {
214 + var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
215 + .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
216 + _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder);
217 + _database = database;
218 + }
219 +
220 + public async Task IngestDataAsync(DataTable dataTable, string tableName)
221 + {
222 + var ingestionProperties = new KustoIngestionProperties(_database, tableName);
223 + var dataStream = dataTable.CreateDataReader();
224 +
225 + await _kustoIngestClient.IngestFromDataReaderAsync(dataStream, ingestionProperties);
226 + }
227 + }

anduin's Avatar anduin zrewidował ten Gist 1718796106. Przejdź do rewizji

1 file changed, 156 insertions

Program.cs(stworzono plik)

@@ -0,0 +1,156 @@
1 + using System.Text;
2 + using Azure.Identity;
3 + using Azure.Messaging.ServiceBus;
4 + using Microsoft.Extensions.DependencyInjection;
5 + using Microsoft.Extensions.Logging;
6 + using Microsoft.Identity.Client;
7 + using Newtonsoft.Json;
8 + using LogLevel = Microsoft.Extensions.Logging.LogLevel;
9 +
10 + namespace LabServiceBus;
11 +
12 + public abstract class Program
13 + {
14 + public static async Task Main(string[] args)
15 + {
16 + var services = new ServiceCollection();
17 + services.AddLogging(logging =>
18 + {
19 + logging.AddConsole();
20 + logging.SetMinimumLevel(LogLevel.Information);
21 + });
22 +
23 + services.AddHttpClient();
24 + services.AddSingleton<ServiceBusEventSender>();
25 +
26 + var serviceProvider = services.BuildServiceProvider();
27 + var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>();
28 +
29 + await Task.Factory.StartNew(async () =>
30 + {
31 + try
32 + {
33 + await entry.Listen(async message =>
34 + {
35 + await Task.CompletedTask;
36 + Console.WriteLine($"==================Received message: {message}==================");
37 + });
38 + }
39 + catch (Exception e)
40 + {
41 + Console.WriteLine(e);
42 + throw;
43 + }
44 + });
45 +
46 + for (int i = 0; i < 1000; i++)
47 + {
48 + await entry.SendAsync(new TestEvent { Name = $"Test {i}" });
49 + }
50 +
51 + await Task.Delay(int.MaxValue);
52 + }
53 + }
54 +
55 + public class TestEvent
56 + {
57 + // ReSharper disable once UnusedAutoPropertyAccessor.Global
58 + public string? Name { get; set; }
59 + }
60 +
61 + public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory)
62 + {
63 + private readonly HttpClient _httpClient = httpClientFactory.CreateClient();
64 + private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838";
65 + private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc";
66 + private readonly string _clientSecret = "YOUR_CLIENT_SECRET";
67 + private readonly string _scope = "https://servicebus.azure.net/.default";
68 +
69 + private readonly string _serviceBusNamespace = "anduinlearn";
70 + private readonly string _topicName = "mytopic";
71 + private readonly string _subscriptionName = "mysubscription";
72 +
73 + /// <summary>
74 + /// Sends an asynchronous request to the service bus endpoint.
75 + /// </summary>
76 + /// <typeparam name="T">The type of the request object.</typeparam>
77 + /// <param name="request">The request object.</param>
78 + /// <returns>A task representing the asynchronous operation that returns the response string.</returns>
79 + public async Task<string> SendAsync<T>(T request)
80 + {
81 + var requestJson = JsonConvert.SerializeObject(request);
82 + logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName);
83 +
84 + var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages";
85 + logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint);
86 +
87 + var token = await GetToken();
88 + logger.LogTrace("Sending request to service bus, with token {0}", token.Substring(0, 20));
89 +
90 + var httpRequest = new HttpRequestMessage
91 + {
92 + Method = HttpMethod.Post,
93 + RequestUri = new Uri(endpoint)
94 + };
95 + httpRequest.Headers.Add("Authorization", $"Bearer {token}");
96 + httpRequest.Headers.Add("accept", "application/json");
97 + httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json");
98 +
99 + logger.LogTrace("Sending request to service bus, with content {0}", requestJson);
100 + var response = await _httpClient.SendAsync(httpRequest);
101 +
102 + logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode);
103 + var responseString = await response.Content.ReadAsStringAsync();
104 +
105 + logger.LogTrace("Received response from service bus, with content {0} and status code {1}", responseString,
106 + response.StatusCode);
107 + if (!response.IsSuccessStatusCode)
108 + {
109 + throw new Exception($"Failed to send new service bus event. {responseString}");
110 + }
111 +
112 + return responseString;
113 + }
114 +
115 + public async Task Listen(Func<string, Task> onMessage)
116 + {
117 + // Create Service Bus client and processor
118 + logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName);
119 + var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret);
120 + var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net"
121 + , credential);
122 + var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions());
123 +
124 + processor.ProcessMessageAsync += async args =>
125 + {
126 + var message = args.Message;
127 + var body = Encoding.UTF8.GetString(message.Body.ToArray());
128 + await onMessage(body);
129 + await args.CompleteMessageAsync(message);
130 + };
131 +
132 + processor.ProcessErrorAsync += ErrorHandler;
133 +
134 + logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName);
135 + // Start processing
136 + await processor.StartProcessingAsync();
137 + await Task.Delay(int.MaxValue);
138 + }
139 +
140 + private Task ErrorHandler(ProcessErrorEventArgs args)
141 + {
142 + logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message);
143 + return Task.CompletedTask;
144 + }
145 +
146 + private async Task<string> GetToken()
147 + {
148 + var app = ConfidentialClientApplicationBuilder.Create(_clientId)
149 + .WithClientSecret(_clientSecret)
150 + .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}"))
151 + .Build();
152 +
153 + var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync();
154 + return authResult.AccessToken;
155 + }
156 + }
Nowsze Starsze