在 async 函數中執行 sync blocking function
Demo Code
import asyncio
import requests # 同步 library
async def job_func():
loop = asyncio.get_running_loop()
# 把整個 blocking call 包起來
response = await loop.run_in_executor(
None,
lambda: requests.get("https://example.com")
)
return response.json()What Happened?
Problem
requests.get() 是同步 blocking 函數。如果直接在 async 函數裡呼叫,會卡死整個 Event Loop:
# ❌ 錯誤寫法:直接呼叫會阻塞 Event Loop
async def job_func():
response = requests.get("https://example.com") # 卡住整個 loopEvent Loop 是單執行緒,一旦被 blocking 函數佔住,所有其他 coroutine 都會停擺。
sequenceDiagram
participant EL as Event Loop(主執行緒)
participant A as coroutine A (job_func)
participant B as coroutine B
participant C as coroutine C
EL->>A: 執行 job_func()
A->>EL: requests.get()(blocking)
Note over EL,A: ⏸ Event Loop 凍結,等待網路 I/O
Note over B: 無法執行
Note over C: 無法執行
EL-->>A: 數秒後回傳 response
Note over EL: 才能繼續處理其他 coroutine
Explanation
1. asyncio.get_running_loop()
取得目前正在執行的 Event Loop 物件。
- 必須在 async 函數內呼叫
- 保證拿到的是「同一個」loop,
await才能正確接收結果 - 不能用
new_event_loop(),那會建立一個全新、閒置的 loop,Future 永遠不會被處理
2. loop.run_in_executor(None, lambda: requests.get(...))
做了三件事:
- 把
lambda丟到 ThreadPool(背景執行緒)去執行 - 建立一個 Future 物件代表「還沒完成的結果」
- 立刻回傳那個 Future(不等結果)
None 表示使用預設的 ThreadPoolExecutor,執行緒數量 = min(32, cpu_count + 4)。
3. await
Event Loop 看到 await 後:
- 把這個 coroutine 掛起,釋放執行緒
- 去跑其他 coroutine,不閒置
- 等 ThreadPool 的執行緒完成,把結果寫進 Future
- Future resolved → 喚醒這個 coroutine,繼續執行
stateDiagram-v2
direction LR
[*] --> 執行中
執行中 --> 掛起 : await Future\n(run_in_executor 建立)
掛起 --> 執行中 : Future resolved\n(ThreadPool 完成任務)
執行中 --> [*] : return
4. 為什麼用 lambda?
run_in_executor 只接受 callable,lambda 把 requests.get(url) 包成一個無參數的函數,方便傳入。
如果需要傳參數,也可以用 functools.partial:
from functools import partial
response = await loop.run_in_executor(
None,
partial(requests.get, "https://example.com", timeout=5)
)Workflow
sequenceDiagram
participant EL as Event Loop(主執行緒)
participant TP as ThreadPool(背景執行緒)
EL->>EL: job_func() 開始執行
EL->>TP: run_in_executor(lambda: requests.get(...))
Note over EL: 建立 Future,掛起 coroutine
EL->>EL: 去跑其他 coroutine
TP->>TP: requests.get() 執行中(等網路回應)
TP-->>EL: 完成,把 response 寫進 Future
Note over EL: Future resolved,喚醒 coroutine
EL->>EL: response = 拿到結果
EL->>EL: return response.json()
asyncio.to_thread(Python 3.9+)
用法
import asyncio
import requests
async def job_func():
response = await asyncio.to_thread(
requests.get, "https://example.com"
)
return response.json()它做了什麼?
asyncio.to_thread 是 run_in_executor 的高階封裝,內部等同於:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, func, *args, **kwargs)
flowchart TB
A["asyncio.to_thread(func, *args)"] --> B["loop.run_in_executor(None, func, *args)"]
B --> C["ThreadPoolExecutor(預設)"]
C --> D["背景執行緒執行 func()"]
差別在於它直接接受參數,不需要 lambda 或 partial:
# run_in_executor:需要包成 lambda 或用 partial
await loop.run_in_executor(None, lambda: requests.get(url, timeout=5))
# to_thread:直接傳函數與參數
await asyncio.to_thread(requests.get, url, timeout=5)比較:三種寫法
run_in_executor | to_thread | |
|---|---|---|
| Python 版本 | 3.4+ | 3.9+ |
| 寫法複雜度 | 較繁瑣,需要 get_running_loop() | 簡潔,直接呼叫 |
| 傳參數 | 需要 lambda 或 partial | 直接傳 *args, **kwargs |
| 自訂 Executor | 可以指定 ThreadPoolExecutor | 只能用預設 Executor |
| 底層機制 | 直接操作 loop | 封裝 run_in_executor(None, ...) |
什麼時候用哪個?
- 一般情況 → 用
to_thread,更簡潔易讀 - 需要自訂 thread 數量或使用
ProcessPoolExecutor→ 用run_in_executor
關鍵原則
requests.get()跑在別的執行緒,Event Loop 的執行緒完全沒被卡住,其他 coroutine 可以繼續執行。
| 執行位置 | 是否阻塞 Event Loop | |
|---|---|---|
requests.get() 直接呼叫 | Event Loop 執行緒 | ✅ 會阻塞 |
透過 run_in_executor | ThreadPool 執行緒 | ❌ 不會阻塞 |
Last updated on