Round #350
08 May 2026, 17:10 · csharp · claude-sonnet-4-6 · r13-clean-hints
- Complexity
- band 0
- ZP / UO / OBS
- 3 / 0 / 4
- Journey weight
- 445
- Lightshift ΔL
- +0.00
Prompt
Implement async producer-consumer pattern using Channel<T>
Zero-point index (what the engine surfaced)
ZPI-ZPI-0004 | 2026-05-08T17:10:44Z
Implement async producer-consumer pattern using Channel<T>
ZERO POINT — 3 entries (structural invariants)
● hsl(303.02, 100%, 25%) Channel<T> [contract]
● hsl(160.91, 100%, 25%) ChannelReader<T> [contract]
● hsl(195.35, 100%, 25%) ChannelWriter<T> [contract]
UNOBSERVED — 0 entries (implied truths)
OBSERVED — 4 entries (explicit in request)
◉ hsl(27.69, 100%, 50%) async [feature]
◉ hsl(77.95, 100%, 30%) async [feature]
◉ hsl(85.13, 100%, 50%) using [feature]
◉ hsl(234.87, 100%, 50%) using [feature]
COMPLEXITY │ 7 entries │ spread: 207° │ band: LOW
Baseline response (no ZPI)
# Async Producer-Consumer Pattern Using Channel<T>
## Overview
A complete, production-ready implementation featuring multiple producers/consumers, cancellation, error handling, backpressure, and monitoring.
## Implementation
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
// ============================================================
// Domain Models
// ============================================================
/// <summary>A unit of work that is passed through the channel.</summary>
/// <param name="Id">Identifier of the item.</param>
/// <param name="Payload">Content carried by the item.</param>
/// <param name="Priority">Priority value; 0 when not supplied.</param>
/// <param name="CreatedAt">
/// Creation timestamp. When left at <c>default</c>, the current UTC time is substituted.
/// </param>
public record WorkItem(int Id, string Payload, int Priority = 0, DateTime CreatedAt = default)
{
    /// <summary>
    /// Creation timestamp: the supplied value, or <see cref="DateTime.UtcNow"/> captured at
    /// construction when the caller passed <c>default</c>.
    /// </summary>
    public DateTime CreatedAt { get; init; } = CreatedAt != default ? CreatedAt : DateTime.UtcNow;
}
/// <summary>Outcome of processing a single work item.</summary>
/// <param name="WorkItemId">Identifier of the work item this result belongs to.</param>
/// <param name="Success">Whether processing succeeded.</param>
/// <param name="Output">Textual output of the processing step.</param>
/// <param name="Duration">Time taken to process the item.</param>
/// <param name="Error">Exception associated with the result, or <c>null</c> when none was supplied.</param>
public record ProcessingResult(int WorkItemId, bool Success, string Output, TimeSpan Duration, Exception? Error = null);
// ============================================================
// Channel Statistics
// ============================================================
/// <summary>
/// Counters describing what has happened to items in the pipeline.
/// Every counter is updated and read through <see cref="Interlocked"/>, so individual
/// values can be used from several tasks at once.
/// </summary>
public class ChannelStatistics
{
    private long _produced;
    private long _consumed;
    private long _failed;
    private long _dropped;

    /// <summary>Number of items successfully written to the channel.</summary>
    public long Produced => Interlocked.Read(ref _produced);

    /// <summary>Number of items processed successfully.</summary>
    public long Consumed => Interlocked.Read(ref _consumed);

    /// <summary>Number of items whose processing failed.</summary>
    public long Failed => Interlocked.Read(ref _failed);

    /// <summary>Number of items that were never written to the channel.</summary>
    public long Dropped => Interlocked.Read(ref _dropped);

    /// <summary>
    /// Number of items written to the channel that have not yet been consumed or failed.
    /// </summary>
    /// <remarks>
    /// Dropped items are deliberately not subtracted: the producer records an item as
    /// either produced or dropped, never both, so a dropped item was never part of
    /// <see cref="Produced"/>. The value is computed from separate reads of each counter,
    /// so it is a snapshot that may be momentarily inconsistent while counters are changing.
    /// </remarks>
    public long Pending => Produced - Consumed - Failed;

    /// <summary>Records one item written to the channel.</summary>
    public void IncrementProduced() => Interlocked.Increment(ref _produced);

    /// <summary>Records one item processed successfully.</summary>
    public void IncrementConsumed() => Interlocked.Increment(ref _consumed);

    /// <summary>Records one item whose processing failed.</summary>
    public void IncrementFailed() => Interlocked.Increment(ref _failed);

    /// <summary>Records one item that could not be written to the channel.</summary>
    public void IncrementDropped() => Interlocked.Increment(ref _dropped);

    /// <summary>Returns all counters, including <see cref="Pending"/>, as a single line of text.</summary>
    public override string ToString() =>
        $"Produced={Produced}, Consumed={Consumed}, " +
        $"Failed={Failed}, Dropped={Dropped}, Pending={Pending}";
}
// ============================================================
// Pipeline Configuration
// ============================================================
/// <summary>Settings that control channel sizing, worker counts and timing for the pipeline.</summary>
public class PipelineOptions
{
    /// <summary>
    /// Upper bound on items buffered in the channel. The value -1 is documented as meaning
    /// "unbounded"; the code that builds the channel is not part of this class.
    /// </summary>
    public int Capacity { get; init; } = 100;

    /// <summary>How many producer tasks run concurrently.</summary>
    public int ProducerCount { get; init; } = 2;

    /// <summary>How many consumer tasks run concurrently.</summary>
    public int ConsumerCount { get; init; } = 4;

    /// <summary>
    /// When <c>true</c>, an item is dropped rather than waited on if the channel is full.
    /// Defaults to <c>false</c>.
    /// </summary>
    public bool DropWhenFull { get; init; }

    /// <summary>Longest time a producer waits on a single write before giving up.</summary>
    public TimeSpan ProducerTimeout { get; init; } = TimeSpan.FromSeconds(5);

    /// <summary>Simulated processing delay applied per work item by a consumer.</summary>
    public TimeSpan ConsumerDelay { get; init; } = TimeSpan.FromMilliseconds(50);
}
// ============================================================
// Producer
// ============================================================
/// <summary>
/// Generates <see cref="WorkItem"/> instances, writes them to a channel writer and records
/// each outcome (produced or dropped) in the shared statistics.
/// </summary>
public sealed class Producer
{
    private readonly ChannelWriter<WorkItem> _writer;
    private readonly PipelineOptions _options;
    private readonly ChannelStatistics _stats;
    private readonly IProgress<string>? _progress;

    /// <summary>Identifier of this producer; used in item ids, payloads and progress messages.</summary>
    public int Id { get; }

    /// <summary>Creates a producer bound to the given channel writer.</summary>
    /// <param name="id">Identifier of this producer.</param>
    /// <param name="writer">Channel writer that receives the generated items.</param>
    /// <param name="options">Options controlling drop-when-full behaviour and the write timeout.</param>
    /// <param name="stats">Statistics object updated for every produced or dropped item.</param>
    /// <param name="progress">Optional sink for human-readable progress messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="writer"/>, <paramref name="options"/> or <paramref name="stats"/> is <c>null</c>.
    /// </exception>
    public Producer(
        int id,
        ChannelWriter<WorkItem> writer,
        PipelineOptions options,
        ChannelStatistics stats,
        IProgress<string>? progress = null)
    {
        // Fail at construction rather than with a NullReferenceException on first use.
        Id = id;
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _stats = stats ?? throw new ArgumentNullException(nameof(stats));
        _progress = progress;
    }

    /// <summary>Produces <paramref name="itemCount"/> items then returns.</summary>
    /// <param name="itemCount">Number of items to attempt to produce.</param>
    /// <param name="ct">
    /// Token that stops production. The loop checks it before each item; it is also passed to
    /// the write and to the delay between items, so cancellation during either surfaces as an
    /// <see cref="OperationCanceledException"/>.
    /// </param>
    public async Task ProduceAsync(int itemCount, CancellationToken ct = default)
    {
        _progress?.Report($"[Producer {Id}] Starting – will produce {itemCount} items.");

        for (int i = 0; i < itemCount && !ct.IsCancellationRequested; i++)
        {
            var item = new WorkItem(
                Id: Id * 10_000 + i,
                Payload: $"Producer{Id}-Item{i}",
                Priority: Random.Shared.Next(1, 10));

            // ConfigureAwait(false) matches every other await in this class.
            bool written = await TryWriteAsync(item, ct).ConfigureAwait(false);

            if (written)
            {
                _stats.IncrementProduced();
                _progress?.Report($"[Producer {Id}] Sent item {item.Id} (priority={item.Priority})");
            }
            else
            {
                _stats.IncrementDropped();
                _progress?.Report($"[Producer {Id}] Dropped item {item.Id} – channel full or cancelled");
            }

            // Simulate variable production rate
            await Task.Delay(Random.Shared.Next(10, 30), ct).ConfigureAwait(false);
        }

        _progress?.Report($"[Producer {Id}] Finished producing.");
    }

    /// <summary>
    /// Attempts to write one item, either without waiting (drop mode) or waiting up to the
    /// configured producer timeout.
    /// </summary>
    /// <returns><c>true</c> if the item was written; <c>false</c> if it was dropped.</returns>
    private async ValueTask<bool> TryWriteAsync(WorkItem item, CancellationToken ct)
    {
        if (_options.DropWhenFull)
        {
            // Non-blocking: the item is dropped whenever TryWrite reports failure.
            return _writer.TryWrite(item);
        }

        // Waiting write bounded by the timeout; the linked source also observes the caller's token.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(_options.ProducerTimeout);

        try
        {
            await _writer.WriteAsync(item, timeoutCts.Token).ConfigureAwait(false);
            return true;
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // Only the timeout fired – treat as drop. Caller-requested cancellation is not
            // caught here and propagates.
            return false;
        }
    }
}
// ============================================================
// Consumer
// ============================================================
public sealed class Consumer
{
private readonly ChannelReader<WorkItem> _reader;
private readonly PipelineOptions _options;
private readonly ChannelStatistics _stats;
private readonly ChannelWriter<ProcessingResult>? _resultWriter;
private readonly IProgress<string>? _progress;
public int Id { get; }
public Consumer(
int id,
ChannelReader<WorkItem> reader,
PipelineOptions options,
ChannelStatistics stats,
ChannelWriter<ProcessingResult>? resultWriter = null,
IProgress<string>? progress = null)
{
Id = id;
_reader = reader;
_options = options;
_stats = stats;
_resultWriter = resultWriter;
_progress = progress;
}
/// <summary>Consumes items until the channel is completed or cancellation is requested.</summary>
public async Task ConsumeAsync(CancellationToken ct = default)
{
_progress?.Report($"[Consumer {Id}] Starting.");
await foreach (WorkItem item in _reader.ReadAllAsync(ct).ConfigureAwait(false))
{
var result = await ProcessItemAsync(item, ct).ConfigureAwait(false);
if (result.Success)
_stats.IncrementConsumed();
else
_stats.IncrementFailed();
// Forward result downstream (optional chaining)
if (_resultWriter is not null)
await _resultWriter.WriteAsync(result, ct).ConfigureAwait(false);
_progress?.Report(
$"[Consumer {Id}] Processed item {item.Id} – " +
$"success={result.Success}, duration={result.Duration.TotalMilliseconds:F1}ms");
}
_progress?.Report($"[Consumer {Id}] Channel completed – shutting down.");
}
private async Task<ProcessingResult> ProcessItemAsync(WorkItem item, CancellationToken ct)
{
var sw = Stopwatch.StartNew();
try
{
// Simulate real async work (e.g. DB write, HTTP call)
await Task.Delay(_
Enhanced response (with ZPI)
# Async Producer-Consumer Pattern with `Channel<T>`
## Complete Implementation
```csharp
using System.Threading.Channels;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
// ──────────────────────────────────────────────────────────────
// Domain model
// ──────────────────────────────────────────────────────────────
/// <summary>A unit of work queued on the channel.</summary>
/// <param name="Id">Identifier of the item.</param>
/// <param name="Payload">Content carried by the item.</param>
/// <param name="EnqueuedAt">Timestamp supplied by the creator of the item.</param>
public sealed record WorkItem(int Id, string Payload, DateTimeOffset EnqueuedAt);
/// <summary>Result of processing one work item.</summary>
/// <param name="WorkItemId">Identifier of the work item that was processed.</param>
/// <param name="Output">Textual output of the processing step.</param>
/// <param name="ProcessingDuration">Time attributed to processing the item.</param>
public sealed record ProcessedResult(int WorkItemId, string Output, TimeSpan ProcessingDuration);
// ──────────────────────────────────────────────────────────────
// Channel wrapper — owns the Channel<T> and exposes typed
// reader/writer so consumers/producers are decoupled.
// ──────────────────────────────────────────────────────────────
/// <summary>
/// Exposes the two halves of a <see cref="WorkItem"/> channel so that code which writes
/// items and code which reads them can each depend on this abstraction.
/// </summary>
public interface IWorkItemChannel
{
/// <summary>Write side of the channel.</summary>
ChannelWriter<WorkItem> Writer { get; }
/// <summary>Read side of the channel.</summary>
ChannelReader<WorkItem> Reader { get; }
}
/// <summary>
/// Owns a bounded <see cref="Channel{T}"/> of <see cref="WorkItem"/> and exposes its
/// reader and writer.
/// </summary>
public sealed class WorkItemChannel : IWorkItemChannel
{
    private readonly Channel<WorkItem> _channel;

    /// <summary>Creates the underlying bounded channel.</summary>
    /// <param name="capacity">Maximum number of items the channel buffers; 100 by default.</param>
    public WorkItemChannel(int capacity = 100)
    {
        // A bounded channel in Wait mode makes writers await when the buffer is full.
        // Use Channel.CreateUnbounded<WorkItem>() instead if no limit is wanted.
        _channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(capacity)
        {
            SingleReader = false,                   // several consumers may read
            SingleWriter = false,                   // several producers may write
            FullMode = BoundedChannelFullMode.Wait  // writers wait for space
        });
    }

    /// <summary>Write side of the owned channel.</summary>
    public ChannelWriter<WorkItem> Writer
    {
        get { return _channel.Writer; }
    }

    /// <summary>Read side of the owned channel.</summary>
    public ChannelReader<WorkItem> Reader
    {
        get { return _channel.Reader; }
    }
}
// ──────────────────────────────────────────────────────────────
// Producer — IHostedService that writes to the channel
// ──────────────────────────────────────────────────────────────
/// <summary>
/// Background service that keeps writing sequentially numbered <see cref="WorkItem"/>
/// instances to the shared channel until the host requests a stop, then marks the channel
/// as complete.
/// </summary>
public sealed class ProducerService : BackgroundService
{
    private readonly ChannelWriter<WorkItem> _writer;
    private readonly ILogger<ProducerService> _logger;
    private int _sequence;

    /// <summary>Creates the service.</summary>
    /// <param name="channel">Channel wrapper whose writer receives the produced items.</param>
    /// <param name="logger">Logger used for lifecycle and per-item messages.</param>
    public ProducerService(IWorkItemChannel channel,
                           ILogger<ProducerService> logger)
    {
        _writer = channel.Writer;
        _logger = logger;
    }

    /// <summary>
    /// Produces one item roughly every 200 ms until <paramref name="stoppingToken"/> is
    /// cancelled, then completes the channel writer.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Producer started.");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Capture the incremented value once so Id and Payload always agree;
                // re-reading _sequence for the payload could observe a different value.
                int id = Interlocked.Increment(ref _sequence);
                var item = new WorkItem(
                    Id: id,
                    Payload: $"payload-{id}",
                    EnqueuedAt: DateTimeOffset.UtcNow
                );

                // WriteAsync respects back-pressure from BoundedChannel
                await _writer.WriteAsync(item, stoppingToken);
                _logger.LogInformation(
                    "Produced #{Id} – {Payload}", item.Id, item.Payload);

                await Task.Delay(TimeSpan.FromMilliseconds(200), stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown path — swallow. Cancellations not caused by the stopping
            // token are left to propagate instead of being hidden.
        }
        finally
        {
            // Signal that no more items will be written.
            _writer.TryComplete();
            _logger.LogInformation("Producer completed channel.");
        }
    }
}
// ──────────────────────────────────────────────────────────────
// Consumer — IHostedService that reads from the channel.
// Multiple instances can be registered for parallel consumption.
// ──────────────────────────────────────────────────────────────
/// <summary>
/// Background service that reads <see cref="WorkItem"/> instances from the shared channel
/// and processes them one at a time.
/// </summary>
public sealed class ConsumerService : BackgroundService
{
    private readonly ChannelReader<WorkItem> _reader;
    private readonly ILogger<ConsumerService> _logger;
    private readonly string _name;

    /// <summary>Creates the service.</summary>
    /// <param name="channel">Channel wrapper whose reader supplies the items.</param>
    /// <param name="logger">Logger used for lifecycle and per-item messages.</param>
    /// <param name="name">Label included in every log message; "Consumer-1" by default.</param>
    public ConsumerService(
        IWorkItemChannel channel,
        ILogger<ConsumerService> logger,
        string name = "Consumer-1")
    {
        _reader = channel.Reader;
        _logger = logger;
        _name = name;
    }

    /// <summary>
    /// Processes every item yielded by the channel reader and logs once the enumeration ends.
    /// </summary>
    /// <param name="stoppingToken">Token passed to the read loop and to item processing.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("{Name} started.", _name);

        // NOTE(review): if stoppingToken is cancelled, the enumeration or the processing delay
        // is expected to throw and skip the final log line — confirm the intended shutdown behaviour.
        var incoming = _reader.ReadAllAsync(stoppingToken);
        await foreach (var workItem in incoming)
        {
            await ProcessAsync(workItem, stoppingToken);
        }

        _logger.LogInformation("{Name} finished — channel drained.", _name);
    }

    /// <summary>
    /// Simulates 500 ms of asynchronous work for <paramref name="item"/>, measures the elapsed
    /// time and logs the resulting <see cref="ProcessedResult"/>.
    /// </summary>
    private async Task ProcessAsync(WorkItem item, CancellationToken ct)
    {
        var timer = new System.Diagnostics.Stopwatch();
        timer.Start();

        // Stand-in for real asynchronous work.
        await Task.Delay(TimeSpan.FromMilliseconds(500), ct);

        timer.Stop();
        TimeSpan elapsed = timer.Elapsed;
        string output = $"processed-{item.Payload}";
        var result = new ProcessedResult(item.Id, output, elapsed);

        _logger.LogInformation(
            "{Name} processed #{Id} in {Ms} ms → {Output}",
            _name,
            result.WorkItemId,
            result.ProcessingDuration.TotalMilliseconds,
            result.Output);
    }
}
// ──────────────────────────────────────────────────────────────
// Advanced: a self-contained pipeline that lets callers
// await individual results via a results channel.
// ──────────────────────────────────────────────────────────────
public sealed class Pipeline : IAsyncDisposable
{
private readonly Channel<WorkItem> _workChannel;
private readonly Channel<ProcessedResult> _resultChannel;
private readonly Task[] _workerTasks;
private readonly CancellationTokenSource _cts = new();
public ChannelWriter<WorkItem> WorkWriter => _workChannel.Writer;
public ChannelReader<ProcessedResult> ResultReader => _resultChannel.Reader;
public Pipeline(int workerCount = 4, int capacity = 256)
{
_workChannel = Channel.CreateBounded<WorkItem>(
new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false,
SingleReader = false
});
_resultChannel = Channel.CreateUnbounded<ProcessedResult>(
new UnboundedChannelOptions { SingleWriter = false, SingleReader = false });
_workerTasks = Enumerable
.Range(1, workerCount)
.Select(i => RunWorkerAsync(i, _cts.Token))
.ToArray();
}
/// <summary>Completes the input side and waits for all workers to finish.</summary>
public async Task CompleteAsync()
{
_workChannel.Writer.TryComplete();
await Task.WhenAll(_workerTasks);
_resultChannel.Writer.TryComplete();
}
private async Task RunWorkerAsync(int workerId, CancellationToken ct)
{
await foreach (WorkItem item in _workChannel.Reader.ReadAllAsync(ct))
{
await Task.Delay(50, ct); // simulate real work
await _resultChannel.Writer.WriteAsync(new ProcessedResult(
WorkItemId: item.Id,
Output: $"w{workerId}:{item.Payload}",
ProcessingDuration: TimeSpan.FromMilliseconds(50)), ct);
}
}
public async ValueTask DisposeAsync()
{
_cts.Cancel();
try { await Task.WhenAll(_workerTasks); }
catch (OperationCanceledException) { /*
Grader chain
Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.
V1 · claude-sonnet-4-6 · Better
V4 · qwen2.5-coder-7b-instruct · Better
Comments
Our grader said what it said. What do you say? Comment as a guest below.
No comments yet. Be the first to say what you make of this round.