"""
MNQ Initial Balance Break / Extension Regime Study.
Definitions:
- RTH session: 09:30-16:00
- Initial Balance: 09:30-10:30
- IB mid: (IB high + IB low) / 2
- Post-IB break window: RTH bars timestamped at or after 10:30 (every window includes its start time and excludes its end time)
- High-only break: post-IB high breaks IB high and post-IB low does not break IB low
- Low-only break: post-IB low breaks IB low and post-IB high does not break IB high
- Both breaks: post-IB high breaks IB high and post-IB low breaks IB low
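- Extension: distance traded beyond the broken IB extreme, as a fraction of the IB range
- Extension buckets apply to one-sided break days only and are mutually exclusive:
* 25% extension, no 50% (25% <= extension < 50%)
* 50% extension, no 75% (50% <= extension < 75%)
* 75% extension, no 100% (75% <= extension < 100%)
* 100% extension or greater (extension >= 100%)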
Output:
- ib_break_extension_summary.csv
- ib_break_extension_day_detail.csv
"""
import math
from datetime import time
import numpy as np
import pandas as pd
# Input CSV of intraday bars; the path is resolved relative to the working directory.
DATA_PATH = "MNQ LT 2.csv"

# Regular-trading-hours window. Used as start-inclusive / end-exclusive.
# NOTE(review): timestamps are compared as naive clock times - confirm the
# file is already in exchange-local time.
RTH_START, RTH_END = time(hour=9, minute=30), time(hour=16, minute=0)

# Initial Balance window: the first hour of the RTH session.
IB_START, IB_END = time(hour=9, minute=30), time(hour=10, minute=30)
def mask_between(t_series, start_t, end_t):
    """Return a boolean mask for values inside the half-open window [start_t, end_t).

    Args:
        t_series: Values to test. Must support elementwise ``>=`` and ``<``
            against the bounds (this script passes a Series of
            ``datetime.time`` objects).
        start_t: Inclusive lower bound.
        end_t: Exclusive upper bound.

    Returns:
        The elementwise result of ``(t_series >= start_t) & (t_series < end_t)``.
    """
    return (t_series >= start_t) & (t_series < end_t)
def wilson_ci(k, n, z=1.959963984540054):
    """Return the Wilson score interval for a binomial proportion.

    Args:
        k: Number of successes.
        n: Number of trials.
        z: Standard-normal quantile that sets the confidence level; the
            default is the two-sided 95% value.

    Returns:
        ``(low, high)`` as fractions within [0, 1], or ``(nan, nan)`` when
        ``n`` is zero (no trials, so no interval can be formed).
    """
    if n == 0:
        return (np.nan, np.nan)
    p = k / n
    z2 = z * z
    denom = 1 + z2 / n
    center = (p + z2 / (2 * n)) / denom
    half = z * math.sqrt((p * (1 - p) + z2 / (4 * n)) / n) / denom
    # At k == 0 or k == n the exact bound is 0 or 1, but floating-point
    # rounding can land a hair outside; clamp so callers always get a
    # valid probability.
    return max(0.0, center - half), min(1.0, center + half)
def pct(x):
    """Convert a fraction to a percentage, mapping missing values to NaN.

    Args:
        x: A fraction (e.g. ``0.25``), or a missing value as recognised by
            ``pd.notna`` (``None``, ``NaN``).

    Returns:
        ``100 * x`` when ``x`` is present, otherwise ``np.nan``.
    """
    return 100 * x if pd.notna(x) else np.nan
# ---- Load data ----
# Read only the header row first so the column names can be resolved before
# the full file is loaded.
cols = pd.read_csv(DATA_PATH, nrows=0).columns.tolist()

# Prefer an explicit datetime-style header; otherwise accept any header that
# mentions both "date" and "time".
dt_candidates = [c for c in cols if any(k in c.lower() for k in ["date time", "datetime", "timestamp"])]
if not dt_candidates:
    dt_candidates = [c for c in cols if ("date" in c.lower() and "time" in c.lower())]
if not dt_candidates:
    raise ValueError("Could not locate a datetime column.")
dt_col = dt_candidates[0]


def _resolve_price_column(name):
    """Return the header in ``cols`` that holds the *name* price field.

    The Title-case, UPPER-case and lower-case spellings are tried in that
    order, then any header that matches once case and surrounding
    whitespace are ignored.

    Raises:
        ValueError: If no header matches, so the failure names the missing
            field instead of surfacing as a ``usecols`` error.
    """
    for candidate in (name, name.upper(), name.lower()):
        if candidate in cols:
            return candidate
    wanted = name.lower()
    for header in cols:
        if header.strip().lower() == wanted:
            return header
    raise ValueError(f"Could not locate a {name} column; available columns: {cols}")


high_col = _resolve_price_column("High")
low_col = _resolve_price_column("Low")
close_col = _resolve_price_column("Close")

df = pd.read_csv(DATA_PATH, usecols=[dt_col, high_col, low_col, close_col], low_memory=False)

# Rows whose timestamp cannot be parsed become NaT and are dropped.
df["dt"] = pd.to_datetime(df[dt_col], errors="coerce")
df = df.dropna(subset=["dt"]).copy()
df = df.rename(columns={high_col: "High", low_col: "Low", close_col: "Close"})

# Later steps take the last RTH bar of each day as the session close, which
# is only correct when bars are in chronological order. A stable sort keeps
# the file order for bars that share a timestamp.
df = df.sort_values("dt", kind="mergesort")

df["date"] = df["dt"].dt.date
df["t"] = df["dt"].dt.time
# ---- Session-level computation ----
def _extension_buckets(side, one_sided, ext):
    """Return the four mutually exclusive extension-bucket flags for one side.

    Args:
        side: Column prefix, ``"hi"`` or ``"lo"``.
        one_sided: True when only that side of the IB was broken.
        ext: Extension beyond the IB extreme as a fraction of the IB range.

    Returns:
        dict mapping the bucket column names to plain ``bool`` values; at
        most one is True, and none is True when ``ext`` is below 0.25.
    """
    return {
        f"{side}_ext_25_no_50": bool(one_sided and 0.25 <= ext < 0.50),
        f"{side}_ext_50_no_75": bool(one_sided and 0.50 <= ext < 0.75),
        f"{side}_ext_75_no_100": bool(one_sided and 0.75 <= ext < 1.00),
        f"{side}_ext_100_plus": bool(one_sided and ext >= 1.00),
    }


rows = []
for d, day in df.groupby("date", sort=True):
    # The session close is read from the last RTH bar, so order the day's
    # bars chronologically first; groupby only orders the dates, not the
    # rows inside each date.
    day = day.sort_values("dt", kind="mergesort")

    rth = day[mask_between(day["t"], RTH_START, RTH_END)]
    ib = day[mask_between(day["t"], IB_START, IB_END)]
    post_ib = rth[rth["t"] >= IB_END]

    # Skip days that lack any of the three windows (holidays, partial data).
    if rth.empty or ib.empty or post_ib.empty:
        continue

    ib_high = ib["High"].max()
    ib_low = ib["Low"].min()
    ib_range = ib_high - ib_low
    ib_mid = (ib_high + ib_low) / 2
    rth_close = rth["Close"].iloc[-1]

    # A zero or non-finite range cannot be used as the extension denominator.
    if not np.isfinite(ib_range) or ib_range <= 0:
        continue

    post_high = post_ib["High"].max()
    post_low = post_ib["Low"].min()

    # Strict inequalities: merely touching the IB extreme is not a break.
    hi_break = bool(post_high > ib_high)
    lo_break = bool(post_low < ib_low)
    hi_only = hi_break and not lo_break
    lo_only = lo_break and not hi_break
    both = hi_break and lo_break

    # Extensions are fractions of the IB range (0.25 == 25%), despite the
    # "pct" in the output column names.
    hi_ext = (post_high - ib_high) / ib_range if hi_break else 0.0
    lo_ext = (ib_low - post_low) / ib_range if lo_break else 0.0

    rows.append({
        "date": d,
        "ib_high": ib_high,
        "ib_low": ib_low,
        "ib_mid": ib_mid,
        "ib_range": ib_range,
        "rth_close": rth_close,
        "hi_break": hi_break,
        "lo_break": lo_break,
        "hi_only": hi_only,
        "lo_only": lo_only,
        "both_breaks": both,
        # A close exactly at the IB mid counts as neither above nor below.
        "close_above_ib_mid": bool(rth_close > ib_mid),
        "close_below_ib_mid": bool(rth_close < ib_mid),
        "hi_ext_pct_of_ib": hi_ext,
        "lo_ext_pct_of_ib": lo_ext,
        **_extension_buckets("hi", hi_only, hi_ext),
        **_extension_buckets("lo", lo_only, lo_ext),
    })

# Without this check an empty result yields a column-less frame and the
# summary step fails with an unrelated-looking KeyError.
if not rows:
    raise ValueError("No sessions with a usable Initial Balance and post-IB bars were found.")

daily = pd.DataFrame(rows)
def summarize(label: str, denominator_mask: pd.Series, numerator_mask: pd.Series) -> dict:
    """Build one summary row for P(numerator | denominator).

    Args:
        label: Text stored in the ``conditional_probability`` column.
        denominator_mask: Boolean Series marking the days in the
            conditioning set.
        numerator_mask: Boolean Series marking the days on which the
            outcome occurred; only days also in the denominator are counted.

    Returns:
        dict with the conditioning-set size ``N``, the hit ``count``, the
        probability as a fraction and as a percentage, and the Wilson
        interval bounds (at ``wilson_ci``'s default ``z``) as percentages.
        The probability and interval fields are NaN when ``N`` is zero.
    """
    n = int(denominator_mask.sum())
    k = int((denominator_mask & numerator_mask).sum())
    p = k / n if n else np.nan
    lo, hi = wilson_ci(k, n)
    return {
        "conditional_probability": label,
        "N": n,
        "count": k,
        "probability": p,
        "probability_pct": pct(p),
        "wilson_ci_low_pct": pct(lo),
        "wilson_ci_high_pct": pct(hi),
    }
# Unconditional rows use every retained session as the denominator.
all_days = pd.Series(True, index=daily.index)

any_break = daily["hi_break"] | daily["lo_break"]
closed_above_mid = daily["close_above_ib_mid"]
closed_below_mid = daily["close_below_ib_mid"]

# (label, denominator mask, numerator mask), in output row order.
summary_spec = [
    ("P(IB hi or lo break)", all_days, any_break),
    ("P(only IB hi breaks)", all_days, daily["hi_only"]),
    ("P(only IB lo breaks)", all_days, daily["lo_only"]),
    ("P(both breaks)", all_days, daily["both_breaks"]),
    ("P(close above IB mid | IB hi break only)", daily["hi_only"], closed_above_mid),
    ("P(close above IB mid | hi-only + 25% ext, no 50%)", daily["hi_ext_25_no_50"], closed_above_mid),
    ("P(close above IB mid | hi-only + 50% ext, no 75%)", daily["hi_ext_50_no_75"], closed_above_mid),
    ("P(close above IB mid | hi-only + 75% ext, no 100%)", daily["hi_ext_75_no_100"], closed_above_mid),
    ("P(close above IB mid | hi-only + 100% ext)", daily["hi_ext_100_plus"], closed_above_mid),
    ("P(close below IB mid | IB lo break only)", daily["lo_only"], closed_below_mid),
    ("P(close below IB mid | lo-only + 25% ext, no 50%)", daily["lo_ext_25_no_50"], closed_below_mid),
    ("P(close below IB mid | lo-only + 50% ext, no 75%)", daily["lo_ext_50_no_75"], closed_below_mid),
    ("P(close below IB mid | lo-only + 75% ext, no 100%)", daily["lo_ext_75_no_100"], closed_below_mid),
    ("P(close below IB mid | lo-only + 100% ext)", daily["lo_ext_100_plus"], closed_below_mid),
]

summary = pd.DataFrame(
    [summarize(row_label, given, outcome) for row_label, given, outcome in summary_spec]
)

# Show the table on stdout, then persist both the summary and per-day detail.
print(summary)
summary.to_csv("ib_break_extension_summary.csv", index=False)
daily.to_csv("ib_break_extension_day_detail.csv", index=False)