Round #714

10 May 2026, 15:11 · csharp · Meta-Llama-3.3-70B-Instruct · r13-clean-hints
Complexity: band 1
ZP / UO / OBS: 7 / 3 / 2
Journey weight: 854
Lightshift ΔL: +0.14

Prompt

Implement an IObservable<T> stream over Channel<T> with backpressure, completing on CancellationToken cancellation.

Zero-point index (what the engine surfaced)

ZPI-ZPI-0013 | 2026-05-10T15:11:53Z
Implement an IObservable<T> stream over Channel<T> with backpressure, completing on CancellationToken cancellation.

ZERO POINT — 7 entries (structural invariants)
● hsl(303.02, 100%, 42%) Channel<T> [contract]
● hsl(160.91, 100%, 42%) ChannelReader<T> [contract]
● hsl(195.35, 100%, 42%) ChannelWriter<T> [contract]
● hsl(77.13, 100%, 15%) IObservable<T> [contract]
● hsl(94.80, 100%, 15%) IObserver<T> [contract]
● hsl(299.79, 100%, 15%) Subject<T> [contract]
● hsl(164.81, 100%, 15%) IScheduler [contract]

UNOBSERVED — 3 entries (implied truths)
○ hsl(27.69, 100%, 50%) async [feature]
○ hsl(3.75, 100%, 38%) CancellationToken [feature]
○ hsl(299.08, 100%, 7%) gRPC service implementation [function]

OBSERVED — 2 entries (explicit in request)
◉ hsl(73.85, 100%, 15%) on [feature]
◉ hsl(98.46, 100%, 50%) with [feature]

COMPLEXITY │ 12 entries │ spread: 295° │ band: MEDIUM

Baseline response (no ZPI)

Here's an implementation of an `IObservable<T>` stream over `Channel<T>` with backpressure, completing on `CancellationToken` cancellation:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
using System.Linq;

public class ChannelObservable<T> : IObservable<T>
{
    private readonly Channel<T> _channel;
    private readonly CancellationToken _cancellationToken;

    public ChannelObservable(Channel<T>, CancellationToken cancellationToken)
    {
        _channel = channel;
        _cancellationToken = cancellationToken;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription(_channel, observer, _cancellationToken);
        return subscription;
    }

    private class Subscription : IDisposable
    {
        private readonly Channel<T> _channel;
        private readonly IObserver<T> _observer;
        private readonly CancellationToken _cancellationToken;
        private bool _disposed;

        public Subscription(Channel<T> channel, IObserver<T> observer, CancellationToken cancellationToken)
        {
            _channel = channel;
            _observer = observer;
            _cancellationToken = cancellationToken;
        }

        public void Dispose()
        {
            if (!_disposed)
            {
                _disposed = true;
                _channel.Reader.Complete(_cancellationToken);
            }
        }

        public async Task StartAsync()
        {
            try
            {
                while (await _channel.Reader.WaitToReadAsync(_cancellationToken))
                {
                    while (_channel.Reader.TryRead(out var item))
                    {
                        _observer.OnNext(item);
                    }
                }
                _observer.OnCompleted();
            }
            catch (OperationCanceledException)
            {
                _observer.OnError(new OperationCanceledException());
            }
            catch (Exception ex)
            {
                _observer.OnError(ex);
            }
            finally
            {
                Dispose();
            }
        }
    }
}

public class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<T>();
        var cts = new CancellationTokenSource();
        var observable = new ChannelObservable<T>(channel, cts.Token);
        var subscription = observable.Subscribe(new Observer<T>());

        // Produce items
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(i);
            await Task.Delay(100);
        }

        // Complete the channel
        channel.Writer.Complete();

        // Start consuming
        await new Subscription(channel, new Observer<T>(), cts.Token).StartAsync();
    }
}

public class Observer<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine($"Received: {value}");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine($"Error: {error.Message}");
    }

    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }
}
```

In this code:

* We create a `ChannelObservable<T>` class that implements `IObservable<T>`.
* The `ChannelObservable<T>` class takes a `Channel<T>` and a `CancellationToken` in its constructor.
* The `Subscribe` method returns an `IDisposable` subscription object.
* The subscription object starts consuming items from the channel when `StartAsync` is called.
* If the `CancellationToken` is canceled, the subscription will complete and dispose of the channel reader.
* We also define an `Observer<T>` class that implements `IObserver<T>` to demonstrate how to subscribe to the observable.
* In the `Main` method, we create a channel, produce items, and start consuming them using the `ChannelObservable<T>` and `Observer<T>` classes.

This implementation provides an `IObservable<T>` stream over a `Channel<T>` with backpressure, completing on `CancellationToken` cancellation.

Enhanced response (with ZPI)

To implement an `IObservable<T>` stream over a `Channel<T>` with backpressure, completing on `CancellationToken` cancellation, we can leverage the `System.Threading.Channels` and `System.Reactive.Linq` namespaces. The `Channel<T>` will be used to handle the backpressure, and `IObservable<T>` will be used to create the stream. We'll also utilize `CancellationToken` to handle cancellation.

Here's a complete implementation:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ChannelObservable<T>
{
    private readonly Channel<T> _channel;
    private readonly CancellationToken _cancellationToken;

    public ChannelObservable(Channel<T> channel, CancellationToken cancellationToken)
    {
        _channel = channel;
        _cancellationToken = cancellationToken;
    }

    public IObservable<T> GetObservable()
    {
        return Observable.Create<T>(async (observer, cancellationToken) =>
        {
            try
            {
                while (await _channel.Reader.WaitToReadAsync(_cancellationToken))
                {
                    while (_channel.Reader.TryRead(out var item))
                    {
                        observer.OnNext(item);
                    }
                }
                observer.OnCompleted();
            }
            catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
            {
                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
            return Disposable.Empty;
        });
    }
}

public class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<T>();
        var cts = new CancellationTokenSource();
        var observable = new ChannelObservable<T>(channel, cts.Token).GetObservable();

        // Subscribe to the observable
        using var subscription = observable.Subscribe(
            item => Console.WriteLine($"Received: {item}"),
            error => Console.WriteLine($"Error: {error}"),
            () => Console.WriteLine("Completed"));

        // Write items to the channel
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(i);
            await Task.Delay(100);
        }
        channel.Writer.Complete();

        // Cancel the token source after 5 seconds
        await Task.Delay(5000);
        cts.Cancel();
    }
}
```

Explanation:

* We create a `ChannelObservable<T>` class that takes a `Channel<T>` and a `CancellationToken` in its constructor.
* The `GetObservable` method returns an `IObservable<T>` that uses `Observable.Create` to create the stream.
* Inside the `Observable.Create` callback, we use a `while` loop to read items from the channel asynchronously using `WaitToReadAsync` and `TryRead`.
* We use `observer.OnNext` to push each item to the subscribers.
* If the `CancellationToken` is canceled, we catch the `OperationCanceledException` and complete the observable using `observer.OnCompleted`.
* If any other exception occurs, we use `observer.OnError` to propagate the error to the subscribers.
* In the `Main` method, we create a channel, a `CancellationTokenSource`, and an instance of `ChannelObservable<T>`.
* We subscribe to the observable and write items to the channel using `WriteAsync`.
* After writing all items, we complete the channel writer using `Complete`.
* Finally, we cancel the token source after 5 seconds to demonstrate the cancellation behavior.
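
For comparison with the two recorded responses, here is a minimal sketch of the read loop they describe, written against the base class library only (no Rx dependency). It is not part of either response. It assumes a bounded channel with `BoundedChannelFullMode.Wait`, so `WriteAsync` waits while the buffer is full, which is one common way to get backpressure. The names `ChannelReaderObservable`, `Unsubscriber`, `CollectingObserver`, and `Demo` are hypothetical, and a single subscriber is assumed, since several subscribers would compete for items rather than each seeing all of them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical wrapper: exposes a ChannelReader<T> as an IObservable<T>.
public sealed class ChannelReaderObservable<T> : IObservable<T>
{
    private readonly ChannelReader<T> _reader;
    private readonly CancellationToken _token;

    public ChannelReaderObservable(ChannelReader<T> reader, CancellationToken token)
    {
        _reader = reader;
        _token = token;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Link the external token to a per-subscription source so Dispose() stops only this pump.
        var cts = CancellationTokenSource.CreateLinkedTokenSource(_token);
        _ = PumpAsync(observer, cts.Token);
        return new Unsubscriber(cts);
    }

    private async Task PumpAsync(IObserver<T> observer, CancellationToken token)
    {
        try
        {
            // Items are pulled only as fast as OnNext returns; with a bounded
            // channel, a slow observer leaves the buffer full and writers wait.
            while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
            {
                while (!token.IsCancellationRequested && _reader.TryRead(out var item))
                    observer.OnNext(item);
            }
            observer.OnCompleted(); // writer completed and the buffer is drained
        }
        catch (OperationCanceledException)
        {
            // External cancellation completes the stream; an unsubscribe stays silent.
            if (_token.IsCancellationRequested) observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly CancellationTokenSource _cts;
        public Unsubscriber(CancellationTokenSource cts) => _cts = cts;
        // Sketch only: a production version would also dispose the source.
        public void Dispose() => _cts.Cancel();
    }
}

public static class Demo
{
    private sealed class CollectingObserver : IObserver<int>
    {
        private readonly TaskCompletionSource<bool> _done;
        public List<int> Items { get; } = new List<int>();
        public CollectingObserver(TaskCompletionSource<bool> done) => _done = done;
        public void OnNext(int value) => Items.Add(value);
        public void OnError(Exception error) => _done.TrySetException(error);
        public void OnCompleted() => _done.TrySetResult(true);
    }

    public static async Task<List<int>> RunAsync()
    {
        // Capacity 2 is an arbitrary choice for the demo.
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2) { FullMode = BoundedChannelFullMode.Wait });
        using var cts = new CancellationTokenSource();
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var observer = new CollectingObserver(done);
        using var subscription =
            new ChannelReaderObservable<int>(channel.Reader, cts.Token).Subscribe(observer);

        for (int i = 0; i < 5; i++)
            await channel.Writer.WriteAsync(i, cts.Token); // waits while the buffer is full
        channel.Writer.Complete();

        await done.Task; // resolves once OnCompleted has been called
        return observer.Items;
    }

    public static async Task Main()
    {
        var items = await RunAsync();
        Console.WriteLine(string.Join(", ", items)); // prints "0, 1, 2, 3, 4"
    }
}
```

In this sketch, cancelling the external token while the pump is waiting ends the stream with `OnCompleted`, as the prompt asks. Disposing the subscription stops the pump without notifying the observer, which is a design choice rather than a requirement of the prompt.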

Grader chain

Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.

V1 samba/Meta-Llama-3.3-70B-Instruct: Better
V4 qwen2.5-coder-3b-instruct: Better
V5 openrouter/qwen/qwen3-235b-a22b-2507: Better
V7 qwen2.5-coder-7b-instruct: Better
V8 openai/gpt-4.1: Better
V9 anthropic/claude-opus-4-7: Better
V12 openai/gpt-4o: Better
