import pycurl
from io import BytesIO
import json
def fetch_api(url, method, header=None, data=None):
"""
url: 获取api的url
method: 请求方法
header: 请求头
data: 请求参数
"""
if method == "get": ???_Curl = pycurl.Curl() ???buf = BytesIO() ???_Curl.setopt(pycurl.WRITEFUNCTION,buf.write) ???_Curl.setopt(pycurl.URL, url) ???_Curl.setopt(pycurl.HTTPHEADER, header) ???_Curl.perform() ???result = buf.getvalue().decode("utf-8") ???http_code = _Curl.getinfo(_Curl.HTTP_CODE) ???dns_time = _Curl.getinfo(_Curl.NAMELOOKUP_TIME) * 1000 # DNS解析时间 ???connect_time = _Curl.getinfo(_Curl.CONNECT_TIME) * 1000 # 建连时间 ???pretransafe_time = _Curl.getinfo(_Curl.PRETRANSFER_TIME) * 1000 # 连接上后到开始传输时的时间 ???starttransafe_time =_Curl.getinfo(_Curl.STARTTRANSFER_TIME) * 1000 # 接收到第一个字节的时间 ???redirect_time = _Curl.getinfo(_Curl.REDIRECT_TIME) * 1000 # 重定向时间 ???total_time = _Curl.getinfo(_Curl.TOTAL_TIME) * 1000 # 请求总时间 ???download_size = _Curl.getinfo(_Curl.SIZE_DOWNLOAD) # 下载数据包大小 ???download_speed = _Curl.getinfo(_Curl.SPEED_DOWNLOAD) # 下载速度 ???buf.close() ???print(result) ???print(http_code,dns_time,connect_time,pretransafe_time,starttransafe_time,redirect_time,total_time,download_size,download_speed) ???_Curl.close()else: ???_Curl = pycurl.Curl() ???_Curl.setopt(pycurl.HTTPHEADER, header) ???buf = BytesIO() ???_Curl.setopt(pycurl.WRITEFUNCTION, buf.write) ???_Curl.setopt(pycurl.URL, url) ???_Curl.setopt(_Curl.POSTFIELDS, data) ???_Curl.perform() ???result = buf.getvalue().decode("utf-8") ???http_code = _Curl.getinfo(_Curl.HTTP_CODE) ???dns_time = _Curl.getinfo(_Curl.NAMELOOKUP_TIME) * 1000 ?# DNS解析时间 ???connect_time = _Curl.getinfo(_Curl.CONNECT_TIME) * 1000 ?# 建连时间 ???pretransafe_time = _Curl.getinfo(_Curl.PRETRANSFER_TIME) * 1000 ?# 连接上后到开始传输时的时间 ???starttransafe_time = _Curl.getinfo(_Curl.STARTTRANSFER_TIME) * 1000 ?# 接收到第一个字节的时间 ???redirect_time = _Curl.getinfo(_Curl.REDIRECT_TIME) * 1000 ?# 重定向时间 ???total_time = _Curl.getinfo(_Curl.TOTAL_TIME) * 1000 # 请求总时间 毫秒 ???download_size = _Curl.getinfo(_Curl.SIZE_DOWNLOAD) ?# 下载数据包大小 ???download_speed = _Curl.getinfo(_Curl.SPEED_DOWNLOAD) ?# 下载速度 ???buf.close() ???print(result) ???print(http_code, dns_time, connect_time, pretransafe_time, starttransafe_time, redirect_time, total_time, ?????????download_size, download_speed) ???_Curl.close()
"""
get请求测试
header = [
‘Content-Type:application/json; charset=utf-8‘
]
fetch_api(‘http://xxxx‘, ‘get‘, header)
post请求测试
header = [
‘Content-Type:application/json; charset=utf-8‘
]
data = {"userid":"%s","nickname":"zxpp","sex":"1","birthday":"2015-01-01"}
json_data = json.dumps(data)
fetch_api(‘http://xxxx‘, ‘post‘, header=header,data=json_data)
"""
@asyncio.coroutine
def fetch_async(func, url, method, header=None, data=None):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, func, url, method,header,data)
#response = yield from future ?# 因为 func 函数未返回数据,所以可以不用返回值
yield from future
header = [
‘Content-Type:application/json; charset=utf-8‘
]
data = {"userid":"%s","nickname":"zxpp","sex":"1","birthday":"2015-01-01"}
json_data = json.dumps(data)
tasks = [
fetch_async(fetch_api, url=‘http://xxxx‘, method=‘get‘,header=header),
fetch_async(fetch_api, url=‘http://xxxx‘, method=‘post‘, header=header,data=json_data)
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
asyncio + pycurl + BytesIO 异步批量调用url请求
原文地址:https://blog.51cto.com/haoyonghui/2370145