这是使用 kucoin-futures-python-sdk ( https://github.com/Kucoin/kucoin-futures-python-sdk ) 的 Python + Flask 应用程序。
这是从 github ( https://github.com/Kucoin/kucoin-futures-python-sdk )复制并修改为使用公共客户端而不是私人客户端的样板代码。这是使用 asyncio 订阅 Kucoin API 的实时比特币价格馈送的示例。
import asyncio
from kucoin_futures.client import WsToken
from kucoin_futures.ws_client import KucoinFuturesWsClient
async def main():
async def deal_msg(msg):
if msg['topic'] == '/contractMarket/level2:XBTUSDM':
print(f'Get XBTUSDM Ticker:{msg["data"]}')
elif msg['topic'] == '/contractMarket/level3:XBTUSDTM':
print(f'Get XBTUSDTM level3:{msg["data"]}')
# is public
client = WsToken()
# is private
# client = WsToken(key='', secret='', passphrase='', is_sandbox=False, url='')
# is sandbox
# client = WsToken(is_sandbox=True)
ws_client = await KucoinFuturesWsClient.create(loop, client, deal_msg, private=False)
await ws_client.subscribe('/contractMarket/level2:XBTUSDM')
await ws_client.subscribe('/contractMarket/level3:XBTUSDM')
while True:
await asyncio.sleep(60, loop=loop)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这是 Flask 渲染部分:
@app.route('/', methods=['GET', 'POST'])
def index():
form = Form() # referencing Flask-WTForm library
if request.method == 'POST':
# get data of selected contract (form.contract.data)
symbol_ticker = get_symbol_ticker(form.contract.data)
contract_detail = get_contract_detail(form.contract.data)
return render_template('contract.html', contract=form.contract.data, account_overview=account_overview, contracts=contracts, symbol_ticker=symbol_ticker, contract_detail=contract_detail)
if request.method == 'GET':
return render_template('index.html', form=form, account_overview=account_overview, contracts=contracts)
如何在异步代码块之外访问 Kucoin websockets 数据?我尝试声明一个全局变量并在 async def deal_msg(msg) 中分配给它,但这并没有成功。