from rpy2.robjects import pandas2ri
import rpy2.robjects as ro
from rpy2.robjects.packages import importr
print('两独立样本率优效性检验的样本量计算 - Python程序 \nby Lokyshin\n')
# 请注意,该程序是python调用R的pwr.2p.test / pwr.p2n2.test与R内置的power.prop.test实现。
# reference: https://cran.r-project.org/web/packages/pwr
p1=0.5
p2=0.8
alpha=0.05
power=0.90
power_cal_method="binomial_enumeration"
R=2
alternative="two.sided"
# 打开pandas转r dataframe的功能
#pandas2ri.activate()
# 载入需要的R包
pwr = importr('pwr')
# 设定R代码
if R == 1 :
if power_cal_method == "binomial_enumeration":
#使用二项枚举方法计算
r_codes =f'''
# 计算效应量
h <- ES.h({p2}, {p1})
# 使用 if 语句根据 alternative 参数进行条件判断
if ("{alternative}" == "two.sided") {{
# H0: p2 = p1, Ha: p2 != p1
sample_size <- pwr.2p.test(h = h, sig.level = {alpha}, power = {power}, alternative = "two.sided")
}} else {{
if ({p2} > {p1}) {{
#H0: p2 <= p1, Ha: p2 > p1
sample_size <- pwr.2p.test(h = h, sig.level = {alpha}, power ={power}, alternative = "greater")
}} else {{
#H0: p2 >= p1, Ha: p2 < p1
sample_size <- pwr.2p.test(h = h, sig.level = {alpha}, power = {power}, alternative = "less")
}}
}}
# 打印结果
n <- ceiling(sample_size$n)
total_sample_size <- n * 2
R_result_str = paste0("使用二项枚举方法计算:",
"试验组和对照组样本量均为: ", n, ";",
"总样本量为: ", total_sample_size, "。")
print(R_result_str)
print(sample_size)
'''
else:
#使用正态分布近似方法计算
r_codes = f'''
# 计算样本量
result <- power.prop.test(p1 = {p1}, p2 = {p2}, sig.level = {alpha}, power = {power}, alternative = "{alternative}")
# 调整结果
n <- ceiling(result$n)
total_sample_size <- n * 2
# 打印结果
R_result_str = paste0("使用二项枚举方法计算:",
"试验组和对照组样本量均为: ", n, ";",
"总样本量为: ", total_sample_size, "。")
print(R_result_str)
print(result)
'''
else:
ratio = R
if power_cal_method == "binomial_enumeration":
#使用二项枚举方法迭代计算
r_codes =f'''
# 计算效应量
h <- ES.h({p2}, {p1})
# 初始化迭代计数器
iteration_counter <<- 0
# 定义计算样本量的函数
sample_size_function <- function(n1) {{
iteration_counter <<- iteration_counter + 1 # 递增迭代计数器
n2 <- {ratio} * n1
# 使用 if 语句根据 alternative 参数进行条件判断
if ("{alternative}" == "two.sided") {{
# H0: p2 = p1, Ha: p2 != p1
result <- pwr.2p2n.test(h = h, n1 = n1, n2 = n2, sig.level = {alpha}, power = NULL, alternative = "two.sided")
}} else {{
if ({p2} > {p1}) {{
#H0: p2 <= p1, Ha: p2 > p1
result <- pwr.2p2n.test(h = h, n1 = n1, n2 = n2, sig.level = {alpha}, power = NULL, alternative = "greater")
}} else {{
#H0: p2 >= p1, Ha: p2 < p1
result <- pwr.2p2n.test(h = h, n1 = n1, n2 = n2, sig.level = {alpha}, power = NULL, alternative = "less")
}}
}}
cat("迭代次数:", iteration_counter, "; n1:", n1, "; n2:", n2, "\n")
print(result)
return(result$power - {power})
}}
# 使用 uniroot 函数进行迭代计算
result <- uniroot(sample_size_function, interval = c(10, 10000))
# 得到样本量
n1 <- ceiling(result$root)
n2 <- ceiling({ratio} * n1)
R_result_str = paste0("使用二项枚举方法迭代计算:",
"对照组样本量为: ", n1, ";",
"试验组样本量为: ", n2, ";",
"总样本量为: ", n1+n2, "。")
print(R_result_str)
'''
else:
#使用正态分布近似方法迭代计算
power_target = power
r_codes = f'''
# 初始化迭代计数器
iteration_counter <- 0
if ("{alternative}" == "one.sided"){{
cat('进行单侧检验:\n')
}}else{{
cat('进行双侧检验:\n')
}}
# 迭代计算样本量
for (n1 in 1:10000) {{
n2 <- {ratio} * n1
total_n <- n1 + n2
# 计算合并比例
p_combined <- ({p1} * n1 +{p2} * n2) / (n1 + n2)
# 计算标准误
se <- sqrt(p_combined * (1 - p_combined) * (1 / n1 + 1 / n2))
# 计算Z值
if ("{alternative}" == "one.sided") {{
z_alpha <- qnorm(1 - {alpha}) # 单侧检验
}} else {{
z_alpha <- qnorm(1 - {alpha} / 2) # 双侧检验需要将alpha减半
}}
z_effect <- ({p2} - {p1}) / se
# 计算功效power
if ("{alternative}" == "one.sided") {{
power <- pnorm(z_effect - z_alpha)
}} else {{
power <- pnorm(z_effect - z_alpha) + (1 - pnorm(z_effect + z_alpha))
}}
iteration_counter <- iteration_counter + 1
cat("迭代次数:", iteration_counter, "; n1:", n1, "; n2:", n2, "; 实际Power:", power, "\n")
if (power >={ power_target}) {{
break
}}
}}
R_result_str = paste0("使用正态分布近似方法迭代计算:",
"对照组样本量为: ", n1, ";",
"试验组样本量为: ", n2, ";",
"总样本量为: ", n1+n2, "。")
print(R_result_str)
'''
# 执行R代码
ro.r(r_codes)