## Code

```
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
import statsmodels.formula.api as smf
from IPython.display import Markdown
from tabulate import tabulate
from typing import List
from scipy.stats import gmean, linregress, norm
from collections import namedtuple
from collections import defaultdict
= [100, 200, 400]
DEFAULT_SWAB_SAMPLE_SIZES = [50, 75]
SMALL_SWAB_SAMPLE_SIZES = 0.35
ASYMPTOMATIC_SHARE = 0.01
TARGET_INCIDENCE_W = 3
DOUBLING_PERIOD_D
= False
DEBUG
= ["Rothman et al. 2021", "Crits-Christoph et al. 2021"]
WW_STUDY_TITLES
def get_num_and_exp(x):
= f"{x:.1e}"
scientific_x = scientific_x.split("e")[0], int(scientific_x.split("e")[1])
number, exponent return number, exponent
def logit(x):
return np.log(x / (1 - x))
def logistic(x):
return 1 / (1 + np.exp(-x))
def get_studies():
= pd.read_csv(
df_op_lu "../data/2024-06-17-swab-sensitivity/lu_op_ct_mgs.tsv", sep="\t", skiprows=1
# Data obtained from Table S1.
)
df_op_lu.rename(={"SCV-2 Relative Abundance": "scv2_ra", "Ct value": "scv2_ct"},
columns=True,
inplace
)"patient_status", "swab_type", "Study"]] = [
df_op_lu[["Inpatient",
"op",
"Lu et al. 2021",
]
= pd.read_csv(
df_np_rodriguez "../data/2024-06-17-swab-sensitivity/rodriguez_np_ct_mgs.csv", sep=";"
# Data sent to us by authors.
) = {
rodriguez_patient_status_dict "Hospit": "Inpatient",
"Out_Patient": "Outpatient",
"Intensive_Care": "ICU",
}"patient_status"] = df_np_rodriguez["Group"].replace(
df_np_rodriguez[
rodriguez_patient_status_dict
)"scv2_ra"] = (
df_np_rodriguez["Reads_2019_CoV"] / df_np_rodriguez["Reads_Total"]
df_np_rodriguez[
)
={"CoV_Ct_number": "scv2_ct"}, inplace=True)
df_np_rodriguez.rename(columns"swab_type", "Study"]] = ["np", "Rodriguez et al. 2021"]
df_np_rodriguez[[
= pd.read_csv(
df_np_babiker "../data/2024-06-17-swab-sensitivity/babiker_np_ct_mgs.tsv",
="\t",
sep=1,
skiprows# Data obtained from table S2
)
df_np_babiker.rename(={
columns"SARS-CoV-2 RT-PCR Ct": "scv2_ct",
"SARS-CoV-2 RA": "scv2_ra",
"Inpatient/ED vs. Outpatient": "patient_status",
},=True,
inplace
)"scv2_ct"] = (
df_np_babiker["scv2_ct"].replace(",", ".", regex=True).astype(float)
df_np_babiker[
)"patient_status"] = df_np_babiker["patient_status"].apply(
df_np_babiker[lambda x: x if x in ["Inpatient", "Outpatient"] else "Unknown"
)# The data uses . to represent missing data. Set this column to integers, while at the same time mapping missing data to NA.
"days_from_onset"] = (
df_np_babiker["Day of Testing Relative to Symptom Onset"]
df_np_babiker[".", "-1")
.replace(int)
.astype(-1, "NA")
.replace(
)"swab_type", "Study"]] = ["np", "Babiker et al. 2020"]
df_np_babiker[[
= pd.read_csv(
df_np_mostafa "../data/2024-06-17-swab-sensitivity/mostafa-np-ra-ct.tsv", sep="\t"
# Data obtained from Table S2.
) = {
mostafa_severity_dict 1: "Required\nventilator",
2: "ICU",
3: "Inpatient",
4: "Outpatient",
0: "Unknown",
}
df_np_mostafa.rename(={
columns"SARS-CoV-2 RT-PCR Ct value": "scv2_ct",
"CosmosID Proportion Mapped to SARS-CoV-2": "scv2_ra",
},=True,
inplace
)"Severity index"] = df_np_mostafa["Severity index"].replace("–", 0)
df_np_mostafa["patient_status"] = (
df_np_mostafa["Severity index"].astype(int).replace(mostafa_severity_dict)
df_np_mostafa[
)# There is no information of why some patients only have "<7" as their days from onset. We set it to 3.5 (the average of 1-6 days.)
"days_from_onset"] = df_np_mostafa["No. of days from onset"].replace(
df_np_mostafa["–": "NA", "<7": "3.5"}
{
)# Drop samples unless we have both qPCR and MGS detection
= df_np_mostafa[df_np_mostafa["COVID-19-positive"] == True]
df_np_mostafa = df_np_mostafa[df_np_mostafa["scv2_ct"] != "–"]
df_np_mostafa "scv2_ct"] = df_np_mostafa["scv2_ct"].astype(float)
df_np_mostafa["swab_type", "Study"]] = ["np", "Mostafa et al. 2020"]
df_np_mostafa[[
= {
study_dfs "Lu et al. 2021": df_op_lu,
"Babiker et al. 2020": df_np_babiker,
"Mostafa et al. 2020": df_np_mostafa,
"Rodriguez et al. 2021": df_np_rodriguez,
}return study_dfs
def get_study_ras():
= get_studies()
studies = {}
study_ras for title, df in studies.items():
= df["scv2_ra"].tolist()
study_ras[title] return study_ras
def get_composite_ras():
= get_study_ras().values()
study_ras = sum(study_ras, [])
composite_swab_ras if DEBUG:
print(len(composite_swab_ras))
return composite_swab_ras
def get_adjusted_composite_ras():
= get_studies().values()
study_dfs = pd.concat(study_dfs)
composite_df = composite_df[
composite_df "patient_status"].isin(["Inpatient", "Outpatient"])
composite_df[
]= composite_df[composite_df["scv2_ra"] == 0]["scv2_ra"].tolist()
zero_ras = adjust_cts(composite_df)
symptom_status_dfs = defaultdict(list)
symptom_status_ras for symptom_status in ["Asymptomatic", "Symptomatic"]:
= symptom_status_dfs[symptom_status]
df = df[df["scv2_ra"] != 0]
df = adjust_rel_abun(df)
df = df["adjusted_scv2_ra"].tolist() + zero_ras
ras = ras
symptom_status_ras[symptom_status]
return symptom_status_ras
def get_study_p2ra(swab_sample_sizes=DEFAULT_SWAB_SAMPLE_SIZES):
= get_study_ras().values()
study_ras = get_study_ras().keys()
study_titles
= get_swab_p2ra(study_titles, study_ras, swab_sample_sizes)
study_p2ra_df return study_p2ra_df
def get_composite_p2ra(swab_sample_sizes=DEFAULT_SWAB_SAMPLE_SIZES):
= get_composite_ras()
composite_ras = get_swab_p2ra(
composite_p2ra_df "Original Composite Data"], [composite_ras], swab_sample_sizes
[
)return composite_p2ra_df
def get_adjusted_composite_p2ra(swab_sample_sizes=DEFAULT_SWAB_SAMPLE_SIZES):
= get_adjusted_composite_ras()
symptom_status_ras
= get_swab_p2ra(
asymptomatic_p2ra_df "Adjusted Composite Data"],
["Asymptomatic"]],
[symptom_status_ras[
swab_sample_sizes,
)= get_swab_p2ra(
symptomatic_p2ra_df "Adjusted Composite Data"],
["Symptomatic"]],
[symptom_status_ras[
swab_sample_sizes,
)
= asymptomatic_p2ra_df.sample(
asymptomatics =ASYMPTOMATIC_SHARE, random_state=42
frac
)= symptomatic_p2ra_df.sample(
symptomatics =1 - ASYMPTOMATIC_SHARE, random_state=42
frac
)
= pd.concat(
composite_adjusted_p2ra_df =True
[asymptomatics, symptomatics], ignore_index
)return composite_adjusted_p2ra_df
def get_asymptomatic_factor():
# https://doi.org/10.1371/journal.pone.0270694
= 0 # "The initial CT values for 37 asymptomatic individuals and 37 symptomatic patients appeared similar" https://www.nature.com/articles/s41591-020-0965-6
long_2020_asymptomatic_delta = 0 # "There were no significant differences in CT values between asymptomatic and symptomatic (including presymptomatic) patients." 10.1001/jamainternmed.2020.3862
lee_2020_asymptomatic_delta = 0.99 # Extracted from supplement figure 4D https://doi.org/10.1016/S2666-5247(23)00139-8
yang_2023_asymptomatic_delta
= 29.9
hall_asymptomatic_ct_median = 21.8
hall_symptomatic_ct_median = hall_asymptomatic_ct_median - hall_symptomatic_ct_median
hall_asymptomatic_delta = (
ASYMPTOMATIC_ADJUSTMENT_FACTOR
hall_asymptomatic_delta+ long_2020_asymptomatic_delta
+ lee_2020_asymptomatic_delta
+ yang_2023_asymptomatic_delta
/ 4
)
return ASYMPTOMATIC_ADJUSTMENT_FACTOR
def adjust_cts(df):
= get_asymptomatic_factor()
ASYMPTOMATIC_ADJUSTMENT_FACTOR
= pd.read_csv(
np_data "../data/2024-06-17-swab-sensitivity/2024-06-18-np-nasal-ct.tsv",
="\t",
sep=1,
skiprows
)= np_data.mean()
np_means
= np_means.mean()
NP_ADJUSTMENT_FACTOR = pd.read_csv(
goodall_data "../data/2024-06-17-swab-sensitivity/goodall-op-nasal-ct.tsv",
="\t",
sep=2,
skiprows=None,
header
)= goodall_data[0].mean()
OP_ADJUSTMENT_FACTOR
"adjusted_scv2_ct"] = df["scv2_ct"]
df[# Subtract the adjustment factors from the CT values (NP_ADJUSTMENT_FACTOR is negative, so it increases the CT values)
"swab_type"] == "np", "adjusted_scv2_ct"] -= NP_ADJUSTMENT_FACTOR
df.loc[df["swab_type"] == "op", "adjusted_scv2_ct"] -= OP_ADJUSTMENT_FACTOR
df.loc[df[= df.copy()
df_symptomatic = df.copy()
df_asymptomatic "adjusted_scv2_ct"] += ASYMPTOMATIC_ADJUSTMENT_FACTOR
df_asymptomatic[
= {
symptom_status_dfs "Asymptomatic": df_asymptomatic,
"Symptomatic": df_symptomatic,
}
return symptom_status_dfs
def adjust_rel_abun(composite_df):
= composite_df.copy()
composite_df "scv2_ra_logit"] = composite_df["scv2_ra"].apply(logit)
composite_df.loc[:,
= linregress(
slope, intercept, r_value, p_value, std_err "scv2_ct"], composite_df["scv2_ra_logit"]
composite_df[
)"adjusted_scv2_ra_logit"] = (
composite_df[+ slope * composite_df["adjusted_scv2_ct"]
intercept
)= composite_df["scv2_ra_logit"] - (
residuals + slope * composite_df["scv2_ct"]
intercept
)
= np.var(residuals, ddof=2)
sigma_squared
= np.sqrt(sigma_squared)
sigma
= np.random.normal(loc=0, scale=sigma, size=len(composite_df))
noise
"adjusted_scv2_ra_logit_with_noise"] = (
composite_df["adjusted_scv2_ra_logit"] + noise
composite_df[
)"adjusted_scv2_ra"] = composite_df[
composite_df["adjusted_scv2_ra_logit_with_noise"
apply(logistic)
].return composite_df
def get_swab_p2ra(subset_titles, ra_lists, swab_sample_sizes=DEFAULT_SWAB_SAMPLE_SIZES):
= pd.DataFrame()
df_swab for subset, ras in zip(subset_titles, ra_lists):
= get_logit_normal_samples(ras)
samples = simulate_p2ra_many(samples, swab_sample_sizes, n_simulations=10000)
df "Subset"] = subset
df[= pd.concat(
df_swab
[
df_swab,
df.melt(="Subset",
id_vars=swab_sample_sizes,
value_vars="Sample Size",
var_name="Relative Abundance",
value_name
),
]
)
return df_swab
def get_logit_normal_samples(ras):
= np.array(ras)
ra_values = (ra_values == 0).mean()
zero_share = ra_values[ra_values != 0]
ra_values = logit(ra_values)
logit_ra_values = np.mean(logit_ra_values), np.std(logit_ra_values)
mean, std = norm(loc=mean, scale=std)
norm_dist = norm_dist.rvs(size=int(100000 * (1 - zero_share)))
logit_samples = logistic(logit_samples)
samples = np.append(samples, np.zeros(int(100000 * zero_share)))
samples
np.random.shuffle(samples)return samples
def get_prevalence():
= 7
days_in_week = 2 ** (1 / DOUBLING_PERIOD_D)
growth_factor = growth_factor**days_in_week
weekly_growth = 0.01
weekly_incidence = weekly_incidence * (weekly_growth) / (weekly_growth - 1)
prevalence
if DEBUG:
# Validate that calculated prevalence gives correct weekly incidence
# Prevalence on each past day in the previous week
= [
past_prevalence / growth_factor**x for x in range(days_in_week + 1)
prevalence # +1 to include prevalence on day 0
] # Incidence on each past day in the previous week
= -np.diff(past_prevalence)
past_incidence
assert np.isclose(sum(past_incidence), weekly_incidence)
return prevalence
def simulate_p2ra_many(ra_lists, sample_populations, n_simulations) -> pd.DataFrame:
= get_prevalence()
prevalence = defaultdict(list)
results for sample_pop in sample_populations:
for _ in range(n_simulations):
results[sample_pop].append(simulate_p2ra(sample_pop, ra_lists, prevalence))for key, values in results.items():
= sorted(values)
results[key] = pd.DataFrame(results)
df return df
def simulate_p2ra(sample_pop, ra_lists, prevalence):
= np.random.poisson(sample_pop * prevalence)
n_sick if n_sick == 0:
return 0
= 0
cumulative_ra_sick for _ in range(n_sick):
= np.random.choice(ra_lists)
observation += observation
cumulative_ra_sick = cumulative_ra_sick / n_sick
individual_ra_sick = n_sick / sample_pop * individual_ra_sick
relative_abundance return relative_abundance
def get_fig_2_dfs():
= pd.concat([study_p2ra_df, composite_p2ra_df], ignore_index=True)
df_swab
= pd.DataFrame(
df_ww
{"Relative Abundance": rothman_p2ra_ras + crits_christoph_p2ra_ras,
"Subset": ["Rothman et al. 2021"] * len(rothman_p2ra_ras)
+ ["Crits-Christoph et al. 2021"] * len(crits_christoph_p2ra_ras),
}
)return df_swab, df_ww
def get_fig_7_dfs():
= pd.concat([composite_p2ra_df, adjusted_composite_p2ra_df])
df_swab =True, inplace=True)
df_swab.reset_index(drop= pd.DataFrame(
df_ww
{"Relative Abundance": rothman_p2ra_ras + crits_christoph_p2ra_ras,
"Subset": ["Rothman et al. 2021"] * len(rothman_p2ra_ras)
+ ["Crits-Christoph et al. 2021"] * len(crits_christoph_p2ra_ras),
}
)
return df_swab, df_ww
def get_ww_p2ra():
= pd.read_csv(
df_wastewater "../data/2024-06-17-swab-sensitivity/2024-05-07-fits-subset.tsv", sep="\t"
# Original fits are taken from p2ra-manuscript. Based on the v1 pipeline (https://github.com/naobservatory/mgs-pipeline). Subset to be small enough for Github with ../scripts/subset_fits_tsv.py.
) = df_wastewater[
rothman_p2ra_ras "study"] == "rothman")
(df_wastewater[& (df_wastewater["location"] == "Overall")
& (df_wastewater["pathogen"] == "sars_cov_2")
"ra_at_1in100"].tolist()
][= df_wastewater[
crits_christoph_p2ra_ras "study"] == "crits_christoph")
(df_wastewater[& (df_wastewater["location"] == "Overall")
& (df_wastewater["pathogen"] == "sars_cov_2")
"ra_at_1in100"].tolist()
][
= {
ww_p2ra "Rothman et al. 2021": rothman_p2ra_ras,
"Crits-Christoph et al. 2021": crits_christoph_p2ra_ras,
}return ww_p2ra
def plot_medians(ax, medians):
= ax.collections
patches for patch, median in zip(patches, medians):
for path in patch.get_paths():
= path.vertices
vertices = median
x_mid = min(vertices[:, 0], key=lambda x: abs(x - x_mid))
closest_x = max(vertices[vertices[:, 0] == closest_x, 1])
y_upper = min(vertices[vertices[:, 0] == closest_x, 1])
y_lower ="white", linewidth=1.5)
ax.plot([x_mid, x_mid], [y_lower, y_upper], color
# Generate p2ra values one time before figures are created.
= get_studies().values()
study_dfs = get_composite_p2ra()
composite_p2ra_df = get_adjusted_composite_p2ra()
adjusted_composite_p2ra_df = get_study_p2ra()
study_p2ra_df = get_ww_p2ra()
ww_p2ra_dfs = ww_p2ra_dfs["Rothman et al. 2021"]
rothman_p2ra_ras = ww_p2ra_dfs["Crits-Christoph et al. 2021"]
crits_christoph_p2ra_ras
= get_asymptomatic_factor()
ASYMPTOMATIC_ADJUSTMENT_FACTOR = f"{ASYMPTOMATIC_ADJUSTMENT_FACTOR:.2f}"
rounded_asymptomatic_factor
= f"{int(ASYMPTOMATIC_SHARE * 100)}%"
asymptomatic_share_percentage = f"{int((1 - ASYMPTOMATIC_SHARE) * 100)}%"
symptomatic_share_percentage
= get_prevalence() prevalence
```