import logging
import shutil
import xml.etree.ElementTree as ET
from pathlib import Path
import numpy as np
import uproot
from pyhf.mixins import _ChannelSummaryMixin
from pyhf.schema import path as schema_path
_ROOT_DATA_FILE = None
log = logging.getLogger(__name__)
__all__ = [
"build_channel",
"build_data",
"build_measurement",
"build_modifier",
"build_sample",
"indent",
]
def __dir__():
return __all__
# 'spec' gets passed through all functions as NormFactor is a unique case of having
# parameter configurations stored at the modifier-definition-spec level. This means
# that build_modifier() needs access to the measurements. The call stack is:
#
# writexml
# ->build_channel
# ->build_sample
# ->build_modifier
#
# Therefore, 'spec' needs to be threaded through all these calls.
def _make_hist_name(channel, sample, modifier="", prefix="hist", suffix=""):
middle = "_".join(filter(lambda x: x, [channel, sample, modifier]))
return f"{prefix}{middle}{suffix}"
def _export_root_histogram(hist_name, data):
if hist_name in _ROOT_DATA_FILE:
msg = f"Duplicate key {hist_name} being written."
raise KeyError(msg)
_ROOT_DATA_FILE[hist_name] = uproot.to_writable(
(np.asarray(data), np.arange(len(data) + 1))
)
# https://stackoverflow.com/a/4590052
[docs]
def indent(elem, level=0):
i = "\n" + level * " "
if elem is not None:
if not elem.text or not elem.text.strip():
elem.text = i + " "
if not elem.tail or not elem.tail.strip():
elem.tail = i
for subelem in elem:
indent(subelem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
elif level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
[docs]
def build_measurement(measurementspec, modifiertypes):
"""
Build the XML measurement specification for a given measurement adhering to defs.json/#definitions/measurement.
Args:
measurementspec (:obj:`dict`): The measurements specification from a :class:`~pyhf.workspace.Workspace`.
modifiertypes (:obj:`dict`): A mapping from modifier name (:obj:`str`) to modifier type (:obj:`str`).
Returns:
:class:`xml.etree.cElementTree.Element`: The XML measurement specification.
"""
# need to determine prefixes
prefixes = {
"normsys": "alpha_",
"histosys": "alpha_",
"shapesys": "gamma_",
"staterror": "gamma_",
}
config = measurementspec["config"]
name = measurementspec["name"]
poi = config["poi"]
# we want to know which parameters are fixed (constant)
# and to additionally extract the luminosity information
fixed_params = []
lumi = 1.0
lumierr = 0.0
for parameter in config["parameters"]:
if parameter.get("fixed", False):
pname = parameter["name"]
if pname == "lumi":
fixed_params.append("Lumi")
else:
prefix = prefixes.get(modifiertypes[pname], "")
fixed_params.append(f"{prefix}{pname}")
# we found luminosity, so handle it
if parameter["name"] == "lumi":
lumi = parameter["auxdata"][0]
lumierr = parameter["sigmas"][0]
# define measurement
meas = ET.Element(
"Measurement",
Name=name,
Lumi=str(lumi),
LumiRelErr=str(lumierr),
ExportOnly=str(True),
)
poiel = ET.Element("POI")
poiel.text = poi
meas.append(poiel)
# add fixed parameters (constant)
if fixed_params:
se = ET.Element("ParamSetting", Const="True")
se.text = " ".join(fixed_params)
meas.append(se)
return meas
[docs]
def build_modifier(spec, modifierspec, channelname, samplename, sampledata):
if modifierspec["name"] == "lumi":
return None
mod_map = {
"histosys": "HistoSys",
"staterror": "StatError",
"normsys": "OverallSys",
"shapesys": "ShapeSys",
"normfactor": "NormFactor",
"shapefactor": "ShapeFactor",
}
attrs = {"Name": modifierspec["name"]}
if modifierspec["type"] == "histosys":
attrs["HistoNameLow"] = _make_hist_name(
channelname, samplename, modifierspec["name"], suffix="Low"
)
attrs["HistoNameHigh"] = _make_hist_name(
channelname, samplename, modifierspec["name"], suffix="High"
)
_export_root_histogram(attrs["HistoNameLow"], modifierspec["data"]["lo_data"])
_export_root_histogram(attrs["HistoNameHigh"], modifierspec["data"]["hi_data"])
elif modifierspec["type"] == "normsys":
attrs["High"] = str(modifierspec["data"]["hi"])
attrs["Low"] = str(modifierspec["data"]["lo"])
elif modifierspec["type"] == "normfactor":
# NB: only look at first measurement for normfactor configs. In order
# to dump as HistFactory XML, this has to be the same for all
# measurements or it will not work correctly. Why?
#
# Unlike other modifiers, NormFactor has the unique circumstance of
# defining its parameter configurations at the modifier level inside
# the channel specification, instead of at the measurement level, like
# all of the other modifiers.
#
# However, since I strive for perfection, the "Const" attribute will
# never be set here, but at the per-measurement configuration instead
# like all other parameters. This is an acceptable compromise.
#
# Lastly, if a normfactor parameter configuration doesn't exist in the
# first measurement parameter configuration, then set defaults.
val = 1
low = 0
high = 10
for p in spec["measurements"][0]["config"]["parameters"]:
if p["name"] == modifierspec["name"]:
val = p.get("inits", [val])[0]
low, high = p.get("bounds", [[low, high]])[0]
attrs["Val"] = str(val)
attrs["Low"] = str(low)
attrs["High"] = str(high)
elif modifierspec["type"] == "staterror":
attrs["Activate"] = "True"
attrs["HistoName"] = _make_hist_name(
channelname, samplename, modifierspec["name"]
)
# must be deleted, HiFa XML specification does not support 'Name'
del attrs["Name"]
# need to make this a relative uncertainty stored in ROOT file
_export_root_histogram(
attrs["HistoName"],
np.divide(
modifierspec["data"],
sampledata,
out=np.zeros_like(sampledata),
where=np.asarray(sampledata) != 0,
dtype="float",
).tolist(),
)
elif modifierspec["type"] == "shapesys":
attrs["ConstraintType"] = "Poisson"
attrs["HistoName"] = _make_hist_name(
channelname, samplename, modifierspec["name"]
)
# need to make this a relative uncertainty stored in ROOT file
_export_root_histogram(
attrs["HistoName"],
[
np.divide(
a, b, out=np.zeros_like(a), where=np.asarray(b) != 0, dtype="float"
)
for a, b in np.array(
(modifierspec["data"], sampledata), dtype="float"
).T
],
)
elif modifierspec["type"] == "shapefactor":
pass
else:
log.warning(
"Skipping modifier %s(%s) for now",
modifierspec["name"],
modifierspec["type"],
)
return None
return ET.Element(mod_map[modifierspec["type"]], **attrs)
[docs]
def build_sample(spec, samplespec, channelname):
histname = _make_hist_name(channelname, samplespec["name"])
attrs = {
"Name": samplespec["name"],
"HistoName": histname,
"InputFile": _ROOT_DATA_FILE.file_path,
"NormalizeByTheory": "False",
}
sample = ET.Element("Sample", **attrs)
for modspec in samplespec["modifiers"]:
# if lumi modifier added for this sample, need to set NormalizeByTheory
if modspec["type"] == "lumi":
sample.attrib.update({"NormalizeByTheory": "True"})
modifier = build_modifier(
spec, modspec, channelname, samplespec["name"], samplespec["data"]
)
if modifier is not None:
sample.append(modifier)
_export_root_histogram(histname, samplespec["data"])
return sample
[docs]
def build_data(obsspec, channelname):
histname = _make_hist_name(channelname, "data")
data = ET.Element("Data", HistoName=histname, InputFile=_ROOT_DATA_FILE.file_path)
observation = next((obs for obs in obsspec if obs["name"] == channelname), None)
_export_root_histogram(histname, observation["data"])
return data
[docs]
def build_channel(spec, channelspec, obsspec):
channel = ET.Element(
"Channel", Name=channelspec["name"], InputFile=_ROOT_DATA_FILE.file_path
)
if obsspec:
data = build_data(obsspec, channelspec["name"])
channel.append(data)
for samplespec in channelspec["samples"]:
channel.append(build_sample(spec, samplespec, channelspec["name"]))
return channel
def writexml(spec, specdir, data_rootdir, resultprefix):
global _ROOT_DATA_FILE
shutil.copyfile(
schema_path.joinpath("HistFactorySchema.dtd"),
Path(specdir).parent.joinpath("HistFactorySchema.dtd"),
)
combination = ET.Element(
"Combination", OutputFilePrefix=str(Path(specdir).joinpath(resultprefix))
)
with uproot.recreate(Path(data_rootdir).joinpath("data.root")) as _ROOT_DATA_FILE:
for channelspec in spec["channels"]:
channelfilepath = Path(specdir).joinpath(
f"{resultprefix}_{channelspec['name']}.xml"
)
with channelfilepath.open("w", encoding="utf-8") as channelfile:
channel = build_channel(spec, channelspec, spec.get("observations"))
indent(channel)
channelfile.write(
"<!DOCTYPE Channel SYSTEM '../HistFactorySchema.dtd'>\n\n"
)
channelfile.write(
ET.tostring(channel, encoding="utf-8").decode("utf-8")
)
inp = ET.Element("Input")
inp.text = str(channelfilepath)
combination.append(inp)
# need information about modifier types to get the right prefix in measurement
mixin = _ChannelSummaryMixin(channels=spec["channels"])
for measurement in spec["measurements"]:
combination.append(build_measurement(measurement, dict(mixin.modifiers)))
indent(combination)
return b"<!DOCTYPE Combination SYSTEM 'HistFactorySchema.dtd'>\n\n" + ET.tostring(
combination, encoding="utf-8"
)