DEV Community

Kyle Foo


Pybit v5 how to subscribe to websocket topics

Well, in one of my previous posts, I talked about Pybit's v2 API, and we are now at v5!

Expect quite a few breaking changes, most notably the parameter format for most APIs, which has changed from snake_case (under_score) to camelCase. Upgrading should make life easier down the road, since the v5 documentation is actively maintained and is much more readable and informative. And of course, a good number of improvements come with it. See the change log.
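To make the naming change concrete, here is a minimal sketch of converting a dict of old-style parameters to the new style. The helper names `to_camel_case` and `migrate_params` are hypothetical and not part of Pybit, and the sample keys are only illustrative. Not every v2 parameter necessarily maps one-to-one onto a v5 parameter, so check each name against the v5 documentation.

```python
def to_camel_case(name: str) -> str:
    """Convert a snake_case name such as 'order_type' to camelCase ('orderType')."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def migrate_params(params: dict) -> dict:
    """Return a copy of params with every key renamed to camelCase (values untouched)."""
    return {to_camel_case(key): value for key, value in params.items()}


# Illustrative old-style parameters (assumed names, for demonstration only)
legacy = {
    "order_type": "Market",
    "time_in_force": "GTC",
    "reduce_only": False,
    "qty": "0.001",
}
migrated = migrate_params(legacy)
print(migrated)
```

Keys without an underscore, like `qty`, pass through unchanged.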

Below is an example of using the Pybit API to subscribe to an account's order stream, so that we can trigger a callback handler based on its socket events:

import os

from pybit.unified_trading import HTTP
from pybit.unified_trading import WebSocket

api_key = os.environ.get("BYBIT_API_KEY")
api_secret = os.environ.get("BYBIT_API_SECRET")

class AlgoOrderStream:
    def __init__(self, trading_pair):
        self.trading_pair = trading_pair
        self.session = HTTP(testnet=True, api_key=api_key, api_secret=api_secret)
        print("Initialized ORDER_WEBSOCKET_STREAM")

    # callback handler when socket event triggers
    def handle_order_stream(self, message):
        data = message["data"]
        print(data)
        # The changed order data is available here so you can act on it; below is just an example action
        # If a Long position is filled, place a conditional Short order
        if data[0]['side'] == 'Buy' and data[0]['orderStatus'] == 'Filled':
            self.session.place_order(
                category="linear",
                symbol=self.trading_pair,
                side="Sell",
                orderType="Market",
                qty="0.01", # quantity, passed as a string
                triggerDirection=1, #triggered when market price rises to triggerPrice
                triggerPrice="YourPriceTarget",
                triggerBy="LastPrice",
                positionIdx=0,
                stopLoss="CutLossPrice",
                slTriggerBy="LastPrice",
            )

class AlgoBot:
    def __init__(self, trading_pair):
        self.trading_pair = trading_pair
        # initialize websocket instance and point to private topic -> your order stream
        self.ws = WebSocket(
            testnet=True,
            channel_type="private",
            api_key=api_key,
            api_secret=api_secret,
            trace_logging=True,
            restart_on_error=True
        )

    def start(self):
        orderStream = AlgoOrderStream(trading_pair=self.trading_pair)
        self.ws.order_stream(
            callback=orderStream.handle_order_stream
        )
algoBot = AlgoBot(trading_pair="BTCUSDT")
algoBot.start()
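If the script exits right after `algoBot.start()`, no callbacks will ever be seen. Assuming Pybit runs the socket on a background thread (an assumption worth checking against the Pybit version you use), the main thread has to be kept alive. A minimal sketch, where `run_until_stopped` and `stop_event` are hypothetical names and not part of Pybit:

```python
import threading

# Set this event (for example from a signal handler) to let the script exit cleanly
stop_event = threading.Event()


def run_until_stopped(stop, poll_seconds=1.0, max_polls=None):
    """Block the calling thread until `stop` is set.

    max_polls bounds the number of waits and is only meant for demos and tests;
    leave it as None to wait indefinitely.
    """
    polls = 0
    while not stop.is_set():
        if max_polls is not None and polls >= max_polls:
            break
        stop.wait(poll_seconds)  # returns early if the event gets set
        polls += 1
    return polls


# Bounded demo so this snippet terminates on its own
polls = run_until_stopped(stop_event, poll_seconds=0.01, max_polls=3)
print(polls)
```

In the bot script, the call would go after `algoBot.start()` as `run_until_stopped(stop_event)`, without `max_polls`.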


** Note that the callback only fires when there are updates to your order stream, so trigger an order change manually for testing purposes.
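The handler logic can also be exercised offline by feeding it a hand-built message. The sketch below is a simplified stand-in, not the class from the post: `FakeSession` is a hypothetical object that records calls instead of sending them, and the synthetic messages contain only the fields the handler reads (`side` and `orderStatus`), not a full Bybit payload.

```python
class FakeSession:
    """Hypothetical stand-in for the HTTP session: records orders instead of placing them."""

    def __init__(self):
        self.orders = []

    def place_order(self, **kwargs):
        self.orders.append(kwargs)


def handle_order_stream(session, trading_pair, message):
    """Same decision as the post's handler: react only to a filled Buy order."""
    data = message["data"]
    if data and data[0]["side"] == "Buy" and data[0]["orderStatus"] == "Filled":
        session.place_order(
            category="linear",
            symbol=trading_pair,
            side="Sell",
            orderType="Market",
            qty="0.01",
        )


fake = FakeSession()

# Synthetic messages (assumed minimal shape, for testing only)
filled_buy = {"data": [{"symbol": "BTCUSDT", "side": "Buy", "orderStatus": "Filled"}]}
new_sell = {"data": [{"symbol": "BTCUSDT", "side": "Sell", "orderStatus": "New"}]}

handle_order_stream(fake, "BTCUSDT", filled_buy)  # should record one Sell order
handle_order_stream(fake, "BTCUSDT", new_sell)    # should be ignored
print(fake.orders)
```

This checks the branching without touching the exchange; a manual order change on testnet is still needed to confirm that the socket subscription itself works.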

Top comments (2)

karen & kavan

Hi there, I have a question about my code below, which is related to your post.
I want to create an order on Bybit Futures but I get an error. Please help.

from pybit.unified_trading import HTTP

session = HTTP(
    testnet=True,
    api_key=api_key,
    api_secret=api_secret,
)

print(session.place_order(
    category="linear",
    symbol="BTCUSDT",
    side="Buy",
    orderType="market",
    qty="0.001",
    timeInForce="GTC",
    reduceOnly=False,
))

Error:
param_str = str(timestamp) + api_key + str(recv_window) + payload
TypeError: can only concatenate str (not "tuple") to str

Kyle Foo • Edited

@mrshoja According to the v5 source code, I think your code fails here:
github.com/bybit-exchange/pybit/bl...

Is your api_key a string? Otherwise it might be the payload.

You might want to trace from this place_order call: github.com/bybit-exchange/pybit/bl...
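One way a credential can silently become a tuple is a stray trailing comma on the assignment. This is only a guess at the cause, since the question does not show how `api_key` was defined. The sketch below reproduces the same kind of error and adds a type check; `require_str` is a hypothetical helper, not part of Pybit.

```python
def require_str(name, value):
    """Fail early with a readable message if a credential is not a plain string."""
    if not isinstance(value, str):
        raise TypeError(f"{name} must be a str, got {type(value).__name__}")
    return value


bad_key = "my-api-key",   # stray trailing comma: this is a 1-tuple, not a str
good_key = "my-api-key"

# Concatenating a str with the tuple fails the same way as in the traceback
try:
    "1700000000000" + bad_key
except TypeError as exc:
    error_text = str(exc)
    print(error_text)

# The check passes for a real string and rejects the tuple
checked = require_str("api_key", good_key)
try:
    require_str("api_key", bad_key)
    rejected = False
except TypeError:
    rejected = True
```

Running a check like this before constructing `HTTP(...)` narrows the problem down to either the credentials or the payload.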