Round #714
10 May 2026, 15:11· csharp· Meta-Llama-3.3-70B-Instruct· r13-clean-hints
- Complexity
- band 1
- ZP / UO / OBS
- 7 / 3 / 2
- Journey weight
- 854
- Lightshift ΔL
- +0.14
Prompt
Implement an IObservable<T> stream over Channel<T> with backpressure, completing on CancellationToken cancellation.
Zero-point index (what the engine surfaced)
ZPI-ZPI-0013 | 2026-05-10T15:11:53Z
Implement an IObservable<T> stream over Channel<T> with backpressure, completing on CancellationToken cancellation.
ZERO POINT — 7 entries (structural invariants)
● hsl(303.02, 100%, 42%) Channel<T> [contract]
● hsl(160.91, 100%, 42%) ChannelReader<T> [contract]
● hsl(195.35, 100%, 42%) ChannelWriter<T> [contract]
● hsl(77.13, 100%, 15%) IObservable<T> [contract]
● hsl(94.80, 100%, 15%) IObserver<T> [contract]
● hsl(299.79, 100%, 15%) Subject<T> [contract]
● hsl(164.81, 100%, 15%) IScheduler [contract]
UNOBSERVED — 3 entries (implied truths)
○ hsl(27.69, 100%, 50%) async [feature]
○ hsl(3.75, 100%, 38%) CancellationToken [feature]
○ hsl(299.08, 100%, 7%) gRPC service implementation [function]
OBSERVED — 2 entries (explicit in request)
◉ hsl(73.85, 100%, 15%) on [feature]
◉ hsl(98.46, 100%, 50%) with [feature]
COMPLEXITY │ 12 entries │ spread: 295° │ band: MEDIUM
Baseline response (no ZPI)
Here's an implementation of an `IObservable<T>` stream over `Channel<T>` with backpressure, completing on `CancellationToken` cancellation:
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
using System.Linq;
public class ChannelObservable<T> : IObservable<T>
{
private readonly Channel<T> _channel;
private readonly CancellationToken _cancellationToken;
public ChannelObservable(Channel<T>, CancellationToken cancellationToken)
{
_channel = channel;
_cancellationToken = cancellationToken;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = new Subscription(_channel, observer, _cancellationToken);
return subscription;
}
private class Subscription : IDisposable
{
private readonly Channel<T> _channel;
private readonly IObserver<T> _observer;
private readonly CancellationToken _cancellationToken;
private bool _disposed;
public Subscription(Channel<T> channel, IObserver<T> observer, CancellationToken cancellationToken)
{
_channel = channel;
_observer = observer;
_cancellationToken = cancellationToken;
}
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
_channel.Reader.Complete(_cancellationToken);
}
}
public async Task StartAsync()
{
try
{
while (await _channel.Reader.WaitToReadAsync(_cancellationToken))
{
while (_channel.Reader.TryRead(out var item))
{
_observer.OnNext(item);
}
}
_observer.OnCompleted();
}
catch (OperationCanceledException)
{
_observer.OnError(new OperationCanceledException());
}
catch (Exception ex)
{
_observer.OnError(ex);
}
finally
{
Dispose();
}
}
}
}
public class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<T>();
var cts = new CancellationTokenSource();
var observable = new ChannelObservable<T>(channel, cts.Token);
var subscription = observable.Subscribe(new Observer<T>());
// Produce items
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(100);
}
// Complete the channel
channel.Writer.Complete();
// Start consuming
await new Subscription(channel, new Observer<T>(), cts.Token).StartAsync();
}
}
public class Observer<T> : IObserver<T>
{
public void OnNext(T value)
{
Console.WriteLine($"Received: {value}");
}
public void OnError(Exception error)
{
Console.WriteLine($"Error: {error.Message}");
}
public void OnCompleted()
{
Console.WriteLine("Completed");
}
}
```
In this code:
* We create a `ChannelObservable<T>` class that implements `IObservable<T>`.
* The `ChannelObservable<T>` class takes a `Channel<T>` and a `CancellationToken` in its constructor.
* The `Subscribe` method returns an `IDisposable` subscription object.
* The subscription object starts consuming items from the channel when `StartAsync` is called.
* If the `CancellationToken` is canceled, the subscription will complete and dispose of the channel reader.
* We also define an `Observer<T>` class that implements `IObserver<T>` to demonstrate how to subscribe to the observable.
* In the `Main` method, we create a channel, produce items, and start consuming them using the `ChannelObservable<T>` and `Observer<T>` classes.
This implementation provides an `IObservable<T>` stream over a `Channel<T>` with backpressure, completing on `CancellationToken` cancellation.
Enhanced response (with ZPI)
To implement an `IObservable<T>` stream over a `Channel<T>` with backpressure, completing on `CancellationToken` cancellation, we can leverage the `System.Threading.Channels` and `System.Reactive.Linq` namespaces. The `Channel<T>` will be used to handle the backpressure, and `IObservable<T>` will be used to create the stream. We'll also utilize `CancellationToken` to handle cancellation.
Here's a complete implementation:
```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
public class ChannelObservable<T>
{
private readonly Channel<T> _channel;
private readonly CancellationToken _cancellationToken;
public ChannelObservable(Channel<T> channel, CancellationToken cancellationToken)
{
_channel = channel;
_cancellationToken = cancellationToken;
}
public IObservable<T> GetObservable()
{
return Observable.Create<T>(async (observer, cancellationToken) =>
{
try
{
while (await _channel.Reader.WaitToReadAsync(_cancellationToken))
{
while (_channel.Reader.TryRead(out var item))
{
observer.OnNext(item);
}
}
observer.OnCompleted();
}
catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
{
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
return Disposable.Empty;
});
}
}
public class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<T>();
var cts = new CancellationTokenSource();
var observable = new ChannelObservable<T>(channel, cts.Token).GetObservable();
// Subscribe to the observable
using var subscription = observable.Subscribe(
item => Console.WriteLine($"Received: {item}"),
error => Console.WriteLine($"Error: {error}"),
() => Console.WriteLine("Completed"));
// Write items to the channel
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(100);
}
channel.Writer.Complete();
// Cancel the token source after 5 seconds
await Task.Delay(5000);
cts.Cancel();
}
}
```
Explanation:
* We create a `ChannelObservable<T>` class that takes a `Channel<T>` and a `CancellationToken` in its constructor.
* The `GetObservable` method returns an `IObservable<T>` that uses `Observable.Create` to create the stream.
* Inside the `Observable.Create` callback, we use a `while` loop to read items from the channel asynchronously using `WaitToReadAsync` and `TryRead`.
* We use `observer.OnNext` to push each item to the subscribers.
* If the `CancellationToken` is canceled, we catch the `OperationCanceledException` and complete the observable using `observer.OnCompleted`.
* If any other exception occurs, we use `observer.OnError` to propagate the error to the subscribers.
* In the `Main` method, we create a channel, a `CancellationTokenSource`, and an instance of `ChannelObservable<T>`.
* We subscribe to the observable and write items to the channel using `WriteAsync`.
* After writing all items, we complete the channel writer using `Complete`.
* Finally, we cancel the token source after 5 seconds to demonstrate the cancellation behavior.
Grader chain
Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.
V1samba/Meta-Llama-3.3-70B-Instruct
BetterV4qwen2.5-coder-3b-instruct
BetterV5openrouter/qwen/qwen3-235b-a22b-2507
BetterV7qwen2.5-coder-7b-instruct
BetterV8openai/gpt-4.1
BetterV9anthropic/claude-opus-4-7
BetterV12openai/gpt-4o
BetterComments
Our grader said what it said. What do you say? Comment as a guest below.
No comments yet. Be the first to say what you make of this round.