Round #312
08 May 2026, 13:23 · csharp · openai/gpt-oss-120b:free · r13-clean-hints
- Complexity: band 0
- ZP / UO / OBS: 3 / 0 / 4
- Journey weight: 442
- Lightshift ΔL: +0.00
Prompt
Implement async producer-consumer pattern using Channel<T>
Zero-point index (what the engine surfaced)
ZPI-ZPI-0004 | 2026-05-08T13:23:05Z
Implement async producer-consumer pattern using Channel<T>
ZERO POINT — 3 entries (structural invariants)
● hsl(303.02, 100%, 26%) Channel<T> [contract]
● hsl(160.91, 100%, 26%) ChannelReader<T> [contract]
● hsl(195.35, 100%, 26%) ChannelWriter<T> [contract]
UNOBSERVED — 0 entries (implied truths)
OBSERVED — 4 entries (explicit in request)
◉ hsl(27.69, 100%, 50%) async [feature]
◉ hsl(77.95, 100%, 30%) async [feature]
◉ hsl(85.13, 100%, 50%) using [feature]
◉ hsl(234.87, 100%, 50%) using [feature]
COMPLEXITY │ 7 entries │ spread: 207° │ band: LOW
Baseline response (no ZPI)
Below is a **complete, self‑contained example** that shows how to build an **asynchronous producer‑consumer pipeline** with the modern `System.Threading.Channels.Channel<T>` type.
The code demonstrates:
* Creating a bounded or unbounded channel.
* One (or many) producer tasks that write to the channel asynchronously.
* One (or many) consumer tasks that read from the channel asynchronously.
* Graceful shutdown using `Complete` and `CancellationToken`.
* Optional back‑pressure handling, error propagation and logging.
---
## 1. Project setup
```bash
dotnet new console -n ChannelDemo
cd ChannelDemo
dotnet add package Microsoft.Extensions.Logging.Console
```
The example targets **.NET 8** (or any .NET 6+ runtime) and uses the built‑in `ILogger` for diagnostics.
---
## 2. Full source (`Program.cs`)
```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
namespace ChannelDemo;
public static class Program
{
// -----------------------------------------------------------------
// Entry point
// -----------------------------------------------------------------
public static async Task Main(string[] args)
{
// -------------------------------------------------------------
// 1️⃣ Configure logging (optional but very handy)
// -------------------------------------------------------------
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddSimpleConsole(options =>
{
options.SingleLine = true;
options.TimestampFormat = "HH:mm:ss ";
})
.SetMinimumLevel(LogLevel.Information);
});
ILogger logger = loggerFactory.CreateLogger("ChannelDemo");
// -------------------------------------------------------------
// 2️⃣ Create a channel
// -------------------------------------------------------------
// Choose bounded (back‑pressure) or unbounded.
// Bounded example – capacity 100; writers wait when the channel is full:
var options = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // WriteAsync waits asynchronously when full
SingleWriter = false, // we will have many producers
SingleReader = false // we will have many consumers
};
Channel<int> channel = Channel.CreateBounded<int>(options);
// For an unbounded channel you could use:
// Channel<int> channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
// {
// SingleWriter = false,
// SingleReader = false
// });
// -------------------------------------------------------------
// 3️⃣ Create a cancellation source for graceful shutdown
// -------------------------------------------------------------
using var cts = new CancellationTokenSource();
// Cancel on Ctrl+C (or SIGTERM on Linux)
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating immediately
logger.LogInformation("Cancellation requested (Ctrl+C) …");
cts.Cancel();
};
// -------------------------------------------------------------
// 4️⃣ Start producers
// -------------------------------------------------------------
const int producerCount = 3;
var producerTasks = new List<Task>();
for (int i = 0; i < producerCount; i++)
{
int id = i + 1;
producerTasks.Add(Task.Run(() => ProducerAsync(id, channel.Writer, logger, cts.Token)));
}
// -------------------------------------------------------------
// 5️⃣ Start consumers
// -------------------------------------------------------------
const int consumerCount = 2;
var consumerTasks = new List<Task>();
for (int i = 0; i < consumerCount; i++)
{
int id = i + 1;
consumerTasks.Add(Task.Run(() => ConsumerAsync(id, channel.Reader, logger, cts.Token)));
}
// -------------------------------------------------------------
// 6️⃣ Wait for producers to finish (or be cancelled)
// -------------------------------------------------------------
// In a real‑world scenario you might have a finite workload.
// Here we just wait for a cancellation request.
try
{
await Task.WhenAll(producerTasks);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
// Expected – ignore
}
// -------------------------------------------------------------
// 7️⃣ Signal that no more items will be written
// -------------------------------------------------------------
logger.LogInformation("All producers completed – completing the channel.");
channel.Writer.TryComplete(); // tells consumers that the stream is finished; a no-op if a failed producer already completed it
// -------------------------------------------------------------
// 8️⃣ Wait for consumers to drain the channel
// -------------------------------------------------------------
try
{
await Task.WhenAll(consumerTasks);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
// Expected – ignore
}
logger.LogInformation("All consumers completed – application exiting.");
}
// -----------------------------------------------------------------
// Producer – generates numbers and writes them to the channel
// -----------------------------------------------------------------
private static async Task ProducerAsync(
int producerId,
ChannelWriter<int> writer,
ILogger logger,
CancellationToken ct)
{
var rnd = new Random(Guid.NewGuid().GetHashCode());
try
{
int produced = 0;
while (!ct.IsCancellationRequested)
{
// Simulate work (e.g., reading from a device, DB, etc.)
await Task.Delay(rnd.Next(100, 500), ct);
int value = rnd.Next(0, 1000);
// WriteAsync respects back‑pressure when the channel is bounded.
await writer.WriteAsync(value, ct);
produced++;
logger.LogInformation("[Producer {Id}] produced {Value} (total {Count})",
producerId, value, produced);
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Normal shutdown path – just exit.
}
catch (Exception ex)
{
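// If something goes wrong we *complete* the channel with the exception.
// Consumers observe it once the buffered items have been read.
// TryComplete is used because another producer may already have completed
// the channel; Complete would throw ChannelClosedException in that case.
logger.LogError(ex, "[Producer {Id}] encountered an error – completing channel with failure.", producerId);
writer.TryComplete(ex);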
}
finally
{
// If this producer is the *last* one you could call Complete here,
// but in this demo we let Main() complete the channel after all producers finish.
logger.LogInformation("[Producer {Id}] exiting.", producerId);
}
}
// -----------------------------------------------------------------
// Consumer – reads numbers from the channel and processes them
// -----------------------------------------------------------------
private static async Task ConsumerAsync(
int consumerId,
ChannelReader<int> reader,
ILogger logger,
CancellationToken ct)
{
try
{
await foreach (var item in reader.ReadAllAsync(ct))
{
// Simulate processing time
await Task.Delay(200, ct);
logger.LogInformation("[Consumer {Id}] processed {Value}", consumerId, item);
}
// The loop ends when the channel is completed *and* empty.
logger.LogInformation("[Consumer {Id}] channel completed – exiting.", consumerId);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
logger.LogInformation("[Consumer {Id}] cancellation requested – exiting.", consumerId);
}
catch (ChannelClosedException ex) when (ex.InnerException != null)
{
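// Producer exception wrapped by the channel. This wrapper is thrown by ReadAsync;
// ReadAllAsync surfaces the original exception, which the general handler below logs.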
logger.LogError(ex.InnerException,
"[Consumer {Id}] channel closed due to producer failure.", consumerId);
}
catch (Exception ex)
{
logger.LogError(ex, "[Consumer {Id}] unexpected error.", consumerId);
}
}
}
```
---
## 3. How it works
| Step | What happens | Channel interaction |
|------|--------------|---------------------|
| **Create channel** | `Channel.CreateBounded<T>(options)` builds a thread‑safe queue with a fixed capacity. | With `FullMode = Wait`, `WriteAsync` waits asynchronously while the buffer is full; reads wait asynchronously while it is empty. No thread is blocked in either case. |
| **Producers** | Each producer loops, creates a value, then `await writer.WriteAsync(value, ct)`. | `WriteAsync` **asynchronously** waits if the channel is full (bounded) or returns immediately (unbounded). |
| **Consumers** | Each consumer uses `await foreach (var item in reader.ReadAllAsync(ct))`. | `ReadAllAsync` yields items until the channel is **completed** *and* empty. |
| **Shutdown** | When cancellation is requested, producers stop producing. After all producers finish, `writer.Complete()` is called. | Completion signals to readers that no more items will ever be written. The `await foreach` loop then ends. |
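| **Error propagation** | If a producer throws, it calls `writer.Complete(exception)`. Readers see the failure once the buffered items are drained: `ReadAllAsync` and `WaitToReadAsync` surface the original exception, while `ReadAsync` throws a `ChannelClosedException` whose `InnerException` is the original error. | Allows a single failure to be observed by every consumer. |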
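
The error-propagation row can be checked with a small sketch. It assumes an unbounded channel and a made-up `InvalidOperationException`; the observed exception types reflect how current .NET implementations behave as far as I know, so treat them as something to verify on your target runtime rather than a documented guarantee.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical setup: one item is buffered, then the writer completes with an error.
var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryComplete(new InvalidOperationException("producer failed"));

// ReadAllAsync still yields the buffered item, then surfaces the stored error.
int itemsRead = 0;
string readAllError = "none";
try
{
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        itemsRead++;
    }
}
catch (Exception ex)
{
    readAllError = ex.GetType().Name;
}

// ReadAsync on the now-empty, faulted channel reports a wrapped error instead.
string readAsyncError = "none";
string innerError = "none";
try
{
    await channel.Reader.ReadAsync();
}
catch (ChannelClosedException ex)
{
    readAsyncError = ex.GetType().Name;
    innerError = ex.InnerException?.GetType().Name ?? "none";
}

Console.WriteLine($"ReadAllAsync: {itemsRead} item(s), then {readAllError}");
Console.WriteLine($"ReadAsync: {readAsyncError} wrapping {innerError}");
```

A consumer built on `await foreach` should therefore catch the producer's exception type (or `Exception`) rather than rely only on a `ChannelClosedException` handler.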
---
## 4. Variations & Tips
| Scenario | Change |
|----------|--------|
| **Multiple independent pipelines** | Create separate `Channel<T>` instances per pipeline; the pattern stays the same. |
| **Dropping items instead of waiting** | Set `FullMode` to `DropWrite`, `DropNewest`, or `DropOldest` to discard items when the channel is full. These modes apply no back‑pressure: writes always complete immediately. |
| **Processing batches** | Inside the consumer, accumulate items into a list and process when the list reaches a size or a timeout. |
| **Parallel processing inside a consumer** | Use `Parallel.ForEachAsync` on the items read from the channel. |
| **Graceful shutdown with a timeout** | After `cts.Cancel()`, you can `await Task.WhenAny(Task.WhenAll(...), Task.Delay(timeout))`. |
| **Channel of complex types** | Replace `int` with any POCO (`class Message { public Guid Id; public string Payload; }`). |
| **Bounded channel with `SingleWriter = true`** | If there will be exactly one producer, set this so the channel can apply single‑writer optimizations where the implementation has them. The same applies to `SingleReader`. |
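
As an illustration of the "Processing batches" row, here is a minimal sketch that batches by size only (the timeout half of that row is left out). The batch size of 4, the capacity of 16 and the ten integer items are arbitrary choices for the demo, not values from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

const int batchSize = 4; // hypothetical batch size chosen for the demo

var channel = Channel.CreateBounded<int>(16);

// Producer: writes 10 items, then completes the writer.
Task producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
});

// Consumer: collects items into batches of up to batchSize.
var batchSizes = new List<int>();
var batch = new List<int>(batchSize);
await foreach (int item in channel.Reader.ReadAllAsync())
{
    batch.Add(item);
    if (batch.Count == batchSize)
    {
        batchSizes.Add(batch.Count); // "process" the full batch
        batch.Clear();
    }
}
if (batch.Count > 0)
{
    batchSizes.Add(batch.Count); // flush the final partial batch
}

await producer;
Console.WriteLine(string.Join(", ", batchSizes)); // prints "4, 4, 2"
```

The final flush after the loop matters: without it, a partial batch left over when the channel completes would be dropped silently.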
---
## 5. Running the demo
```bash
dotnet run
```
You’ll see interleaved log lines such as:
```
12:00:01 [Producer 1] produced 483 (total 1)
12:00:01 [Producer 2] produced 12 (total 1)
12:00:01 [Consumer 1] processed 483
12:00:01 [Consumer 2] processed 12
...
```
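Press **Ctrl+C** to trigger cancellation. Producers stop, `Main` completes the channel, and the program exits cleanly. Because the consumers observe the same `CancellationToken`, they stop promptly as well, so items still buffered at that moment are not processed; pass the consumers `CancellationToken.None` if the channel should be drained before exit.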
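
Whether buffered items are processed at shutdown depends on which token the consumer loop observes. The sketch below is a reduced, hypothetical version of the demo (five pre-buffered integers, cancellation triggered in code instead of by Ctrl+C) that contrasts the two choices.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (int i = 0; i < 5; i++)
{
    channel.Writer.TryWrite(i);
}

using var cts = new CancellationTokenSource();
cts.Cancel();                 // simulates Ctrl+C
channel.Writer.TryComplete(); // producers are done

// A consumer that observes the cancelled token stops without draining.
int readWithToken = 0;
bool cancelled = false;
try
{
    await foreach (int item in channel.Reader.ReadAllAsync(cts.Token))
    {
        readWithToken++;
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

// A consumer that ignores the token runs until the channel is completed and empty.
int drained = 0;
await foreach (int item in channel.Reader.ReadAllAsync(CancellationToken.None))
{
    drained++;
}

Console.WriteLine($"with token: read {readWithToken}, cancelled = {cancelled}");
Console.WriteLine($"without token: drained {drained}");
```

Draining is the safer default when every item must be handled; observing the token is appropriate when a fast exit matters more than the remaining work.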
---
### TL;DR
* Create a `Channel<T>` (bounded or unbounded).
* Producers `await writer.WriteAsync(item, ct)`.
* Consumers `await foreach (var item in reader.ReadAllAsync(ct))`.
* On shutdown call `writer.Complete()` (or `Complete(exception)`).
* Use a `CancellationToken` to stop everything gracefully.
That’s the idiomatic async producer‑consumer pattern in modern C#. Happy coding!
Enhanced response (with ZPI)
**Solution – Async Producer‑Consumer with `System.Threading.Channels`**
Below is a self‑contained, production‑ready example that demonstrates a classic
asynchronous producer‑consumer pattern using the .NET `Channel<T>` API
(`Channel<T>`, `ChannelReader<T>`, `ChannelWriter<T>`).
The sample includes:
* A **bounded** channel (back‑pressure) that can be shared safely between many
producers and many consumers.
* A **producer** that generates items asynchronously (simulated work).
* A **consumer** that processes items asynchronously (simulated work).
* **Cancellation** support so the whole pipeline can be stopped gracefully.
* **DI registration** (`IHostedService`) so the pattern can be dropped into an
ASP.NET Core / generic host application.
* Proper **error handling** – exceptions from producers/consumers are logged and
cause the channel to be completed so the other side can finish cleanly.
---
## 1. Domain model
```csharp
public record WorkItem(int Id, string Payload);
```
A simple immutable record that represents a unit of work.
---
## 2. The channel wrapper (optional but convenient)
```csharp
using System;
using System.Threading.Channels;

public interface IWorkQueue
{
    ChannelWriter<WorkItem> Writer { get; }
    ChannelReader<WorkItem> Reader { get; }
}

public sealed class WorkQueue : IWorkQueue, IDisposable
{
    private readonly Channel<WorkItem> _channel;

    public WorkQueue(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = fullMode,
            SingleWriter = false, // several producers write concurrently
            SingleReader = false  // several consumers read concurrently
        };
        _channel = Channel.CreateBounded<WorkItem>(options);
    }

    public ChannelWriter<WorkItem> Writer => _channel.Writer;
    public ChannelReader<WorkItem> Reader => _channel.Reader;

    // Completing the writer on disposal lets any remaining readers finish.
    public void Dispose() => _channel.Writer.TryComplete();
}
```
*The wrapper is only a thin façade that makes the channel injectable via DI.*
---
## 3. Producer service
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class ProducerService : BackgroundService
{
    private readonly IWorkQueue _queue;
    private readonly ILogger<ProducerService> _log;
    private readonly int _producerId;
    private readonly Random _rnd = new();

    public ProducerService(IWorkQueue queue, ILogger<ProducerService> log, int producerId)
    {
        _queue = queue;
        _log = log;
        _producerId = producerId;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var writer = _queue.Writer;
        try
        {
            int counter = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                // Simulate work to produce an item
                await Task.Delay(_rnd.Next(100, 500), stoppingToken);
                var item = new WorkItem(counter++, $"Prod{_producerId}-Msg");
                _log.LogInformation("Producer {ProdId} produced {Item}", _producerId, item);
                // WriteAsync respects back‑pressure; it will wait if the channel is full.
                await writer.WriteAsync(item, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown – just fall through.
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "Producer {ProdId} failed", _producerId);
            // Propagate the error to the consumer side.
            writer.TryComplete(ex);
        }
        finally
        {
            // When the host stops we complete the writer so consumers can finish.
            // Note: the channel is shared, so the first producer to exit completes
            // it for every producer.
            writer.TryComplete();
        }
    }
}
```
*Each producer runs as a `BackgroundService`. Multiple instances can be
registered – they all share the same `IWorkQueue`.*
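
Because the writer is shared, an alternative is to let a single coordinator complete it once every producer has finished, instead of each producer calling `TryComplete` on exit. The sketch below shows that idea with plain tasks outside the hosting model; `ProduceAsync` and `CompleteWhenDoneAsync` are hypothetical helper names, and the capacity and item counts are arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(8);

// Hypothetical producer: writes three items and never completes the shared writer itself.
async Task ProduceAsync(int producerId)
{
    for (int i = 0; i < 3; i++)
    {
        await channel.Writer.WriteAsync($"P{producerId}-{i}");
    }
}

Task[] producers = Enumerable.Range(1, 3).Select(id => ProduceAsync(id)).ToArray();

// Coordinator: completes the writer exactly once, after all producers are done,
// and forwards a failure to the readers if any producer threw.
async Task CompleteWhenDoneAsync()
{
    try
    {
        await Task.WhenAll(producers);
        channel.Writer.TryComplete();
    }
    catch (Exception ex)
    {
        channel.Writer.TryComplete(ex);
    }
}

Task coordinator = CompleteWhenDoneAsync();

// Consumer: the loop ends only after the coordinator has completed the writer.
int received = 0;
await foreach (string item in channel.Reader.ReadAllAsync())
{
    received++;
}

await coordinator;
Console.WriteLine($"received {received} items"); // prints "received 9 items"
```

With this arrangement one producer finishing early does not close the channel underneath the others.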
---
## 4. Consumer service
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class ConsumerService : BackgroundService
{
    private readonly IWorkQueue _queue;
    private readonly ILogger<ConsumerService> _log;
    private readonly int _consumerId;
    private readonly Random _rnd = new();

    public ConsumerService(IWorkQueue queue, ILogger<ConsumerService> log, int consumerId)
    {
        _queue = queue;
        _log = log;
        _consumerId = consumerId;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var reader = _queue.Reader;
        try
        {
            await foreach (var item in reader.ReadAllAsync(stoppingToken))
            {
                // Simulate processing time
                await Task.Delay(_rnd.Next(200, 800), stoppingToken);
                _log.LogInformation("Consumer {ConsId} processed {Item}", _consumerId, item);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Also reached when a producer completed the channel with an error.
            _log.LogError(ex, "Consumer {ConsId} failed", _consumerId);
        }
    }
}
```
*`ReadAllAsync` enumerates the channel until it is completed (or cancelled).*
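
For readers who want to see what that enumeration amounts to, the loop below is a rough hand-written equivalent built from `WaitToReadAsync` and `TryRead`. It is a sketch on a pre-filled unbounded channel of integers, not the library's actual source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 3; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

var seen = new List<int>();

// WaitToReadAsync returns false once the channel is completed and empty.
while (await channel.Reader.WaitToReadAsync())
{
    // TryRead takes whatever is currently buffered without awaiting.
    while (channel.Reader.TryRead(out int item))
    {
        seen.Add(item);
    }
}

Console.WriteLine(string.Join(", ", seen)); // prints "1, 2, 3"
```

The explicit form is handy when a consumer needs to do something between waits, such as flushing a batch, which `await foreach` does not expose directly.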
---
## 5. DI registration (e.g., in `Program.cs` for a generic host)
```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((ctx, services) =>
    {
        // Register a single shared bounded channel (capacity = 100)
        services.AddSingleton<IWorkQueue>(sp => new WorkQueue(capacity: 100));

        // Register 2 producers and 3 consumers – IDs are injected via factories.
        // AddSingleton<IHostedService> is used rather than AddHostedService: the latter
        // registers through TryAddEnumerable, so repeated registrations of the same
        // service class may be collapsed into one.
        services.AddSingleton<IHostedService>(sp =>
            new ProducerService(
                sp.GetRequiredService<IWorkQueue>(),
                sp.GetRequiredService<ILogger<ProducerService>>(),
                producerId: 1));
        services.AddSingleton<IHostedService>(sp =>
            new ProducerService(
                sp.GetRequiredService<IWorkQueue>(),
                sp.GetRequiredService<ILogger<ProducerService>>(),
                producerId: 2));

        services.AddSingleton<IHostedService>(sp =>
            new ConsumerService(
                sp.GetRequiredService<IWorkQueue>(),
                sp.GetRequiredService<ILogger<ConsumerService>>(),
                consumerId: 1));
        services.AddSingleton<IHostedService>(sp =>
            new ConsumerService(
                sp.GetRequiredService<IWorkQueue>(),
                sp.GetRequiredService<ILogger<ConsumerService>>(),
                consumerId: 2));
        services.AddSingleton<IHostedService>(sp =>
            new ConsumerService(
                sp.GetRequiredService<IWorkQueue>(),
                sp.GetRequiredService<ILogger<ConsumerService>>(),
                consumerId: 3));
    })
    .Build();

await host.RunAsync();
```
*The host will start all background services, share the same bounded channel,
and shut everything down cleanly on `Ctrl+C` or SIGTERM.*
---
## 6. How it works – Step‑by‑step
1. **Channel creation** – `WorkQueue` builds a *bounded* channel (`Channel.CreateBounded<T>`).
Bounded channels provide back‑pressure: if producers outrun consumers, `WriteAsync`
will await until space becomes available.
2. **Producers** – Each `ProducerService` loops until cancellation, creates a
`WorkItem`, and calls `writer.WriteAsync`. The call is *awaitable* and will
pause when the channel is full, preventing unbounded memory growth.
3. **Consumers** – Each `ConsumerService` uses `await foreach` over
`reader.ReadAllAsync`. This enumerates items as they become available and
automatically exits when the writer completes (or when cancellation is
requested).
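4. **Graceful shutdown** – When the host signals cancellation:
* Producers catch `OperationCanceledException`, exit the loop and call
`writer.TryComplete()`.
* Consumers observe the same `stoppingToken`, so their `ReadAllAsync` loop is
cancelled as well and they exit promptly; items still buffered at that point
are not processed. To drain the buffer instead, read without the token and
let the writer's completion end the loop.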
5. **Error propagation** – If a producer throws an unexpected exception,
it calls `writer.TryComplete(ex)`. The exception is stored in the channel
and re‑thrown on the consumer side when they attempt to read, allowing the
consumer to log the failure.
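
The back-pressure described in steps 1 and 2 can be observed directly. The sketch below uses a deliberately tiny capacity of 2 (an arbitrary demo value) so the channel fills immediately.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Fill the channel to capacity.
bool first = channel.Writer.TryWrite(1);
bool second = channel.Writer.TryWrite(2);

// The buffer is full, so a synchronous write attempt is rejected...
bool third = channel.Writer.TryWrite(3);

// ...and an asynchronous write stays pending until a reader frees a slot.
ValueTask pendingWrite = channel.Writer.WriteAsync(3);
bool pendingBeforeRead = !pendingWrite.IsCompleted;

int firstRead = await channel.Reader.ReadAsync(); // frees one slot
await pendingWrite;                               // the waiting write now completes

Console.WriteLine($"TryWrite results: {first}, {second}, {third}");
Console.WriteLine($"WriteAsync pending before read: {pendingBeforeRead}");
Console.WriteLine($"First item read: {firstRead}");
```

No thread is blocked while the write is pending; the producer simply does not resume past `await` until the consumer catches up.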
---
## 7. Why this pattern is idiomatic
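* **`Channel<T>`** is the recommended low‑level building block for high‑throughput
producer‑consumer pipelines in .NET (part of the shared framework since .NET Core 3.0,
and available as the `System.Threading.Channels` NuGet package for older targets).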
* Using **`BackgroundService`** integrates naturally with the generic host,
giving you lifetime management, DI, and structured logging for free.
* **Bounded channels** give natural flow‑control without manual `SemaphoreSlim`
or custom queues.
* The code is **cancellation‑aware** and **exception‑aware**, which is essential
for reliable services.
---
### Full source file (for quick copy‑paste)
```csharp
// ------------------------------------------------------------
// Program.cs (or any .cs file in a console/worker project)
// ------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
// ------------------------------------------------------------
// Domain model
// ------------------------------------------------------------
public record WorkItem(int Id, string Payload);
// ------------------------------------------------------------
// Channel wrapper
// ------------------------------------------------------------
public interface IWorkQueue
{
ChannelWriter<WorkItem> Writer { get; }
ChannelReader<WorkItem> Reader { get; }
}
public sealed class WorkQueue : IWorkQueue, IDisposable
{
private readonly Channel<WorkItem> _channel;
public WorkQueue(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = fullMode,
SingleWriter = false,
SingleReader = false
};
_channel = Channel.CreateBounded<WorkItem>(options);
}
public ChannelWriter<WorkItem> Writer => _channel.Writer;
public ChannelReader<WorkItem> Reader => _channel.Reader;
public void Dispose() => _channel.Writer.TryComplete();
}
// ------------------------------------------------------------
// Producer
// ------------------------------------------------------------
public class ProducerService : BackgroundService
{
private readonly IWorkQueue _queue;
private readonly ILogger<ProducerService> _log;
private readonly int _producerId;
private readonly Random _rnd = new();
public ProducerService(IWorkQueue queue, ILogger<ProducerService> log, int producerId)
{
_queue = queue;
_log = log;
_producerId = producerId;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var writer = _queue.Writer;
try
{
int counter = 0;
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(_rnd.Next(100, 500), stoppingToken);
var item = new WorkItem(counter++, $"Prod{_producerId}-Msg");
_log.LogInformation("Producer {ProdId} produced {Item}", _producerId, item);
await writer.WriteAsync(item, stoppingToken);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
catch (Exception ex)
{
_log.LogError(ex, "Producer {ProdId} failed", _producerId);
writer.TryComplete(ex);
}
finally
{
writer.TryComplete();
}
}
}
// ------------------------------------------------------------
// Consumer
// ------------------------------------------------------------
public class ConsumerService : BackgroundService
{
private readonly IWorkQueue _queue;
private readonly ILogger<ConsumerService> _log;
private readonly int _consumerId;
private readonly Random _rnd = new();
public ConsumerService(IWorkQueue queue, ILogger<ConsumerService> log, int consumerId)
{
_queue = queue;
_log = log;
_consumerId = consumerId;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var reader = _queue.Reader;
try
{
await foreach (var item in reader.ReadAllAsync(stoppingToken))
{
await Task.Delay(_rnd.Next(200, 800), stoppingToken);
_log.LogInformation("Consumer {ConsId} processed {Item}", _consumerId, item);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
catch (Exception ex)
{
_log.LogError(ex, "Consumer {ConsId} failed", _consumerId);
}
}
}
// ------------------------------------------------------------
// Host builder
// ------------------------------------------------------------
// Wrapped in an explicit Main: top-level statements must precede type
// declarations, so they cannot follow the classes above in the same file.
public static class Program
{
public static async Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((ctx, services) =>
{
services.AddSingleton<IWorkQueue>(sp => new WorkQueue(capacity: 100));
// AddSingleton<IHostedService> is used rather than AddHostedService: the latter
// registers through TryAddEnumerable, so repeated registrations of the same
// service class may be collapsed into one.
// Two producers
services.AddSingleton<IHostedService>(sp =>
new ProducerService(
sp.GetRequiredService<IWorkQueue>(),
sp.GetRequiredService<ILogger<ProducerService>>(),
producerId: 1));
services.AddSingleton<IHostedService>(sp =>
new ProducerService(
sp.GetRequiredService<IWorkQueue>(),
sp.GetRequiredService<ILogger<ProducerService>>(),
producerId: 2));
// Three consumers
services.AddSingleton<IHostedService>(sp =>
new ConsumerService(
sp.GetRequiredService<IWorkQueue>(),
sp.GetRequiredService<ILogger<ConsumerService>>(),
consumerId: 1));
services.AddSingleton<IHostedService>(sp =>
new ConsumerService(
sp.GetRequiredService<IWorkQueue>(),
sp.GetRequiredService<ILogger<ConsumerService>>(),
consumerId: 2));
services.AddSingleton<IHostedService>(sp =>
new ConsumerService(
sp.GetRequiredService<IWorkQueue>(),
sp.GetRequiredService<ILogger<ConsumerService>>(),
consumerId: 3));
})
.Build();
await host.RunAsync();
}
}
```
Run the project (`dotnet run`). You’ll see interleaved log lines showing
producers generating items and consumers processing them, all coordinated
through the shared `Channel<WorkItem>` without any explicit locks.
Grader chain
Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.
V1 claude-sonnet-4-6: Better
V4 qwen2.5-coder-7b-instruct: Better