在 async 函數中執行 sync blocking function

在 async 函數中執行 sync blocking function

Demo Code

import asyncio
import requests  # 同步 library

async def job_func():
    loop = asyncio.get_running_loop()

    # 把整個 blocking call 包起來
    response = await loop.run_in_executor(
        None,
        lambda: requests.get("https://example.com")
    )

    return response.json()

What Happened?

Problem

requests.get() 是同步 blocking 函數。如果直接在 async 函數裡呼叫,會卡死整個 Event Loop:

# ❌ 錯誤寫法:直接呼叫會阻塞 Event Loop
async def job_func():
    response = requests.get("https://example.com")  # 卡住整個 loop

Event Loop 是單執行緒,一旦被 blocking 函數佔住,所有其他 coroutine 都會停擺。

  sequenceDiagram
    participant EL as Event Loop(主執行緒)
    participant A as coroutine A (job_func)
    participant B as coroutine B
    participant C as coroutine C

    EL->>A: 執行 job_func()
    A->>EL: requests.get()(blocking)
    Note over EL,A: ⏸ Event Loop 凍結,等待網路 I/O
    Note over B: 無法執行
    Note over C: 無法執行
    EL-->>A: 數秒後回傳 response
    Note over EL: 才能繼續處理其他 coroutine

Explanation

1. asyncio.get_running_loop()

取得目前正在執行的 Event Loop 物件。

  • 必須在 async 函數內呼叫
  • 保證拿到的是「同一個」loop,await 才能正確接收結果
  • 不能用 new_event_loop(),那會建立一個全新、閒置的 loop,Future 永遠不會被處理

2. loop.run_in_executor(None, lambda: requests.get(...))

做了三件事:

  1. lambda 丟到 ThreadPool(背景執行緒)去執行
  2. 建立一個 Future 物件代表「還沒完成的結果」
  3. 立刻回傳那個 Future(不等結果)

None 表示使用預設的 ThreadPoolExecutor,執行緒數量 = min(32, cpu_count + 4)

3. await

Event Loop 看到 await 後:

  • 把這個 coroutine 掛起,釋放執行緒
  • 去跑其他 coroutine,不閒置
  • 等 ThreadPool 的執行緒完成,把結果寫進 Future
  • Future resolved → 喚醒這個 coroutine,繼續執行
  stateDiagram-v2
    direction LR
    [*] --> 執行中
    執行中 --> 掛起 : await Future\n(run_in_executor 建立)
    掛起 --> 執行中 : Future resolved\n(ThreadPool 完成任務)
    執行中 --> [*] : return

4. 為什麼用 lambda

run_in_executor 只接受 callablelambdarequests.get(url) 包成一個無參數的函數,方便傳入。

如果需要傳參數,也可以用 functools.partial

from functools import partial

response = await loop.run_in_executor(
    None,
    partial(requests.get, "https://example.com", timeout=5)
)

Workflow

  sequenceDiagram
    participant EL as Event Loop(主執行緒)
    participant TP as ThreadPool(背景執行緒)

    EL->>EL: job_func() 開始執行
    EL->>TP: run_in_executor(lambda: requests.get(...))
    Note over EL: 建立 Future,掛起 coroutine
    EL->>EL: 去跑其他 coroutine

    TP->>TP: requests.get() 執行中(等網路回應)
    TP-->>EL: 完成,把 response 寫進 Future

    Note over EL: Future resolved,喚醒 coroutine
    EL->>EL: response = 拿到結果
    EL->>EL: return response.json()

asyncio.to_thread(Python 3.9+)

用法

import asyncio
import requests

async def job_func():
    response = await asyncio.to_thread(
        requests.get, "https://example.com"
    )
    return response.json()

它做了什麼?

asyncio.to_threadrun_in_executor 的高階封裝,內部等同於:

loop = asyncio.get_running_loop()
await loop.run_in_executor(None, func, *args, **kwargs)
  flowchart TB
    A["asyncio.to_thread(func, *args)"] --> B["loop.run_in_executor(None, func, *args)"]
    B --> C["ThreadPoolExecutor(預設)"]
    C --> D["背景執行緒執行 func()"]

差別在於它直接接受參數,不需要 lambdapartial

# run_in_executor:需要包成 lambda 或用 partial
await loop.run_in_executor(None, lambda: requests.get(url, timeout=5))

# to_thread:直接傳函數與參數
await asyncio.to_thread(requests.get, url, timeout=5)

比較:三種寫法

run_in_executorto_thread
Python 版本3.4+3.9+
寫法複雜度較繁瑣,需要 get_running_loop()簡潔,直接呼叫
傳參數需要 lambdapartial直接傳 *args, **kwargs
自訂 Executor可以指定 ThreadPoolExecutor只能用預設 Executor
底層機制直接操作 loop封裝 run_in_executor(None, ...)

什麼時候用哪個?

  • 一般情況 → 用 to_thread,更簡潔易讀
  • 需要自訂 thread 數量或使用 ProcessPoolExecutor → 用 run_in_executor

關鍵原則

requests.get() 跑在別的執行緒,Event Loop 的執行緒完全沒被卡住,其他 coroutine 可以繼續執行。

執行位置是否阻塞 Event Loop
requests.get() 直接呼叫Event Loop 執行緒✅ 會阻塞
透過 run_in_executorThreadPool 執行緒❌ 不會阻塞
Last updated on