Round #1143
22 May 2026, 17:10· python· claude-sonnet-4-6· r13-clean-hints
- Complexity
- band 1
- ZP / UO / OBS
- 6 / 2 / 17
- Journey weight
- 2091
- Lightshift ΔL
- —
Prompt
Write an async function fetch_all that takes a list of URLs and concurrently fetches each one using async def coroutines and the await expression. Bound parallelism with asyncio.Semaphore set to 5 concurrent tasks. Schedule the work via asyncio.gather and collect results as a list of tuples (url, status, body). Use asyncio.wait_for to apply a per-request timeout that raises asyncio.TimeoutError on deadline. Launch the top-level coroutine with asyncio.run inside the main guard. Annotate the return type with typing.List and typing.Tuple.
Zero-point index (what the engine surfaced)
ZPI-ZPI-0003 | 2026-05-22T17:10:36Z
Write an async function fetch_all that takes a list of URLs and concurrently fetches each one using async def coroutines and the await expression. Bound parallelism with asyncio.Semaphore set to 5 concurrent tasks. Schedule the work via asyncio.gather and collect results as a list of tuples (url, status, body). Use asyncio.wait_for to apply a per-request timeout that raises asyncio.TimeoutError on deadline. Launch the top-level coroutine with asyncio.run inside the main guard. Annotate the return type with typing.List and typing.Tuple.
ZERO POINT — 6 entries (structural invariants)
● hsl(185.95, 100%, 15%) asyncio event loop [contract]
● hsl(118.68, 100%, 15%) await suspension point [contract]
● hsl(2.06, 100%, 15%) cooperative scheduling [contract]
● hsl(277.10, 100%, 15%) __name__ == "__main__" guard [contract]
● hsl(355.85, 100%, 15%) module vs script execution [contract]
● hsl(125.46, 100%, 15%) import-time isolation [contract]
UNOBSERVED — 2 entries (implied truths)
○ hsl(31.79, 100%, 15%) async [feature]
○ hsl(15.38, 100%, 40%) try [feature]
OBSERVED — 17 entries (explicit in request)
◉ hsl(0.00, 100%, 50%) def [feature]
◉ hsl(3.08, 100%, 15%) return [feature]
◉ hsl(32.82, 100%, 15%) await [feature]
◉ hsl(33.84, 100%, 15%) with [feature]
◉ hsl(50.25, 100%, 15%) tuple [feature]
◉ hsl(58.46, 100%, 15%) function [feature]
◉ hsl(61.54, 100%, 15%) coroutine [feature]
◉ hsl(146.66, 100%, 15%) __set__ [feature]
◉ hsl(175.38, 100%, 27%) __all__ [feature]
◉ hsl(176.40, 100%, 15%) __main__ [feature]
◉ hsl(191.79, 100%, 15%) await expression [feature]
◉ hsl(241.02, 100%, 15%) type guard [feature]
◉ hsl(136.62, 100%, 7%) asyncio run [function]
◉ hsl(138.46, 100%, 8%) asyncio gather [function]
◉ hsl(140.31, 100%, 7%) asyncio wait [function]
◉ hsl(145.85, 100%, 7%) asyncio wait_for [function]
◉ hsl(153.23, 100%, 7%) asyncio Semaphore [function]
COMPLEXITY │ 25 entries │ spread: 241° │ band: MEDIUM
Baseline response (no ZPI)
```python
"""
Async URL fetcher with bounded parallelism, per-request timeouts, and type annotations.
Key asyncio concepts demonstrated
──────────────────────────────────
• async def / await – coroutine definition and suspension points
• asyncio.Semaphore – caps concurrent tasks at 5
• asyncio.gather – schedules all coroutines and collects results
• asyncio.wait_for – wraps each fetch with a per-request deadline
• asyncio.TimeoutError – raised (and caught) when the deadline expires
• asyncio.run – entry-point that creates the event-loop and runs a coroutine
• typing.List / typing.Tuple – return-type annotations
"""
import asyncio
import aiohttp
from typing import List, Tuple
# ── constants ──────────────────────────────────────────────────────────────────
MAX_CONCURRENT: int = 5 # maximum simultaneous HTTP connections
REQUEST_TIMEOUT: float = 10.0 # seconds before asyncio.TimeoutError is raised
# ── demo URLs (mix of real, slow, and non-existent to exercise all code-paths) ─
DEMO_URLS: List[str] = [
"https://httpbin.org/get",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
"https://httpbin.org/status/500",
"https://httpbin.org/delay/15", # intentionally exceeds timeout
"https://httpbin.org/json",
"https://httpbin.org/uuid",
"https://httpbin.org/ip",
"https://this-domain-does-not-exist.example/", # connection error demo
"https://httpbin.org/headers",
]
# ── core coroutine ─────────────────────────────────────────────────────────────
async def fetch_one(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore,
timeout: float,
) -> Tuple[str, int, str]:
"""
Fetch a single URL inside the semaphore guard and a per-request timeout.
Parameters
----------
session : shared aiohttp session (connection pool reuse)
url : target URL
semaphore : asyncio.Semaphore that bounds concurrency
timeout : per-request deadline in seconds
Returns
-------
Tuple[str, int, str]
(url, http_status_code, response_body_text)
Special return codes
--------------------
-1 asyncio.TimeoutError – request exceeded `timeout` seconds
-2 aiohttp error – network / DNS / protocol problem
-3 unexpected exception
"""
async def _do_request() -> Tuple[str, int, str]:
# The semaphore limits how many coroutines enter this block at once.
async with semaphore:
async with session.get(url) as response:
status: int = response.status
# read at most 64 KiB to keep memory bounded
body: str = await response.text(encoding="utf-8", errors="replace")
body = body[:65_536]
return (url, status, body)
# asyncio.wait_for cancels _do_request() and raises asyncio.TimeoutError
# if the coroutine does not complete within `timeout` seconds.
try:
result: Tuple[str, int, str] = await asyncio.wait_for(
_do_request(), timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f" [TIMEOUT] {url} (>{timeout}s)")
return (url, -1, f"TimeoutError: exceeded {timeout}s")
except aiohttp.ClientError as exc:
print(f" [NET ERR] {url} – {exc}")
return (url, -2, f"ClientError: {exc}")
except Exception as exc: # catch-all so gather never receives an exception
print(f" [ERROR] {url} – {exc}")
return (url, -3, f"UnexpectedError: {exc}")
# ── top-level coroutine ────────────────────────────────────────────────────────
async def fetch_all(
urls: List[str],
*,
max_concurrent: int = MAX_CONCURRENT,
timeout: float = REQUEST_TIMEOUT,
) -> List[Tuple[str, int, str]]:
"""
Concurrently fetch every URL in *urls* and return results as a list of tuples.
Parameters
----------
urls : URLs to fetch
max_concurrent : semaphore limit (default MAX_CONCURRENT = 5)
timeout : per-request wall-clock deadline in seconds
Returns
-------
List[Tuple[str, int, str]]
Ordered list of (url, status_code, body) in the same order as *urls*.
Error/timeout entries use negative status codes (see fetch_one).
"""
if not urls:
return []
# One semaphore shared across all coroutines – this is the concurrency gate.
semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)
# A single aiohttp session reuses the underlying TCP connection pool.
connector = aiohttp.TCPConnector(limit=max_concurrent)
async with aiohttp.ClientSession(connector=connector) as session:
# Build one coroutine per URL.
coroutines = [
fetch_one(session, url, semaphore, timeout)
for url in urls
]
# asyncio.gather schedules all coroutines concurrently on the running
# event loop and awaits until every one of them has completed or
# returned a result. Because fetch_one never propagates exceptions
# (it catches them all), return_exceptions=False is safe here, but we
# set it explicitly for clarity.
results: List[Tuple[str, int, str]] = await asyncio.gather(
*coroutines,
return_exceptions=False,
)
return results
# ── pretty-printer ─────────────────────────────────────────────────────────────
def _print_results(results: List[Tuple[str, int, str]]) -> None:
"""Display a compact summary table of fetch results."""
col_w = 55
print("\n" + "─" * 80)
print(f"{'URL':<{col_w}} {'STATUS':>7} BODY (first 80 chars)")
print("─" * 80)
for url, status, body in results:
preview = body.replace("\n", " ")[:80]
status_str = str(status) if status > 0 else f"ERR({status})"
print(f"{url:<{col_w}} {status_str:>7} {preview}")
print("─" * 80)
print(f"Total requests: {len(results)}\n")
# ── entry-point ────────────────────────────────────────────────────────────────
async def main() -> None:
"""Orchestrates the demo run."""
print(f"Fetching {len(DEMO_URLS)} URLs "
f"(max {MAX_CONCURRENT} concurrent, {REQUEST_TIMEOUT}s timeout each) …\n")
results: List[Tuple[str, int, str]] = await fetch_all(
DEMO_URLS,
max_concurrent=MAX_CONCURRENT,
timeout=REQUEST_TIMEOUT,
)
_print_results(results)
if __name__ == "__main__":
# asyncio.run() creates a brand-new event loop, runs `main()` to
# completion, then closes and cleans up the loop.
asyncio.run(main())
```
---
## How every required piece is used
| Requirement | Where in the code |
|---|---|
| `async def` coroutines | `fetch_one`, `fetch_all`, `_do_request`, `main` |
| `await`
Enhanced response (with ZPI)
```python
import asyncio
import aiohttp
from typing import List, Tuple, Optional
async def fetch_one(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore,
timeout: float,
) -> Tuple[str, Optional[int], Optional[str]]:
async with semaphore:
try:
async with await asyncio.wait_for(
session.get(url), timeout=timeout
) as response:
body = await response.text()
return (url, response.status, body)
except asyncio.TimeoutError:
print(f"[timeout] {url}")
return (url, None, None)
except aiohttp.ClientError as exc:
print(f"[client error] {url}: {exc}")
return (url, None, None)
async def fetch_all(
urls: List[str],
timeout: float = 10.0,
max_concurrency: int = 5,
) -> List[Tuple[str, Optional[int], Optional[str]]]:
semaphore = asyncio.Semaphore(max_concurrency)
async with aiohttp.ClientSession() as session:
tasks = [
fetch_one(session, url, semaphore, timeout)
for url in urls
]
results: List[Tuple[str, Optional[int], Optional[str]]] = (
await asyncio.gather(*tasks)
)
return results
if __name__ == "__main__":
sample_urls = [
"https://httpbin.org/get",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/uuid",
"https://httpbin.org/ip",
]
results = asyncio.run(fetch_all(sample_urls, timeout=5.0, max_concurrency=5))
for url, status, body in results:
snippet = (body[:80].replace("\n", " ") + "...") if body else "N/A"
print(f"{status:>4} {url}\n {snippet}\n")
```
`fetch_one` owns the per-request lifecycle: it acquires the semaphore slot (bounding concurrency to `max_concurrency`), wraps `session.get` in `asyncio.wait_for` so the coroutine is cancelled and `asyncio.TimeoutError` is raised if the deadline passes, then reads and returns the body. Both `TimeoutError` and `aiohttp.ClientError` are caught locally so a single bad URL never poisons `gather`.
`fetch_all` creates one `Semaphore` and one `ClientSession` that is shared across all tasks (efficient connection pooling), builds a list of coroutine objects, and hands them all to `asyncio.gather`, which schedules them on the running event loop concurrently. Because each task must acquire the semaphore before issuing its HTTP request, at most `max_concurrency` requests are in-flight at any moment — the rest are suspended at the `async with semaphore` await point, cooperatively yielding control.
`asyncio.run` at the bottom creates a fresh event loop, drives `fetch_all` to completion, tears the loop down, and is correctly guarded by `if __name__ == "__main__"` so importing the module never triggers network I/O.
Grader chain
Each grader reads the same baseline + enhanced pair and casts an independent verdict. Disagreement is the framework's own signal — not noise to be averaged away.
V1samba/Meta-Llama-3.3-70B-Instruct
BetterV4qwen2.5-coder-3b-instruct
BetterV5cerebras/qwen-3-235b-a22b-instruct-2507
WorseV7qwen2.5-coder-7b-instruct
BetterV8openai/gpt-4.1
SameV9anthropic/claude-opus-4-7
WorseV12openai/gpt-4o
BetterComments
Our grader said what it said. What do you say? Comment as a guest below.
No comments yet. Be the first to say what you make of this round.