# Source code for lobster_reconstructor.lobster_sim

import os
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import csv
from scipy.stats import zscore
from typing import Literal

from .orderbook import Orderbook
from .orders import Order
from .utils import format_timestamp
from dash import Dash, dcc, html, Input, Output, State, callback_context
from plotly.subplots import make_subplots


class MatchingError(Exception):
    """
    Raised when a reconstructed book level disagrees with the LOBSTER orderbook file.

    Besides the human-readable message (``str(err)``), the expected values from
    the CSV and the reconstructed values are kept as attributes so callers can
    inspect the mismatch programmatically.

    Parameters
    ----------
    side : str
        Book side of the mismatching level.
    csv_price, csv_size
        Price and size read from the orderbook CSV.
    recon_price, recon_size
        Price and size found in the reconstructed book.
    message : str
        Exception message.
    """

    def __init__(self, side, csv_price, csv_size, recon_price, recon_size, message):
        super().__init__(message)
        self.side = side
        # Expected values (from the orderbook CSV).
        self.csv_price, self.csv_size = csv_price, csv_size
        # Observed values (from the reconstruction).
        self.recon_price, self.recon_size = recon_price, recon_size
class LobsterSim:
    """
    LOBSTER simulation and visualization interface.

    Provides functionality for replaying limit order book events, computing
    order flow imbalance (OFI), and generating visualizations using Plotly
    and Dash.

    Parameters
    ----------
    orderbook : Orderbook
        Orderbook object to operate on. See :class:`Orderbook` in
        `orderbook.py` for full definition.
    msg_book_file_path : str
        LOBSTER message.csv file path.
    lob_book_file_path : str, default=None
        LOBSTER orderbook.csv file path. Not needed by end users (keep the
        default); used only in debugging/testing to check that the
        reconstructed book matches the expected one.

    Attributes
    ----------
    orderbook : Orderbook
        Orderbook object to operate on. See :class:`Orderbook` in
        `orderbook.py` for full definition.
    dataM : pd.DataFrame
        Message data read from the LOBSTER message.csv, with columns:

        - `Time`: float
        - `Type`: Literal['submit', 'cancel', 'delete', 'vis_exec', 'hid_exec', 'cross', 'halt']
        - `OrderID`: int
        - `Size`: int
        - `Price`: int
        - `Direction`: Literal['bid', 'ask']

    Raises
    ------
    ValueError
        If `lob_book_file_path` is given and its column count is not a
        multiple of 4.
    """

    # Column layout of a LOBSTER message file (extra trailing columns are dropped).
    _MESSAGE_COLUMNS = ("Time", "Type", "OrderID", "Size", "Price", "Direction")
    # LOBSTER event-type codes -> names used throughout the simulator.
    _EVENT_TYPES = {
        1: 'submit',
        2: 'cancel',
        3: 'delete',
        4: 'vis_exec',
        5: 'hid_exec',
        6: 'cross',
        7: 'halt',
    }
    # LOBSTER direction codes -> book side.
    _DIRECTIONS = {-1: 'ask', 1: 'bid'}
    # Placeholder prices the LOBSTER orderbook file uses for empty levels.
    _EMPTY_ASK_PRICE = 9999999999
    _EMPTY_BID_PRICE = -9999999999
    # Grid layout of display_L2_snapshots / display_L3_snapshots.
    _SNAPSHOT_COLS = 3
    _SNAPSHOT_ROW_HEIGHT = 400  # pixels per row of subplots

    def __init__(self, orderbook: "Orderbook", msg_book_file_path: str, lob_book_file_path: str = None):
        self.orderbook = orderbook
        # Position in dataM of the next unprocessed message; lets
        # simulate_from_current_until resume without replaying from the start.
        self._last_idx = 0
        self.dataM = self._load_messages(msg_book_file_path)
        # Expected book snapshots, only used by the debugging helpers below.
        self._dataL = (
            None if lob_book_file_path is None
            else self._load_orderbook_snapshots(lob_book_file_path)
        )

    @classmethod
    def _load_messages(cls, path: str) -> pd.DataFrame:
        """Read a LOBSTER message file and decode its Type/Direction codes to names."""
        columns = list(cls._MESSAGE_COLUMNS)
        dtype_map = {
            "Time": float,
            "Type": "Int64",
            "OrderID": "Int64",
            "Size": "Int64",
            "Price": "Int64",
            "Direction": "Int64",
        }
        dataM = pd.read_csv(
            path,
            header=None,
            names=columns,
            usecols=range(len(columns)),  # drop any extra columns in the file
            dtype=dtype_map,
            na_values=["", "NA"],  # treat blanks as NaN
            low_memory=False,
        )
        # NOTE: cross (6) and halt (7) messages are kept; nothing is filtered here.
        # Codes absent from the maps (and blank cells) become NaN.
        dataM["Type"] = dataM["Type"].map(cls._EVENT_TYPES)
        dataM["Direction"] = dataM["Direction"].map(cls._DIRECTIONS)
        return dataM

    @staticmethod
    def _load_orderbook_snapshots(path: str) -> pd.DataFrame:
        """
        Read a LOBSTER orderbook file.

        Columns are named AskPrice1, AskSize1, BidPrice1, BidSize1, AskPrice2, ...
        The number of levels is inferred from the first row.

        Raises
        ------
        ValueError
            If the column count is not a multiple of 4.
        """
        num_cols = pd.read_csv(path, nrows=1, header=None).shape[1]
        if num_cols % 4 != 0:
            raise ValueError("Orderbook file column count is not a multiple of 4.")
        col_names = [
            name
            for level in range(1, num_cols // 4 + 1)
            for name in (f"AskPrice{level}", f"AskSize{level}", f"BidPrice{level}", f"BidSize{level}")
        ]
        return pd.read_csv(path, header=None, names=col_names, dtype=int, low_memory=False)

    # --------------------------
    # INTERNAL HELPERS
    # --------------------------
    @staticmethod
    def _row_to_order(row) -> "Order":
        """Build an Order from one ``itertuples`` row of a message DataFrame."""
        return Order(row.Time, row.Type, row.OrderID, row.Size, row.Price, row.Direction)

    @staticmethod
    def _validate_interval(interval: float, name: str = "interval") -> None:
        """Raise ValueError unless ``interval`` is strictly positive (otherwise time loops never end)."""
        if not interval > 0:  # written this way so NaN is rejected too
            raise ValueError(f"{name} must be > 0")

    @staticmethod
    def _frame_times(start: float, end: float, step: float):
        """
        Yield ``start``, ``start + step``, ... while the value is ``<= end``.

        Values are produced by repeated addition (not ``start + k * step``),
        so they match the sample times of a ``while t <= end: ...; t += step`` loop.
        """
        t = start
        while t <= end:
            yield t
            t += step

    def _replay_until(self, time: float) -> None:
        """Feed messages from ``_last_idx`` onward to the orderbook, stopping at the first with Time > ``time``."""
        for row in self.dataM.iloc[self._last_idx:].itertuples(index=False):
            if row.Time > time:
                break
            self.orderbook.process_order(self._row_to_order(row))
            self._last_idx += 1

    # --------------------------
    # SIMULATION
    # --------------------------
    def simulate_until(self, time: float) -> None:
        """
        Reset the orderbook and reconstruct it from the start of the message
        file up to (and including) messages at the given timestamp.

        Parameters
        ----------
        time : float
            Time in seconds after midnight to simulate until.
        """
        self._last_idx = 0
        self.orderbook.clear_orderbook()
        self._replay_until(time)

    def simulate_from_current_until(self, time: float) -> None:
        """
        Continue reconstructing the order book from the current simulation
        state up to a specified timestamp.

        Does NOT reset orderbook state, unlike simulate_until.

        Parameters
        ----------
        time : float
            Time in seconds after midnight to simulate until.

        Raises
        ------
        ValueError
            If `time` is earlier than the current order book timestamp.
        """
        if time < self.orderbook.curr_book_timestamp:
            raise ValueError("time parameter must be greater than current book timestamp")
        self._replay_until(time)

    # --------------------------
    # SNAPSHOT GRIDS
    # --------------------------
    def _display_snapshots(self, start_time: float, end_time: float, interval: float, level: str) -> None:
        """
        Shared implementation of display_L2_snapshots / display_L3_snapshots.

        ``level`` ("L2" or "L3") selects the Orderbook trace builder and the
        figure title. The first snapshot is taken one interval after
        ``start_time``.
        """
        self._validate_interval(interval)
        get_traces = (
            self.orderbook._get_L3_plot_traces if level == "L3"
            else self.orderbook._get_L2_plot_traces
        )
        self.simulate_until(start_time)
        traces_tuples = []
        subplot_titles = []
        for curr_time in self._frame_times(start_time + interval, end_time, interval):
            self.simulate_from_current_until(curr_time)
            scaling = self.orderbook.price_scaling
            subplot_titles.append(
                f"Time: {format_timestamp(curr_time)}<br>"
                f"Mid Price: {self.orderbook.mid_price() * scaling:.2f}<br>"
                f"Spread: {self.orderbook.bid_ask_spread() * scaling:.2f}"
            )
            traces_tuples.append(get_traces())
        if not traces_tuples:
            raise ValueError("No snapshots to display: end_time must be >= start_time + interval")

        cols = self._SNAPSHOT_COLS
        rows = (len(traces_tuples) + cols - 1) // cols
        fig = make_subplots(rows=rows, cols=cols, subplot_titles=subplot_titles)
        for i, traces in enumerate(traces_tuples):
            for trace in traces:
                fig.add_trace(trace, row=i // cols + 1, col=i % cols + 1)
        fig.update_traces(width=self.orderbook.tick_size)
        fig.update_layout(
            # Height scales with grid rows, not with the number of subplots.
            height=self._SNAPSHOT_ROW_HEIGHT * rows,
            title_text=f"{self.orderbook.ticker} {level} Snapshots",
            showlegend=False,
        )
        fig.show()

    def display_L3_snapshots(self, start_time: float, end_time: float, interval: float) -> None:
        """
        Display multiple L3 order book snapshots as subplots over a specified time range.

        Simulates the order book from `start_time` to `end_time` and shows a
        Plotly figure with one subplot per snapshot (3 per row). Each subplot
        title includes the time, midprice, and bid-ask spread.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation. The
            first snapshot is taken at `start_time + interval`.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation and plotting.
        interval : float
            Time interval (in seconds) between consecutive snapshots.

        Raises
        ------
        ValueError
            If `interval` is not positive or the range contains no snapshot.
        """
        self._display_snapshots(start_time, end_time, interval, "L3")

    def display_L2_snapshots(self, start_time: float, end_time: float, interval: float) -> None:
        """
        Display multiple L2 order book snapshots as subplots over a specified time range.

        Simulates the order book from `start_time` to `end_time` and shows a
        Plotly figure with one subplot per snapshot (3 per row) showing the L2
        state. Each subplot title includes the time, midprice, and bid-ask spread.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation. The
            first snapshot is taken at `start_time + interval`.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation and plotting.
        interval : float
            Time interval (in seconds) between consecutive snapshots.

        Raises
        ------
        ValueError
            If `interval` is not positive or the range contains no snapshot.
        """
        self._display_snapshots(start_time, end_time, interval, "L2")

    # --------------------------
    # OFI
    # --------------------------
    def sim_size_OFI(self, start_time: float, end_time: float) -> int:
        """
        Simulate the order book between two timestamps and compute the
        cumulative size-based Order Flow Imbalance (OFI).

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.

        Returns
        -------
        int
            The cumulative size-based OFI computed over the simulation interval.

        Notes
        -----
        The cumulative OFI is reset after replaying up to `start_time`, so only
        messages in (`start_time`, `end_time`] contribute.
        """
        self.simulate_until(start_time)
        self.orderbook.reset_cum_OFI()
        self.simulate_from_current_until(end_time)
        return self.orderbook.calc_size_OFI()

    def sim_count_OFI(self, start_time: float, end_time: float) -> int:
        """
        Simulate the order book between two timestamps and compute the
        cumulative count-based Order Flow Imbalance (OFI).

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.

        Returns
        -------
        int
            The cumulative count-based OFI computed over the simulation interval.

        Notes
        -----
        The cumulative OFI is reset after replaying up to `start_time`, so only
        messages in (`start_time`, `end_time`] contribute.
        """
        self.simulate_until(start_time)
        self.orderbook.reset_cum_OFI()
        self.simulate_from_current_until(end_time)
        return self.orderbook.calc_count_OFI()

    # --------------------------
    # VISUALIZATION
    # --------------------------
    def create_animated_L3_app(self, start_time: float, end_time: float, interval: float) -> "Dash":
        """
        Create an interactive Dash application showing an animated L3 order book.

        The application displays horizontal bar charts of order sizes at each
        price level, updating over time to animate the evolution of the L3 book.
        Users can play/pause the animation or manually slide through frames.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        interval : float
            Time interval (in seconds) between consecutive frames.

        Returns
        -------
        dash.Dash
            A Dash application instance that can be run or embedded in a web server.

        Raises
        ------
        ValueError
            If `interval` is not positive or the range contains no frame.

        Notes
        -----
        - All frames are simulated up front and kept in memory.
        - Each frame shows a horizontal bar chart of L3 order sizes by price and direction.
        - Users can interact via a play/pause button and a slider for manual navigation.
        """
        self._validate_interval(interval)
        frames = []
        timestamps = []
        self.simulate_until(start_time)
        for curr_time in self._frame_times(start_time, end_time, interval):
            self.simulate_from_current_until(curr_time)
            df = self.orderbook.convert_orderbook_to_L3_dataframe()
            df["price"] = df["price"] * self.orderbook.price_scaling
            frames.append(df)
            timestamps.append(curr_time)
        if not frames:
            raise ValueError("No frames to animate: end_time must be >= start_time")

        # Fixed y-range across frames so the axis does not jump during playback.
        all_prices = pd.concat([df["price"] for df in frames])
        price_min = all_prices.min()
        price_max = all_prices.max()

        app = Dash(__name__)
        app.layout = html.Div([
            dcc.Graph(id='l3-graph'),
            html.Div([
                html.Button("⏯ Play/Pause", id="play-pause", n_clicks=0)
            ], style={'marginTop': '10px'}),
            dcc.Slider(
                id='frame-slider',
                min=0,
                max=len(frames) - 1,
                step=1,
                value=0,
                marks={i: str(i) for i in range(0, len(frames), max(1, len(frames) // 10))},
                tooltip={"placement": "bottom", "always_visible": True}
            ),
            dcc.Interval(id='interval', interval=2000, n_intervals=0),
            dcc.Store(id='paused', data=False),
        ])

        @app.callback(
            Output('l3-graph', 'figure'),
            Output('frame-slider', 'value'),
            Output('paused', 'data'),
            Input('interval', 'n_intervals'),
            Input('frame-slider', 'value'),
            Input('play-pause', 'n_clicks'),
            State('paused', 'data')
        )
        def update_l3_graph(n_intervals, slider_value, play_clicks, paused):
            ctx = callback_context
            # Which component fired this callback ("" on the initial call).
            triggered = ctx.triggered[0]['prop_id'].split('.')[0] if ctx.triggered else ""
            if triggered == "play-pause":
                paused = not paused
            if triggered == "interval" and not paused:
                # Timer tick while playing: advance one frame, wrapping around.
                frame = (slider_value + 1) % len(frames)
            else:
                frame = slider_value

            df = frames[frame]
            timestamp = timestamps[frame]
            fig = px.bar(
                df,
                orientation='h',
                x="size",
                y="price",
                color="direction",
                title=f"{self.orderbook.ticker}<br><sup>{format_timestamp(timestamp)}",
                color_discrete_sequence=["green", "red"]
            )
            fig.update_layout(
                xaxis=dict(range=[0, 2000], autorange=False),
                yaxis=dict(range=[price_min, price_max], autorange=False),
                uirevision="static",
                height=600
            )
            return fig, frame, paused

        return app

    def plot_price_levels_heatmap(self, start_time: float, end_time: float, interval: float,
                                  show_midprice: bool = True) -> None:
        """
        Create a heatmap of order book price levels over time.

        The x-axis is time, the y-axis is price, and the colour at each point
        is the total size at that price level in that snapshot.

        Parameters
        ----------
        start_time : float
            The timestamp in seconds after midnight to begin the simulation and plotting.
        end_time : float
            The timestamp in seconds after midnight to end the simulation and plotting.
        interval : float
            The time interval in seconds between each snapshot on the heatmap.
        show_midprice : bool, optional
            If True, a white line representing the mid-price of the order book
            is overlaid on the heatmap. Defaults to True.

        Raises
        ------
        ValueError
            If `interval` is not positive.

        Notes
        -----
        - Snapshots come from `simulate_until()` / `simulate_from_current_until()`.
        - Prices are scaled by `self.orderbook.price_scaling`.
        """
        self._validate_interval(interval)
        l2_snapshots = []
        timestamps = []
        midprices = []
        self.simulate_until(start_time)
        for curr_time in self._frame_times(start_time, end_time, interval):
            self.simulate_from_current_until(curr_time)
            df = self.orderbook.convert_orderbook_to_L2_dataframe()
            df["price"] = df["price"] * self.orderbook.price_scaling
            l2_snapshots.append(df)
            timestamps.append(format_timestamp(curr_time))
            midprices.append(self.orderbook.mid_price() * self.orderbook.price_scaling)

        # One heatmap row per distinct price seen in any snapshot.
        all_prices = sorted(set().union(*(df["price"] for df in l2_snapshots)))
        price_to_idx = {price: i for i, price in enumerate(all_prices)}
        heatmap = np.zeros((len(all_prices), len(l2_snapshots)))
        for t, snapshot in enumerate(l2_snapshots):
            for price, size in zip(snapshot["price"], snapshot["size"]):
                heatmap[price_to_idx[price], t] = size

        fig = go.Figure()
        fig.add_trace(go.Heatmap(
            z=heatmap,
            x=timestamps,
            y=all_prices,
            colorscale='Turbo',
            colorbar=dict(title='Size'),
            zsmooth='best'
        ))
        if show_midprice:
            fig.add_trace(go.Scatter(
                x=timestamps,
                y=midprices,
                mode='lines',
                line=dict(color='white', width=2),
                name='Midprice',
            ))
        fig.update_layout(
            title=f'{self.orderbook.ticker} Orderbook Price Level Heatmap',
            template='plotly_dark',
            xaxis_title='Time',
            yaxis_title='Price',
            height=800,
            width=1400,
            margin=dict(l=40, r=40, t=40, b=40)
        )
        fig.show()

    def _plot_OFI_series(self, start_time: float, end_time: float, frame_interval: float,
                         reset_ofi_interval: float, calc_ofi, trace_name: str) -> None:
        """
        Shared implementation of size_OFI_graph / count_OFI_graph.

        ``calc_ofi`` is the zero-argument Orderbook method returning the current
        cumulative OFI; ``trace_name`` labels the plotted series.
        """
        self._validate_interval(frame_interval, "frame_interval")
        timestamps = []
        ofi_values = []
        self.simulate_until(start_time)
        self.orderbook.reset_cum_OFI()
        # Seconds accumulated since the last OFI reset.
        reset_time = 0
        for curr_time in self._frame_times(start_time, end_time, frame_interval):
            if reset_time >= reset_ofi_interval:
                self.orderbook.reset_cum_OFI()
                reset_time = 0
            self.simulate_from_current_until(curr_time)
            ofi_values.append(calc_ofi())
            timestamps.append(format_timestamp(curr_time))
            reset_time += frame_interval

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=timestamps,
            y=ofi_values,
            mode='lines+markers',
            line=dict(color='cyan'),
            name=trace_name
        ))
        fig.update_layout(
            title=f"{self.orderbook.ticker} OFI Time Series",
            xaxis_title='Time',
            yaxis_title='Order Flow Imbalance',
            template='plotly_dark',
            height=500,
            width=1200,
            margin=dict(l=40, r=40, t=40, b=40)
        )
        fig.show()

    def size_OFI_graph(self, start_time: float, end_time: float, frame_interval: float,
                       reset_ofi_interval: float = np.inf) -> None:
        """
        Plot a time series of the cumulative Size Order Flow Imbalance (OFI).

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        frame_interval : float
            Time interval (in seconds) between each point plotted on the graph.
        reset_ofi_interval : float, optional
            Time interval (in seconds) after which the cumulative OFI is reset
            to zero. Defaults to `np.inf` (never reset within the plotted range).

        Raises
        ------
        ValueError
            If `frame_interval` is not positive.
        """
        self._plot_OFI_series(start_time, end_time, frame_interval, reset_ofi_interval,
                              self.orderbook.calc_size_OFI, 'Size OFI')

    def count_OFI_graph(self, start_time: float, end_time: float, frame_interval: float,
                        reset_ofi_interval: float = np.inf) -> None:
        """
        Plot a time series of the cumulative Count Order Flow Imbalance (OFI).

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        frame_interval : float
            Time interval (in seconds) between each point plotted on the graph.
        reset_ofi_interval : float, optional
            Time interval (in seconds) after which the cumulative OFI is reset
            to zero. Defaults to `np.inf` (never reset within the plotted range).

        Raises
        ------
        ValueError
            If `frame_interval` is not positive.
        """
        self._plot_OFI_series(start_time, end_time, frame_interval, reset_ofi_interval,
                              self.orderbook.calc_count_OFI, 'Count OFI')

    def midprice_graph(self, start_time: float, end_time: float, interval: float) -> None:
        """
        Plot a time series of the order book mid-price.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        interval : float
            Time interval (in seconds) between each data point plotted on the graph.

        Raises
        ------
        ValueError
            If `interval` is not positive.
        """
        self._validate_interval(interval)
        timestamps = []
        midprices = []
        self.simulate_until(start_time)
        for curr_time in self._frame_times(start_time, end_time, interval):
            self.simulate_from_current_until(curr_time)
            timestamps.append(format_timestamp(curr_time))
            midprices.append(self.orderbook.mid_price() * self.orderbook.price_scaling)

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=timestamps,
            y=midprices,
            mode='lines',
            line=dict(color='white', width=2),
            name='Midprice',
        ))
        fig.update_layout(
            title=f"{self.orderbook.ticker} Mid Price",
            xaxis_title='Time',
            yaxis_title='Price',
            template='plotly_dark',
            height=500,
            width=1200,
            margin=dict(l=40, r=40, t=40, b=40)
        )
        fig.show()

    @staticmethod
    def _bps_axis_half_range(sorted_bps) -> int:
        """Largest absolute value in an ascending list of bps offsets, or 1 if the list is empty."""
        if not sorted_bps:
            return 1
        # In an ascending list the extreme magnitudes sit at the two ends.
        return max(abs(sorted_bps[0]), abs(sorted_bps[-1]))

    def depth_percentile_graph(self, start_time: float, end_time: float, interval: float) -> None:
        """
        Create a heatmap of order book depth in basis points (BPS) from the mid-price.

        The x-axis is time, the y-axis is the distance of each price level from
        the mid-price in whole BPS, and the colour is the size at that distance.
        Levels that round to the same BPS bucket are summed. A white horizontal
        line at 0 BPS marks the mid-price; the y-axis is symmetric and wide
        enough to show both sides in full.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        interval : float
            Time interval (in seconds) between each snapshot on the heatmap.

        Raises
        ------
        ValueError
            If `interval` is not positive.
        """
        self._validate_interval(interval)
        timestamps = []
        l2_snapshots = []
        midprices = []
        self.simulate_until(start_time)
        for curr_time in self._frame_times(start_time, end_time, interval):
            self.simulate_from_current_until(curr_time)
            df = self.orderbook.convert_orderbook_to_L2_dataframe()
            df["price"] = df["price"].astype(float) * self.orderbook.price_scaling
            l2_snapshots.append(df)
            timestamps.append(format_timestamp(curr_time))
            midprices.append(self.orderbook.mid_price() * self.orderbook.price_scaling)

        # Distance of each level from that snapshot's midprice, in whole bps.
        bps_per_snapshot = [
            ((df["price"] - mid) / mid * 10000).round().astype(int).tolist()
            for df, mid in zip(l2_snapshots, midprices)
        ]
        all_bps = sorted(set().union(*bps_per_snapshot))
        bps_abs_max = self._bps_axis_half_range(all_bps)
        bps_to_idx = {bps: i for i, bps in enumerate(all_bps)}

        heatmap = np.zeros((len(all_bps), len(l2_snapshots)))
        for t, (snapshot, bps_values) in enumerate(zip(l2_snapshots, bps_per_snapshot)):
            for bps, size in zip(bps_values, snapshot["size"]):
                # Several price levels can round into one bps bucket: accumulate.
                heatmap[bps_to_idx[bps], t] += size

        fig = go.Figure(data=go.Heatmap(
            z=heatmap,
            x=timestamps,
            y=all_bps,
            colorscale='Turbo',
            colorbar=dict(title='Volume'),
        ))
        fig.update_layout(
            title="Depth Percentile Graph",
            xaxis_title="Time",
            yaxis=dict(title="BPS from Midprice", range=[-bps_abs_max, bps_abs_max], autorange=False),
            template="plotly_dark",
            height=600,
        )
        fig.add_shape(
            type="line",
            x0=0, x1=1,
            y0=0, y1=0,
            xref="paper", yref="y",
            line=dict(
                color="white",
                width=2
            )
        )
        fig.show()

    @staticmethod
    def _count_by_side(df: pd.DataFrame, key: str) -> pd.DataFrame:
        """Count trades per ``key`` value and direction; always has 'bid' and 'ask' columns, sorted by key."""
        grouped = df.groupby([key, "direction"]).size().unstack(fill_value=0)
        for side in ("bid", "ask"):
            if side not in grouped.columns:
                grouped[side] = 0
        return grouped.sort_index()

    @staticmethod
    def _show_bid_ask_bars(x, grouped: pd.DataFrame, title: str, xaxis_title: str) -> None:
        """Show a diverging bar chart: bid counts above zero, ask counts mirrored below it."""
        max_count = max(grouped["bid"].max(), grouped["ask"].max())
        fig = go.Figure()
        fig.add_trace(go.Bar(x=x, y=grouped["bid"], name="Bids", marker_color="blue"))
        fig.add_trace(go.Bar(x=x, y=-grouped["ask"], name="Asks", marker_color="red"))
        fig.update_layout(
            title=title,
            xaxis_title=xaxis_title,
            yaxis_title="# of Trades",
            barmode='relative',
            bargap=0.1,
            legend=dict(x=1, y=1),
            yaxis=dict(
                range=[-max_count, max_count],
                zeroline=True,
                zerolinewidth=2,
                zerolinecolor='black',
                tickmode='linear',
                tick0=0,
                # About five ticks per half-axis regardless of trade volume.
                dtick=max(1, int(max_count / 5))
            )
        )
        fig.show()

    def graph_trade_arrival_time(self, start_time: float, end_time: float, bin_size: float = None,
                                 filter_trade_type: Literal["aggro_lim", "vis_exec", "hid_exec"] = None) -> None:
        """
        Graph the arrival count of bid and ask trades over time.

        Trades in the range are bucketed into time bins and shown as a bar
        chart with bid counts above zero and ask counts below.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        bin_size : float, optional
            The size of each time bin in seconds. If None, the bin size is set
            to 1/100th of the total time range.
        filter_trade_type : Literal["aggro_lim", "vis_exec", "hid_exec"], optional
            Show only this trade type. Defaults to None (all trade types).

        Raises
        ------
        ValueError
            If the (given or derived) `bin_size` is not positive.
        """
        if bin_size is None:
            bin_size = (end_time - start_time) / 100
        self._validate_interval(bin_size, "bin_size")
        self.simulate_until(start_time)
        self.orderbook.clear_trade_log()
        self.simulate_from_current_until(end_time)

        df = pd.DataFrame(self.orderbook.trade_log)
        if df.empty:
            print("No trades in the given time range.")
            return
        if filter_trade_type is not None:
            df = df[df["trade_type"] == filter_trade_type]
            if df.empty:
                print(f"No '{filter_trade_type}' trades in the given time range.")
                return
        # Bins are aligned to multiples of bin_size (seconds after midnight).
        df = df.assign(time_bin=(df["timestamp"] // bin_size) * bin_size)
        grouped = self._count_by_side(df, "time_bin")
        self._show_bid_ask_bars(
            x=[format_timestamp(ts) for ts in grouped.index],
            grouped=grouped,
            title="Bid vs Ask Trade Counts Over Time",
            xaxis_title="Time",
        )

    @staticmethod
    def _drop_size_outliers(df: pd.DataFrame, threshold: float = 3.0) -> pd.DataFrame:
        """
        Drop trades whose size z-score exceeds ``threshold`` in absolute value.

        When all sizes are identical (including a single trade) the z-score is
        0/0 = NaN; such rows are treated as non-outliers and kept.
        """
        z = np.nan_to_num(np.asarray(zscore(df["size"]), dtype=float), nan=0.0)
        return df[np.abs(z) <= threshold]

    def graph_trade_size_distribution(self, start_time: float, end_time: float, bin_size: int = 20,
                                      filter_trade_type: Literal["aggro_lim", "vis_exec", "hid_exec"] = None) -> None:
        """
        Graph the size distribution of bid and ask trades.

        Trades in the range are filtered for size outliers (|z| > 3, computed
        over all trade types), optionally restricted to one trade type, then
        bucketed by size and shown with bid counts above zero and ask counts below.

        Parameters
        ----------
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        bin_size : int, optional
            The size of each trade size bin. Defaults to 20.
        filter_trade_type : Literal["aggro_lim", "vis_exec", "hid_exec"], optional
            Show only this trade type. Defaults to None (all trade types).

        Raises
        ------
        ValueError
            If `bin_size` is not positive.
        """
        self._validate_interval(bin_size, "bin_size")
        self.simulate_until(start_time)
        self.orderbook.clear_trade_log()
        self.simulate_from_current_until(end_time)

        df = pd.DataFrame(self.orderbook.trade_log)
        if df.empty:
            print("No trades in the given time range.")
            return
        df = self._drop_size_outliers(df)
        if df.empty:
            print("All trades were filtered out as outliers.")
            return
        if filter_trade_type is not None:
            df = df[df["trade_type"] == filter_trade_type]
            if df.empty:
                print(f"No '{filter_trade_type}' trades in the given time range.")
                return
        df = df.assign(size_bin=(df["size"] // bin_size) * bin_size)
        grouped = self._count_by_side(df, "size_bin")
        self._show_bid_ask_bars(
            x=grouped.index,
            grouped=grouped,
            title="Bid & Ask Trade Sizes Distribution (Outliers Removed)",
            xaxis_title="Size of Trade",
        )

    # --------------------------
    # EXPORT
    # --------------------------
    @staticmethod
    def _appendable_header(path: str, expected_cols: list, symbol: str):
        """
        Return the header of the CSV at ``path`` if new rows may be appended to it, else None.

        Appending is allowed only when the file's columns are exactly
        ``expected_cols`` (any order) and its ``ticker`` column is empty or
        holds only ``symbol``. Missing or unreadable files give None.
        """
        if not os.path.exists(path):
            return None
        try:
            header = list(pd.read_csv(path, nrows=0).columns)
            if len(header) != len(expected_cols) or set(header) != set(expected_cols):
                return None
            symbols = pd.read_csv(path, usecols=["ticker"])["ticker"].dropna().unique()
            if len(symbols) == 0 or (len(symbols) == 1 and symbols[0] == symbol):
                return header
        except Exception:
            # Best effort: a file we cannot parse is overwritten rather than appended to.
            return None
        return None

    def _sample_features(self, features: dict, start_time: float, end_time: float, interval: float):
        """
        Simulate from ``start_time`` to ``end_time`` and evaluate ``features`` every ``interval`` seconds.

        Returns ``(rows, timestamps)``: one ``{feature_name: value}`` dict per
        sample time. A feature whose method raises is recorded as None.

        Raises
        ------
        AttributeError
            If a feature names a method the Orderbook does not have.
        """
        self.simulate_until(start_time)
        rows, ts_list = [], []
        # Tolerance so float drift in the accumulated sample time does not drop
        # a sample landing on end_time. It scales with `interval` because an
        # absolute 1e-12 is below float64 resolution at seconds-after-midnight
        # magnitudes (34200.0 + 1e-12 == 34200.0).
        for t in self._frame_times(start_time, end_time + interval * 1e-9, interval):
            self.simulate_from_current_until(t)
            row = {}
            for feat_name, spec in features.items():
                method_name = spec.get("method")
                args = spec.get("args", [])
                if not isinstance(method_name, str) or not hasattr(self.orderbook, method_name):
                    raise AttributeError(f"Orderbook has no method '{method_name}'")
                method = getattr(self.orderbook, method_name)
                try:
                    value = method(*args)
                except Exception as e:
                    # Best effort: a failing feature is recorded as missing, not fatal.
                    value = None
                    print(f"Error computing {feat_name} at {t}: {e}")
                row[feat_name] = value
            rows.append(row)
            ts_list.append(t)
        return rows, ts_list

    def print_features_to_csv(
            self,
            filename: str,
            start_time: float,
            end_time: float,
            interval: float,
            features: dict,
            batch_date: str,  # e.g., "2025-08-20" (YYYY-MM-DD recommended)
            symbol: str,  # ticker; must match existing file's ticker to append
            directory: str = ".",
            timestamp_round: int = 9,  # rounding for overlap checks to avoid float noise
    ) -> None:
        """
        Export order book features to a CSV file with 'date', 'ticker' and 'timestamp' columns.

        Simulates the order book over a time range at fixed intervals, computes
        user-specified features, and writes the results to a CSV. If the file
        already exists and both its schema and ticker match, non-overlapping
        rows are appended and the data is re-sorted by date then timestamp.
        Otherwise, the file is overwritten with the new data.

        Parameters
        ----------
        filename : str
            Base name for the CSV file ('.csv' is added automatically if missing).
        start_time : float
            Timestamp (seconds after midnight) to start the simulation.
        end_time : float
            Timestamp (seconds after midnight) to end the simulation.
        interval : float
            Time step in seconds between samples (must be > 0).
        features : dict
            Maps feature names to dictionaries naming the Orderbook method to
            call and its arguments.
            Example: {"mid_price": {"method": "mid_price", "args": []}}.
        batch_date : str
            The trading date to associate with the exported rows (e.g., "2025-08-20").
        symbol : str
            The ticker symbol; must match the file's ticker to append,
            otherwise the file is overwritten.
        directory : str, optional
            Output directory for the CSV (created if missing). Defaults to ".".
        timestamp_round : int, optional
            Decimal places to round timestamps for overlap checks. Defaults to 9.

        Returns
        -------
        None
            Writes the features to a CSV file and prints whether rows were
            written, appended, dropped due to overlap, or skipped.

        Raises
        ------
        ValueError
            If `interval` is not positive or `end_time` < `start_time`.
        AttributeError
            If a feature names a method the Orderbook does not have.
        """
        self._validate_interval(interval)
        if end_time < start_time:
            raise ValueError("end_time must be >= start_time")

        fname = filename if filename.lower().endswith(".csv") else f"{filename}.csv"
        path = os.path.join(directory, fname)
        os.makedirs(directory, exist_ok=True)

        base_cols = ["date", "ticker", "timestamp"]
        feature_cols = list(features.keys())
        existing_header = self._appendable_header(path, base_cols + feature_cols, symbol)
        do_append = existing_header is not None
        # Keep the existing file's column order when appending.
        write_cols = existing_header if do_append else base_cols + feature_cols

        rows, ts_list = self._sample_features(features, start_time, end_time, interval)
        new_df = pd.DataFrame(rows)
        new_df.insert(0, "timestamp", ts_list)
        new_df.insert(0, "ticker", symbol)
        new_df.insert(0, "date", batch_date)
        new_df = new_df.reindex(columns=write_cols)

        if not do_append:
            new_df.to_csv(path, index=False)
            print(f"Wrote {path} ({len(new_df)} rows) [overwrote: new file/schema/ticker mismatch]")
            return

        try:
            existing_df = pd.read_csv(path)
        except Exception:
            new_df.to_csv(path, index=False)
            print(f"Wrote {path} ({len(new_df)} rows) [fallback overwrite: failed to read existing file]")
            return
        existing_df = existing_df.reindex(columns=write_cols)

        same_day_mask = (existing_df["date"] == batch_date) & (existing_df["ticker"] == symbol)
        if same_day_mask.any():
            exist_ts = existing_df.loc[same_day_mask, "timestamp"]
            exist_min, exist_max = exist_ts.min(), exist_ts.max()
            new_min, new_max = new_df["timestamp"].min(), new_df["timestamp"].max()
            if new_max < exist_min:
                relation = "before (entirely earlier than existing range)"
            elif new_min > exist_max:
                relation = "after (entirely later than existing range)"
            else:
                relation = "overlapping (some timestamps fall inside existing range)"
            print(f"{batch_date} {symbol}: existing range [{exist_min}, {exist_max}], "
                  f"new range [{new_min}, {new_max}] -> {relation}")
        else:
            print(f"{batch_date} {symbol}: no existing rows; writing entire range.")

        # Drop new rows whose (rounded) timestamp already exists for this date/ticker.
        existing_df["__ts_key__"] = existing_df["timestamp"].round(timestamp_round)
        new_df["__ts_key__"] = new_df["timestamp"].round(timestamp_round)
        existing_ts_keys = set(existing_df.loc[same_day_mask, "__ts_key__"])
        if existing_ts_keys:
            before_len = len(new_df)
            new_df = new_df[~new_df["__ts_key__"].isin(existing_ts_keys)]
            dropped = before_len - len(new_df)
            if dropped > 0:
                print(f"Dropped {dropped} overlapping rows for {batch_date} {symbol} based on timestamp match.")
        existing_df = existing_df.drop(columns=["__ts_key__"])
        new_df = new_df.drop(columns=["__ts_key__"])

        if new_df.empty:
            print(f"No new (non-overlapping) rows to add for {batch_date} {symbol}. Left {path} unchanged.")
            return

        combined = pd.concat([existing_df, new_df], ignore_index=True)
        sort_date = pd.to_datetime(combined["date"], errors="coerce")
        combined = combined.assign(__date_sort__=sort_date)
        combined = combined.sort_values(
            by=["__date_sort__", "timestamp"],
            ascending=[True, True],
            kind="mergesort",  # stable
        ).drop(columns="__date_sort__")
        combined = combined.reindex(columns=write_cols)
        combined.to_csv(path, index=False)
        print(f"Updated {path}: added {len(new_df)} new rows; total rows = {len(combined)}")

    # --------------------------
    # DEBUGGING
    # --------------------------
    def _check_books_match(self, num_levels_to_check, dataL_loc, verbose=False):
        """
        Compare the reconstructed L2 book with row ``dataL_loc`` of the LOBSTER orderbook file.

        Levels the CSV marks as empty (placeholder prices) must be absent from
        the reconstruction; all other levels must match in price and size.

        Raises
        ------
        ValueError
            If no orderbook file was loaded (`lob_book_file_path` was None).
        MatchingError
            On the first level that differs.
        """
        if self._dataL is None:
            raise ValueError("No orderbook file loaded: pass lob_book_file_path to LobsterSim to check books.")
        snapshot = self._dataL.iloc[dataL_loc]
        reconstructed = self.orderbook.convert_orderbook_to_L2_dataframe()
        empty_price = {"ask": self._EMPTY_ASK_PRICE, "bid": self._EMPTY_BID_PRICE}
        for side in ["ask", "bid"]:
            for level in range(num_levels_to_check):
                csv_price = snapshot[f"{side.capitalize()}Price{level + 1}"]
                csv_size = snapshot[f"{side.capitalize()}Size{level + 1}"]
                csv_is_empty = csv_price == empty_price[side]
                recon_key = f"{side}_{level}"

                if recon_key not in reconstructed.index:
                    if not csv_is_empty:
                        raise MatchingError(
                            side=side, csv_price=csv_price, csv_size=csv_size,
                            recon_price=-1, recon_size=-1,
                            message=f"{side.upper()} level {level+1} missing in reconstruction: expected CSV price {csv_price}"
                        )
                    continue

                recon_row = reconstructed.loc[recon_key]
                recon_price = recon_row["price"]
                recon_size = recon_row["size"]
                # CSV says the level is empty but the reconstruction has one.
                if csv_is_empty:
                    raise MatchingError(
                        side=side, csv_price=csv_price, csv_size=csv_size,
                        recon_price=recon_price, recon_size=recon_size,
                        message=f"{side.upper()} level {level+1} unexpectedly present in reconstruction: CSV has dummy value but reconstruction has price {recon_price}"
                    )
                if recon_price != csv_price:
                    raise MatchingError(
                        side=side, csv_price=csv_price, csv_size=csv_size,
                        recon_price=recon_price, recon_size=recon_size,
                        message=f"{side.upper()} level {level+1} price mismatch: CSV={csv_price}, Reconstructed={recon_price}"
                    )
                if recon_size != csv_size:
                    raise MatchingError(
                        side=side, csv_price=csv_price, csv_size=csv_size,
                        recon_price=recon_price, recon_size=recon_size,
                        message=f"{side.upper()} level {level+1} size mismatch: CSV={csv_size}, Reconstructed={recon_size}"
                    )
        if verbose:
            print("Messagebook and orderbook match.")

    def _check_full_book(self, num_levels_to_check, n_level_message_df):
        """
        Replay dataM and check the book after each message of ``n_level_message_df``.

        For each reference message, messages are processed from ``_last_idx``
        until one equal to it is reached, then the book is compared with the
        next orderbook-file snapshot. On the first mismatch the offending batch
        of messages and the error are printed and the method returns.
        """
        print("Checking books match...")
        print(f"Comparing {num_levels_to_check} levels of books match from a csv file {self.dataM.shape[0]} lines long...")
        dataL_loc = 0
        curr_order = None
        # First message index of the batch being verified; initialised up front
        # so the error report also works when the very first snapshot mismatches.
        batch_start_index = self._last_idx
        for r in n_level_message_df.itertuples(index=False):
            reference_order = self._row_to_order(r)
            for row in self.dataM.iloc[self._last_idx:].itertuples(index=False):
                curr_order = self._row_to_order(row)
                self.orderbook.process_order(curr_order)
                self._last_idx += 1
                if reference_order == curr_order:
                    break
            try:
                self._check_books_match(num_levels_to_check, dataL_loc)
            except MatchingError as e:
                snapshot_time = curr_order.timestamp if curr_order is not None else None
                print("\n--- ERROR: Mismatch detected! ---")
                print(f"Error caught at message index: {self._last_idx} (Snapshot Time: {snapshot_time})")
                print(
                    f"The bug was caused by one of the messages in this batch (indices {batch_start_index} to {self._last_idx - 1}):")
                print(self.dataM.iloc[batch_start_index: self._last_idx])
                print("\n--- ERROR DETAILS ---")
                print(e)
                self.orderbook.display_L2_order_book()
                return
            dataL_loc += 1
            batch_start_index = self._last_idx
            if self._last_idx % 10000 == 0:
                print(f"{self._last_idx} / {self.dataM.shape[0]}")
        print("Full book is good")