Round #1143

22 May 2026, 17:10 · python · claude-sonnet-4-6 · r13-clean-hints
Complexity: band 1
ZP / UO / OBS: 6 / 2 / 17
Journey weight: 2091
Lightshift ΔL

Prompt

Write an async function fetch_all that takes a list of URLs and concurrently fetches each one using async def coroutines and the await expression. Bound parallelism with asyncio.Semaphore set to 5 concurrent tasks. Schedule the work via asyncio.gather and collect results as a list of tuples (url, status, body). Use asyncio.wait_for to apply a per-request timeout that raises asyncio.TimeoutError on deadline. Launch the top-level coroutine with asyncio.run inside the main guard. Annotate the return type with typing.List and typing.Tuple.

Zero-point index (what the engine surfaced)

ZPI-ZPI-0003 | 2026-05-22T17:10:36Z

Write an async function fetch_all that takes a list of URLs and concurrently fetches each one using async def coroutines and the await expression. Bound parallelism with asyncio.Semaphore set to 5 concurrent tasks. Schedule the work via asyncio.gather and collect results as a list of tuples (url, status, body). Use asyncio.wait_for to apply a per-request timeout that raises asyncio.TimeoutError on deadline. Launch the top-level coroutine with asyncio.run inside the main guard. Annotate the return type with typing.List and typing.Tuple.

ZERO POINT — 6 entries (structural invariants)
● hsl(185.95, 100%, 15%) asyncio event loop [contract]
● hsl(118.68, 100%, 15%) await suspension point [contract]
● hsl(2.06, 100%, 15%) cooperative scheduling [contract]
● hsl(277.10, 100%, 15%) __name__ == "__main__" guard [contract]
● hsl(355.85, 100%, 15%) module vs script execution [contract]
● hsl(125.46, 100%, 15%) import-time isolation [contract]

UNOBSERVED — 2 entries (implied truths)
○ hsl(31.79, 100%, 15%) async [feature]
○ hsl(15.38, 100%, 40%) try [feature]

OBSERVED — 17 entries (explicit in request)
◉ hsl(0.00, 100%, 50%) def [feature]
◉ hsl(3.08, 100%, 15%) return [feature]
◉ hsl(32.82, 100%, 15%) await [feature]
◉ hsl(33.84, 100%, 15%) with [feature]
◉ hsl(50.25, 100%, 15%) tuple [feature]
◉ hsl(58.46, 100%, 15%) function [feature]
◉ hsl(61.54, 100%, 15%) coroutine [feature]
◉ hsl(146.66, 100%, 15%) __set__ [feature]
◉ hsl(175.38, 100%, 27%) __all__ [feature]
◉ hsl(176.40, 100%, 15%) __main__ [feature]
◉ hsl(191.79, 100%, 15%) await expression [feature]
◉ hsl(241.02, 100%, 15%) type guard [feature]
◉ hsl(136.62, 100%, 7%) asyncio run [function]
◉ hsl(138.46, 100%, 8%) asyncio gather [function]
◉ hsl(140.31, 100%, 7%) asyncio wait [function]
◉ hsl(145.85, 100%, 7%) asyncio wait_for [function]
◉ hsl(153.23, 100%, 7%) asyncio Semaphore [function]

COMPLEXITY │ 25 entries │ spread: 241° │ band: MEDIUM

Baseline response (no ZPI)

```python
"""
Async URL fetcher with bounded parallelism, per-request timeouts, and type annotations.

Key asyncio concepts demonstrated
──────────────────────────────────
• async def / await          – coroutine definition and suspension points
• asyncio.Semaphore          – caps concurrent tasks at 5
• asyncio.gather             – schedules all coroutines and collects results
• asyncio.wait_for           – wraps each fetch with a per-request deadline
• asyncio.TimeoutError       – raised (and caught) when the deadline expires
• asyncio.run                – entry-point that creates the event-loop and runs a coroutine
• typing.List / typing.Tuple – return-type annotations
"""

import asyncio
import aiohttp
from typing import List, Tuple

# ── constants ──────────────────────────────────────────────────────────────────
MAX_CONCURRENT: int = 5        # maximum simultaneous HTTP connections
REQUEST_TIMEOUT: float = 10.0  # seconds before asyncio.TimeoutError is raised

# ── demo URLs (mix of real, slow, and non-existent to exercise all code-paths) ─
DEMO_URLS: List[str] = [
    "https://httpbin.org/get",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
    "https://httpbin.org/delay/15",  # intentionally exceeds timeout
    "https://httpbin.org/json",
    "https://httpbin.org/uuid",
    "https://httpbin.org/ip",
    "https://this-domain-does-not-exist.example/",  # connection error demo
    "https://httpbin.org/headers",
]


# ── core coroutine ─────────────────────────────────────────────────────────────
async def fetch_one(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    timeout: float,
) -> Tuple[str, int, str]:
    """
    Fetch a single URL inside the semaphore guard and a per-request timeout.

    Parameters
    ----------
    session   : shared aiohttp session (connection pool reuse)
    url       : target URL
    semaphore : asyncio.Semaphore that bounds concurrency
    timeout   : per-request deadline in seconds

    Returns
    -------
    Tuple[str, int, str]
        (url, http_status_code, response_body_text)

    Special return codes
    --------------------
    -1  asyncio.TimeoutError – request exceeded `timeout` seconds
    -2  aiohttp error        – network / DNS / protocol problem
    -3  unexpected exception
    """

    async def _do_request() -> Tuple[str, int, str]:
        # The semaphore limits how many coroutines enter this block at once.
        async with semaphore:
            async with session.get(url) as response:
                status: int = response.status
                # read at most 64 KiB to keep memory bounded
                body: str = await response.text(encoding="utf-8", errors="replace")
                body = body[:65_536]
                return (url, status, body)

    # asyncio.wait_for cancels _do_request() and raises asyncio.TimeoutError
    # if the coroutine does not complete within `timeout` seconds.
    try:
        result: Tuple[str, int, str] = await asyncio.wait_for(
            _do_request(), timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        print(f" [TIMEOUT] {url} (>{timeout}s)")
        return (url, -1, f"TimeoutError: exceeded {timeout}s")
    except aiohttp.ClientError as exc:
        print(f" [NET ERR] {url} – {exc}")
        return (url, -2, f"ClientError: {exc}")
    except Exception as exc:  # catch-all so gather never receives an exception
        print(f" [ERROR] {url} – {exc}")
        return (url, -3, f"UnexpectedError: {exc}")


# ── top-level coroutine ────────────────────────────────────────────────────────
async def fetch_all(
    urls: List[str],
    *,
    max_concurrent: int = MAX_CONCURRENT,
    timeout: float = REQUEST_TIMEOUT,
) -> List[Tuple[str, int, str]]:
    """
    Concurrently fetch every URL in *urls* and return results as a list of tuples.

    Parameters
    ----------
    urls           : URLs to fetch
    max_concurrent : semaphore limit (default MAX_CONCURRENT = 5)
    timeout        : per-request wall-clock deadline in seconds

    Returns
    -------
    List[Tuple[str, int, str]]
        Ordered list of (url, status_code, body) in the same order as *urls*.
        Error/timeout entries use negative status codes (see fetch_one).
    """
    if not urls:
        return []

    # One semaphore shared across all coroutines – this is the concurrency gate.
    semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)

    # A single aiohttp session reuses the underlying TCP connection pool.
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Build one coroutine per URL.
        coroutines = [
            fetch_one(session, url, semaphore, timeout)
            for url in urls
        ]

        # asyncio.gather schedules all coroutines concurrently on the running
        # event loop and awaits until every one of them has completed or
        # returned a result. Because fetch_one never propagates exceptions
        # (it catches them all), return_exceptions=False is safe here, but we
        # set it explicitly for clarity.
        results: List[Tuple[str, int, str]] = await asyncio.gather(
            *coroutines,
            return_exceptions=False,
        )

    return results


# ── pretty-printer ─────────────────────────────────────────────────────────────
def _print_results(results: List[Tuple[str, int, str]]) -> None:
    """Display a compact summary table of fetch results."""
    col_w = 55
    print("\n" + "─" * 80)
    print(f"{'URL':<{col_w}} {'STATUS':>7} BODY (first 80 chars)")
    print("─" * 80)
    for url, status, body in results:
        preview = body.replace("\n", " ")[:80]
        status_str = str(status) if status > 0 else f"ERR({status})"
        print(f"{url:<{col_w}} {status_str:>7} {preview}")
    print("─" * 80)
    print(f"Total requests: {len(results)}\n")


# ── entry-point ────────────────────────────────────────────────────────────────
async def main() -> None:
    """Orchestrates the demo run."""
    print(f"Fetching {len(DEMO_URLS)} URLs "
          f"(max {MAX_CONCURRENT} concurrent, {REQUEST_TIMEOUT}s timeout each) …\n")
    results: List[Tuple[str, int, str]] = await fetch_all(
        DEMO_URLS,
        max_concurrent=MAX_CONCURRENT,
        timeout=REQUEST_TIMEOUT,
    )
    _print_results(results)


if __name__ == "__main__":
    # asyncio.run() creates a brand-new event loop, runs `main()` to
    # completion, then closes and cleans up the loop.
    asyncio.run(main())
```

---

## How every required piece is used

| Requirement | Where in the code |
|---|---|
| `async def` coroutines | `fetch_one`, `fetch_all`, `_do_request`, `main` |
| `await`

Enhanced response (with ZPI)

```python
import asyncio
import aiohttp
from typing import List, Tuple, Optional


async def fetch_one(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    timeout: float,
) -> Tuple[str, Optional[int], Optional[str]]:
    async with semaphore:
        try:
            async with await asyncio.wait_for(
                session.get(url), timeout=timeout
            ) as response:
                body = await response.text()
                return (url, response.status, body)
        except asyncio.TimeoutError:
            print(f"[timeout] {url}")
            return (url, None, None)
        except aiohttp.ClientError as exc:
            print(f"[client error] {url}: {exc}")
            return (url, None, None)


async def fetch_all(
    urls: List[str],
    timeout: float = 10.0,
    max_concurrency: int = 5,
) -> List[Tuple[str, Optional[int], Optional[str]]]:
    semaphore = asyncio.Semaphore(max_concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_one(session, url, semaphore, timeout)
            for url in urls
        ]
        results: List[Tuple[str, Optional[int], Optional[str]]] = (
            await asyncio.gather(*tasks)
        )
    return results


if __name__ == "__main__":
    sample_urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/uuid",
        "https://httpbin.org/ip",
    ]
    results = asyncio.run(fetch_all(sample_urls, timeout=5.0, max_concurrency=5))
    for url, status, body in results:
        snippet = (body[:80].replace("\n", " ") + "...") if body else "N/A"
        print(f"{status:>4} {url}\n {snippet}\n")
```

`fetch_one` owns the per-request lifecycle: it acquires the semaphore slot (bounding concurrency to `max_concurrency`), wraps `session.get` in `asyncio.wait_for` so the coroutine is cancelled and `asyncio.TimeoutError` is raised if the deadline passes, then reads and returns the body. Both `TimeoutError` and `aiohttp.ClientError` are caught locally so a single bad URL never poisons `gather`.

`fetch_all` creates one `Semaphore` and one `ClientSession` that is shared across all tasks (efficient connection pooling), builds a list of coroutine objects, and hands them all to `asyncio.gather`, which schedules them on the running event loop concurrently. Because each task must acquire the semaphore before issuing its HTTP request, at most `max_concurrency` requests are in-flight at any moment — the rest are suspended at the `async with semaphore` await point, cooperatively yielding control. `asyncio.run` at the bottom creates a fresh event loop, drives `fetch_all` to completion, tears the loop down, and is correctly guarded by `if __name__ == "__main__"` so importing the module never triggers network I/O.
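
The concurrency bound described above can be checked without network access. The block below is a minimal sketch and is not part of either recorded response: it swaps the aiohttp call for a hypothetical `fake_fetch` stand-in that only sleeps, and it adds a `stats` dictionary (also an assumption, purely for illustration) to record how many stand-in requests are in flight at once. The `example.invalid` URLs are placeholders and are never contacted.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

Result = Tuple[str, Optional[int], Optional[str]]


async def fake_fetch(delay: float, stats: Dict[str, int]) -> str:
    """Hypothetical stand-in for an HTTP GET: sleep, then return a canned body."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    try:
        await asyncio.sleep(delay)
        return "ok"
    finally:
        # Runs on normal completion and on cancellation by wait_for.
        stats["in_flight"] -= 1


async def fetch_one(
    url: str,
    delay: float,
    semaphore: asyncio.Semaphore,
    timeout: float,
    stats: Dict[str, int],
) -> Result:
    # Acquire a slot first, then apply the deadline to the request itself.
    async with semaphore:
        try:
            body = await asyncio.wait_for(fake_fetch(delay, stats), timeout=timeout)
            return (url, 200, body)
        except asyncio.TimeoutError:
            return (url, None, None)


async def fetch_all(
    jobs: List[Tuple[str, float]],
    timeout: float = 0.2,
    max_concurrency: int = 5,
) -> Tuple[List[Result], int]:
    """Run every (url, delay) job; return the results and the observed peak."""
    stats = {"in_flight": 0, "peak": 0}
    semaphore = asyncio.Semaphore(max_concurrency)
    results = await asyncio.gather(
        *(fetch_one(url, delay, semaphore, timeout, stats) for url, delay in jobs)
    )
    return list(results), stats["peak"]


if __name__ == "__main__":
    jobs = [(f"https://example.invalid/{i}", 0.05) for i in range(11)]
    jobs.append(("https://example.invalid/slow", 1.0))  # exceeds the timeout
    results, peak = asyncio.run(fetch_all(jobs))
    print("peak in-flight:", peak)
    print("slow job result:", results[-1])
```

Because the semaphore is acquired before `asyncio.wait_for` starts, time spent queueing for a slot does not count against the per-request deadline in this sketch, and `asyncio.gather` returns results in the same order as the input jobs.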

Grader chain

Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.

V1 samba/Meta-Llama-3.3-70B-Instruct: Better
V4 qwen2.5-coder-3b-instruct: Better
V5 cerebras/qwen-3-235b-a22b-instruct-2507: Worse
V7 qwen2.5-coder-7b-instruct: Better
V8 openai/gpt-4.1: Same
V9 anthropic/claude-opus-4-7: Worse
V12 openai/gpt-4o: Better
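
To see how the verdicts above split, they can be counted directly. This is only an illustrative sketch: the page does not say how the framework itself aggregates or reports disagreement, so the `verdicts` mapping and the `unanimous` flag are assumptions introduced here, with the grader IDs and verdicts copied from the list above.

```python
from collections import Counter

# Grader ID -> verdict, as listed for this round.
verdicts = {
    "V1": "Better",
    "V4": "Better",
    "V5": "Worse",
    "V7": "Better",
    "V8": "Same",
    "V9": "Worse",
    "V12": "Better",
}

# Count how many graders cast each verdict.
tally = Counter(verdicts.values())

# The graders disagree whenever more than one distinct verdict appears.
unanimous = len(tally) == 1

for verdict, count in tally.most_common():
    print(f"{verdict}: {count}")
print("unanimous:", unanimous)
```

For this round the count comes to four Better, two Worse, and one Same, so the verdicts are not unanimous.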
