-1

这是使用 kucoin-futures-python-sdk ( https://github.com/Kucoin/kucoin-futures-python-sdk ) 的 Python + Flask 应用程序。

这是从 github ( https://github.com/Kucoin/kucoin-futures-python-sdk )复制并修改为使用公共客户端而不是私人客户端的样板代码。这是使用 asyncio 订阅 Kucoin API 的实时比特币价格馈送的示例。

import asyncio
from kucoin_futures.client import WsToken
from kucoin_futures.ws_client import KucoinFuturesWsClient


async def main():
    async def deal_msg(msg):
        if msg['topic'] == '/contractMarket/level2:XBTUSDM':
            print(f'Get XBTUSDM Ticker:{msg["data"]}')
        elif msg['topic'] == '/contractMarket/level3:XBTUSDTM':
            print(f'Get XBTUSDTM level3:{msg["data"]}')

    # is public
    client = WsToken()
    # is private
    # client = WsToken(key='', secret='', passphrase='', is_sandbox=False, url='')
    # is sandbox
    # client = WsToken(is_sandbox=True)
    ws_client = await KucoinFuturesWsClient.create(loop, client, deal_msg, private=False)
    await ws_client.subscribe('/contractMarket/level2:XBTUSDM')
    await ws_client.subscribe('/contractMarket/level3:XBTUSDM')
    while True:
        await asyncio.sleep(60, loop=loop)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

这是 Flask 渲染部分:

@app.route('/', methods=['GET', 'POST'])
def index():
    form = Form() # referencing Flask-WTForm library

    if request.method == 'POST':
        # get data of selected contract (form.contract.data)
        symbol_ticker = get_symbol_ticker(form.contract.data)
        contract_detail = get_contract_detail(form.contract.data)

        return render_template('contract.html', contract=form.contract.data, account_overview=account_overview, contracts=contracts, symbol_ticker=symbol_ticker, contract_detail=contract_detail)

    if request.method == 'GET':
        return render_template('index.html', form=form, account_overview=account_overview, contracts=contracts)

如何在异步代码块之外访问 Kucoin websockets 数据?我尝试声明一个全局变量并在 async def deal_msg(msg) 中分配给它,但这并没有成功。

4

0 回答 0