Skip to content

Commit

Permalink
add demo
Browse files Browse the repository at this point in the history
  • Loading branch information
NikiforovAll committed Aug 22, 2024
1 parent 88e93e6 commit 1ca6f47
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 23 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

This repository demonstrates how to use `IAsyncEnumerable` and `System.Linq.Async` to build pipelines in C#.


> [!IMPORTANT]
> This repository doesn't cover all the possible concerns such as error handling, cancellation, backpressure, performance, etc. It's just a simple demonstration of how to build pipelines with `IAsyncEnumerable` and `System.Linq.Async`.
```bash
dotnet example --list
```
Expand All @@ -19,3 +23,18 @@ dotnet example --list
│ TextSummarizationAndAggregationPipeline │ Demonstrates how to build custom async-enumerable operators. │
╰─────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────╯
```
## Demo: CalculateWordCountPipeline

<video src="https://github.com/user-attachments/assets/84c1e8a8-996d-4960-9b39-20e6bd1101a9" controls="controls"></video>

## Demo: CalculateWordCountBatchPipeline

<video src="https://github.com/user-attachments/assets/56db32bd-a7e9-41ec-8706-eaf876750bb6" controls="controls"></video>

## Demo: CalculateWordCountFileWatcherPipeline

<video src="https://github.com/user-attachments/assets/96cc653d-8b42-4779-b2f2-fce804f0160b" controls="controls"></video>

## Demo: TextSummarizationAndAggregationPipeline

<video src="https://github.com/user-attachments/assets/42c6eb97-7a11-4b89-857e-1ffb8e70073c" controls="controls"></video>
Binary file added assets/demo1.mp4
Binary file not shown.
Binary file added assets/demo2.mp4
Binary file not shown.
Binary file added assets/demo3.mp4
Binary file not shown.
Binary file added assets/demo4.mp4
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/CalculateWordCountFileWatcherPipeline/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
var fileWatcher = CreateFileObservable(path);

var pipeline = fileWatcher
.TakeUntil(DateTimeOffset.Now.AddMinutes(1))
.TakeUntil(DateTimeOffset.Now.AddSeconds(15))
.ToAsyncEnumerable()
.SelectAwait(ReadFile)
.Where(IsValidFileForProcessing)
Expand Down
55 changes: 35 additions & 20 deletions examples/TextSummarizationAndAggregationPipeline/Program.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Connectors.OpenAI;
using Shared;
using Spectre.Console;
using static Shared.Steps;

var (kernel, summarizationFunction) = Init();
var kernel = Init();
var path = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "Data");

var pipeline = Directory
Expand All @@ -15,14 +16,18 @@
.SelectAwait(ReadFile)
.Where(IsValidFileForProcessing)
.SelectAwait(Summarize)
.WriteResultToFile(path: Path.Combine(path, "summaries.txt"))
.ForEachAsync(x => Console.WriteLine($"Processed {x.Name}"));
.WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
.ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));

await pipeline;

static (Kernel kernel, KernelFunction summarizationFunction) Init()
static Kernel Init()
{
var builder = Host.CreateApplicationBuilder();
var builder = Host.CreateApplicationBuilder(
new HostApplicationBuilderSettings { EnvironmentName = Environments.Development }
);
builder.Services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.None));

var endpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]!;
var deployment = builder.Configuration["AZURE_OPENAI_GPT_NAME"]!;
var key = builder.Configuration["AZURE_OPENAI_KEY"]!;
Expand All @@ -33,21 +38,20 @@
var services = builder.Build().Services;

var kernel = services.GetRequiredService<Kernel>();
var prompt = """
Please summarize the the following text in 20 words or less:
${input}
""";
var summarizationFunction = kernel.CreateFunctionFromPrompt(prompt);

return (kernel, summarizationFunction);
return kernel;
}

async ValueTask<SummarizationPayload> Summarize(FilePayload file)
{
var result = await summarizationFunction.InvokeAsync(
kernel,
new KernelArguments(new OpenAIPromptExecutionSettings() { MaxTokens = 400 }) { ["input"] = file.Content }
);
var prompt = """
{{$input}}
Please summarize the content above in 20 words or less:

The output format should be: [title]: [summary]
""";

var result = await kernel.InvokePromptAsync(prompt, new KernelArguments() { ["input"] = file.Content });

return new(file.Name, result.ToString());
}
Expand All @@ -61,7 +65,7 @@ string path
{
const int batchSize = 10;

using var streamWriter = new StreamWriter(path);
using var streamWriter = new StreamWriter(path, append: true);

await foreach (var batch in values.Buffer(batchSize))
{
Expand All @@ -72,17 +76,28 @@ string path
yield return value;
}

streamWriter.Flush();
await streamWriter.FlushAsync();
}

AnsiConsole.MarkupLine($"Results written to [green]{path}[/]");
}

public static async IAsyncEnumerable<T> ReportProgress<T>(this IAsyncEnumerable<T> values)
public static async IAsyncEnumerable<string> ReportProgress(this IAsyncEnumerable<string> values)
{
var totalCount = await values.CountAsync();

await foreach (var value in values)
await foreach (var (value, index) in values.Select((value, index) => (value, index)))
{
yield return value;

AnsiConsole
.Progress()
.Start(ctx =>
{
var task = ctx.AddTask($"Processing - {Path.GetFileName(value)}", true, totalCount);
task.Increment(index + 1);
task.StopTask();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<ExampleDescription>Demonstrates how to build custom async-enumerable operators.</ExampleDescription>
<ExampleDescription>Demonstrates how to build custom async-enumerable operators</ExampleDescription>
<ExampleOrder>4</ExampleOrder>
<UserSecretsId>3f392587-2601-400b-84a6-b7340d1436ef</UserSecretsId>
<UserSecretsId>e789b935-ac65-4262-a2e3-dcca0f6e9cfe</UserSecretsId>
</PropertyGroup>

<ItemGroup>
Expand All @@ -26,6 +26,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.SemanticKernel" Version="1.17.2" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
</ItemGroup>

</Project>

0 comments on commit 1ca6f47

Please sign in to comment.