安装
uv pip install "baidusearch>=1.0.3"
异步代码如下:
import asyncio
from typing import List
from baidusearch.baidusearch import search
from app.tool.base import BaseTool
class BaiduSerach(BaseTool):
    """Tool that runs a Baidu web search through the ``baidusearch`` package.

    The class exposes the tool's name, a natural-language description and a
    JSON-schema style ``parameters`` mapping, plus an async ``execute`` method
    that performs the search.
    """

    name: str = "baidu_search"
    description: str = """Perform a Baidu search and return a list of relevant links.
Use this tool when you need to find information on the Chinese web, get up-to-date data, or research specific topics.
The tool returns a list of URLs that match the search query.
"""
    # Schema of the arguments accepted by execute(); only "query" is mandatory.
    parameters: dict = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                # Names Baidu (not Google): this text describes the engine the tool really uses.
                "description": "(required) The search query to submit to Baidu.",
            },
            "num_results": {
                "type": "integer",
                "description": "(optional) The number of search results to return. Default is 10.",
                "default": 10,
            },
        },
        "required": ["query"],
    }

    async def execute(self, query: str, num_results: int = 10) -> List[str]:
        """Execute a Baidu search and return the collected results.

        Args:
            query (str): The search query to submit to Baidu.
            num_results (int, optional): The number of search results to return. Default is 10.

        Returns:
            List[str]: Whatever ``search`` yields, materialized into a list.
            NOTE(review): the annotation and the tool description promise URL
            strings; confirm that ``baidusearch.search`` yields plain URLs
            rather than richer result records.
        """
        # search() is a synchronous call, so it is handed to the loop's default
        # executor instead of being invoked directly inside the coroutine.
        # get_running_loop() is the recommended way to obtain the loop from
        # within a coroutine (get_event_loop() is discouraged for this use).
        loop = asyncio.get_running_loop()
        links = await loop.run_in_executor(
            None, lambda: list(search(query, num_results=num_results))
        )
        return links
调用方式有两种:
方法一:在异步函数中使用 await
async def main():
    """Demo coroutine: run a single Baidu search and print the outcome."""
    searcher = BaiduSerach()
    hits = await searcher.execute("什么是百度")
    print("BaiduSerach====", hits)


# Start an event loop, run the demo coroutine to completion, then close the loop.
asyncio.run(main())
方法二:在同步代码中用 asyncio.run() 包一层(适合脚本调用)
import asyncio
# Synchronous call site: asyncio.run() drives the execute() coroutine to
# completion and returns its result, so no enclosing async function is needed.
# NOTE(review): asyncio.run() cannot be used while another event loop is already
# running in the same thread; use the await form in that situation.
baiduSearch = BaiduSerach()
results = asyncio.run(baiduSearch.execute("什么是百度"))
print("BaiduSerach====", results)