import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import weibull_min
from scipy.stats import norm
from scipy import stats
import math
import progressbar
import lifelines
from lifelines.utils import median_survival_times
print('时间终点的样本量计算与模拟的Python程序\nby Lokyshin\n')
#显示所有列
pd.set_option('display.max_columns', None)
#显示所有行
pd.set_option('display.max_rows', None)
# 参数设定
histControl_time_to_event = 5 # 单位不限,月/年/日均可
expected_time_to_event = 8 # 单位不限,月/年/日均可
selected_time_to_event = 5 # 选择历史对照或预期组,用于计算weibull分布的scale
HR = histControl_time_to_event / expected_time_to_event
alpha = 0.05
power = 0.8
# 假设试验组和对照组的人数相等,即nE/nC = 1
k = 1
# 设定事件数占比,比如治疗组vs对照组1:1 则治疗组与对照组各1/2;如2:1,则治疗组2/3,对照组占1/3
ratio_of_events = 1/2
# 设定模拟次数
n_simulation = 100
# 需要模拟的总研究时间
total_time = 100
# 模拟参数
accrual_rate = 3
duration_shape = 1.5
# 设定目标
maturity_target = 0.7
# 计算scale参数
duration_scale = selected_time_to_event / (np.log(2) ** (1/duration_shape))
# 计算预期的事件总数
excepted_events_total = (-np.log(power) + norm.ppf(1 - alpha / 2)) ** 2 / (k * (HR - 1) ** 2 / ((k * HR + 1) ** 2))
# 对应组的预期事件数
excepted_events = excepted_events_total * ratio_of_events
# 初始化结果变量
result_sim = ''
result_confirm = ''
def sim_process(excepted_events, n_simulation, total_time, duration_shape, duration_scale, accrual_rate):
# 初始化其他参数
df_enrollment = pd.DataFrame()
time_result = None
df_sim_TAF = pd.DataFrame({
'Total': [],
'Accrual_time': [],
'Follow_up_time':[]
})
bar_target = n_simulation
bar_sim = progressbar.ProgressBar(maxval=bar_target, widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
bar_sim.start()
for i_simulation in range(1, bar_target + 1):
# 按照设定的模拟次数模拟入组过程
for time in range(1, total_time + 1):
duration_time_to_events = weibull_min.rvs(duration_shape, scale=duration_scale, size=accrual_rate)
event = 0
if time == 1:
df_enrollment = pd.DataFrame({'time': [time] * accrual_rate,
'time_to_event': duration_time_to_events,
'event': event})
else:
df_enrollment_new = pd.DataFrame({'time': [time] * accrual_rate,
'time_to_event': duration_time_to_events,
'event': event})
# 在合并前,处理df_enrollment的event
for i_row, row in df_enrollment.iterrows():
follow_up = time - row['time']
time_to_event = row['time_to_event']
if time_to_event <= follow_up:
df_enrollment.at[i_row, 'event'] = 1
else:
df_enrollment.at[i_row, 'event'] = 0
df_enrollment = pd.concat([df_enrollment, df_enrollment_new], ignore_index=True)
# 计算累积事件数量
events = df_enrollment['event'].sum()
# 计算数据成熟度
maturity = events/excepted_events
# 过滤出event=1的行
df_event = df_enrollment[df_enrollment['event'] == 1]
# 在time列取最大值,即末例患者入组的时间点。
LPI_time_to_event = df_event['time'].max()
median_follow_up_event = df_event['time_to_event'].median()
#print(f"time: {time}, Accrued: {accrued}, Events: {events}, Maturity: {maturity:.2f}")
#print('\n模拟的数据集\n')
#print(df_enrollment)
#print('\n')
if maturity > maturity_target:
time_result = time
df_sim_TAF_new = pd.DataFrame({
'Total': [time_result],
'Accrual_time': [LPI_time_to_event],
'Follow_up_time':[median_follow_up_event],
'Events': [events],
'Maturity': [maturity]
})
df_sim_TAF = pd.concat([df_sim_TAF, df_sim_TAF_new], ignore_index=True)
#print('\n找到对应选定数据成熟度的合适时间\n')
#print(df_sim_TAF)
#print('\n')
break
#print(f'第【{i_simulation}】次模拟入组')
bar_sim.update(i_simulation)
return df_sim_TAF
# 模拟确认过程
def sim_confirm_process(expected_events, n_sim, accrual_time, followup_time, duration_shape, duration_scale, accrual_rate):
total_time = accrual_time + math.ceil(followup_time)
df_confirm = pd.DataFrame({'Events':[], 'Maturity':[], 'LPI':[]})
bar_target = n_sim
bar_confirm = progressbar.ProgressBar(maxval=bar_target, widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
bar_confirm.start()
# 初始化一个空的 DataFrame 来存储结果
total_survival_result = pd.DataFrame()
for i in range(1, bar_target + 1):
# 每次模拟充值数据集
df_enroll = pd.DataFrame()
for time in range(1, total_time + 1):
if time <= accrual_time:
duration_time_to_events = weibull_min.rvs(duration_shape, scale=duration_scale, size=accrual_rate)
df_new_enroll = pd.DataFrame({'time': [time] * accrual_rate,
'time_to_event': duration_time_to_events,
'event': [0] * accrual_rate}) # 先都设为0
# 遍历df_new的每一行,判断event
for i_new, row in df_new_enroll.iterrows():
follow_up = total_time - row['time']
time_to_event = row['time_to_event']
if time_to_event <= follow_up:
df_new_enroll.at[i_new, 'event'] = 1
else:
df_new_enroll.at[i_new, 'event'] = 0
if time <= accrual_time:
df_enroll = pd.concat([df_enroll, df_new_enroll])
# 计算累积事件数量
events = df_enroll['event'].sum()
# 计算数据成熟度
maturity = events/excepted_events
df_new_confirm = pd.DataFrame({'Events': [events],
'Maturity': [maturity],
'LPI': [df_enroll[df_enroll['time_to_event'] <= time - df_enroll['time']]['time'].max()]})
df_confirm = pd.concat([df_confirm, df_new_confirm])
#print('\n模拟确认的数据集\n')
#print(df_enroll)
#print('\n')
# 创建KaplanMeierFitter对象
kmf_des_result = lifelines.KaplanMeierFitter()
# 拟合生存曲线
kmf_des_result.fit(df_enroll['time_to_event'], df_enroll['event'])
# 计算时间
n = len(df_enroll)
median_total = kmf_des_result.median_survival_time_
median_confidence_interval_ = median_survival_times(kmf_des_result.confidence_interval_)
lcl95_total = median_confidence_interval_.iloc[0, 0]
ucl95_total = median_confidence_interval_.iloc[0, 1]
# 把结果存入 DataFrame
df_enroll_confirm_result_new = pd.DataFrame({
'Sample size': [n],
'Median (total)': [round(median_total, 3)],
'[95% conf.': [round(lcl95_total, 3)],
'interval]': [round(ucl95_total, 3)]
}, index=['Groups']) # 使用'All Groups'作为索引
# 把结果添加到总的 DataFrame 中
total_survival_result = pd.concat([total_survival_result, df_enroll_confirm_result_new])
#print(f'第【{i}】次确认模拟完成')
bar_confirm.update(i)
#print(total_survival_result)
return df_confirm, total_survival_result, df_enroll
# 定义计算统计量函数
def stat_des(col_name, data):
mean = data.mean() # 均值
median = data.median() # 中位数
std_dev = data.std() # 标准差
std_error = data.sem() # 标准误
upper_quartile = data.quantile(0.75) # 上四分位数
lower_quartile = data.quantile(0.25) # 下四分位数
iqr = upper_quartile - lower_quartile # 四分位间距
max_value = data.max() # 极大值
min_value = data.min() # 极小值
range_ = max_value - min_value # 极差
# 计算95%可信区间,然后取小数点后3位
confidence_interval = stats.t.interval(0.95, len(data)-1, loc=mean, scale=std_error)
confidence_interval = [round(value,2) for value in confidence_interval]
# 创建结果数据框
df_sata_des = pd.DataFrame({
'Item': col_name,
'Mean': mean,
'Median': median,
'Std Dev': std_dev,
'Std Error': std_error,
'Max Value': max_value,
'Min Value': min_value,
'Range': range_,
'[95% conf. interval]': [confidence_interval]
}, index=[0])
return df_sata_des
print(f'\n模拟计算中,请稍后...')
df_sim_TAF = sim_process(excepted_events, n_simulation, total_time, duration_shape, duration_scale, accrual_rate)
print(f'\n模拟计算完成')
print(f'\n模拟 {n_simulation} 次的预估时间与可信区间汇总\n')
df_stat_des_Total = stat_des('Total', df_sim_TAF['Total']).round(2)
df_stat_des_Accrual_time = stat_des('Accrual', df_sim_TAF['Accrual_time']).round(2)
df_stat_des_Follow_up_time = stat_des('Follow_up', df_sim_TAF['Follow_up_time']).round(2)
df_stat_des_Events = stat_des('Events', df_sim_TAF['Events']).round(2)
df_stat_des_Maturity = stat_des('Maturity', df_sim_TAF['Maturity']).round(2)
df_stat_des_result = pd.DataFrame()
df_stat_des_result = pd.concat([df_stat_des_result, df_stat_des_Total])
df_stat_des_result = pd.concat([df_stat_des_result, df_stat_des_Accrual_time])
df_stat_des_result = pd.concat([df_stat_des_result, df_stat_des_Follow_up_time])
df_stat_des_result = pd.concat([df_stat_des_result, df_stat_des_Events])
df_stat_des_result = pd.concat([df_stat_des_result, df_stat_des_Maturity])
result_sim += f'\n\n1. 模拟 {n_simulation} 次的预估时间与可信区间汇总\n'
result_sim += f'\n依据您给定的全部模拟参数,所需事件总数为:{excepted_events:.2f}'
samplesize_sim = df_stat_des_Accrual_time['Median'][0].round(0) * accrual_rate
result_sim += f'\n按照入组速率:{accrual_rate},本组所需的无脱落样本量是:{samplesize_sim:.0f}\n'
result_sim += f'\n满足数据成熟度({maturity_target})的总研究时间、入组时间、随访时间、事件发生数、数据成熟度分别为'
df_stat_des_result_str = df_stat_des_result.to_string(index=False)
result_sim += f'\n{df_stat_des_result_str}'
# 之前给定的参数确定了excepte_events, 现在给定Accrual_time, Follow_up_time
# 模拟入组时间设定为中位时间
accrual_time_target = math.ceil(df_sim_TAF['Accrual_time'].median())
# 选择选择 df_sim_TAF['Accrual_time'].median()和expected_time_to_event比较大的作为随访时间
follow_up_time_target = math.ceil(max(df_sim_TAF['Follow_up_time'].median(), expected_time_to_event))
# 更新模拟入组的总研究时间
total_time = accrual_time_target + math.ceil(follow_up_time_target)
print("\n测试计算中,请稍后...")
df_sim_confirm, total_survival_result, df_enroll = sim_confirm_process(excepted_events, n_simulation, accrual_time_target, follow_up_time_target, duration_shape, duration_scale, accrual_rate)
print(f'\n测试模拟计算完成')
df_stat_des_Events = stat_des('Events', df_sim_confirm['Events']).round(2)
df_stat_des_Maturity = stat_des('Maturity', df_sim_confirm['Maturity']).round(2)
df_stat_des_time_LPI = stat_des('LPI', df_sim_confirm['LPI']).round(2)
df_stat_des_Median_est = stat_des('Median_est', total_survival_result['Median (total)']).round(2)
df_stat_des_confirm = pd.DataFrame()
df_stat_des_confirm = pd.concat([df_stat_des_confirm, df_stat_des_Events])
df_stat_des_confirm = pd.concat([df_stat_des_confirm, df_stat_des_Maturity])
df_stat_des_confirm = pd.concat([df_stat_des_confirm, df_stat_des_time_LPI])
df_stat_des_confirm = pd.concat([df_stat_des_confirm, df_stat_des_Median_est])
result_confirm += f'\n\n2. 按照给定入组时间段({accrual_time_target})和给定随访时间({follow_up_time_target}),模拟 {n_simulation} 次的数据成熟情况\n'
samplesize_sim = round(accrual_time_target,0) * accrual_rate
result_confirm += f'\n按照入组速率:{accrual_rate},本组的无脱落样本量是:{samplesize_sim:.0f}\n'
result_confirm += f'\n模拟的累积事件数量、模拟的数据成熟度、最后发生事件患者的入组时间点,以及根据模拟数据预估事件发生的中位时间分别为'
df_stat_des_confirm_str = df_stat_des_confirm.to_string(index=False)
result_confirm += f'\n{df_stat_des_confirm_str}'
df_fig = df_enroll
df_enroll['time_to_event'] = df_enroll['time_to_event'].round(2)
df_enroll = df_enroll.rename(columns={'time_to_event': '事件发生时间'})
df_enroll = df_enroll.rename(columns={'time': '入组时间'})
df_enroll = df_enroll.rename(columns={'event': '事件发生(1为发生)'})
df_enroll_str = df_enroll.to_string(index=False)
result_confirm += f'\n\n\n{n_simulation}次模拟中,其中1次匀速入组速率为 {accrual_rate} 的入组与事件发生情况'
result_confirm += f'\n(模拟的入组数据生成的Kaplan-Meier曲线请查图片)'
result_confirm += f'\n\n{df_enroll_str}'
# 数据整理与拟合
T = df_fig['time_to_event']
E = df_fig['event']
# 创建KaplanMeierFitter对象
kmf_sim = lifelines.KaplanMeierFitter()
# 拟合生存曲线
kmf_sim.fit(T, event_observed = E)
fig, ax = plt.subplots()
# 绘制KM曲线
kmf_sim.plot(ax=ax, show_censors=True, at_risk_counts=True)
# 添加标题
ax.set_xlabel('Duration')
ax.set_ylabel('Survival(%)')
ax.set_title('Kaplan-Meier Curve of simulation', pad=20) # pad参数可以调整标题与图像的距离
# 自适应调整图像布局
plt.tight_layout()
print(result_sim)
print(result_confirm)
plt.show()
print('\n感谢您使用本程序进行统计分析,再见。\n')