import plotly.graph_objs as go
import plotly.io as pio
from collections import namedtuple, OrderedDict
from ._special_inputs import IdentityMap, Constant, Range
from .trendline_functions import ols, lowess, rolling, expanding, ewm
from _plotly_utils.basevalidators import ColorscaleValidator
from plotly.colors import qualitative, sequential
import math
import pandas as pd
import numpy as np
from plotly.subplots import (
make_subplots,
_set_trace_grid_reference,
_subplot_type_for_trace_type,
)
NO_COLOR = "px_no_color_constant"
trendline_functions = dict(
lowess=lowess, rolling=rolling, ewm=ewm, expanding=expanding, ols=ols
)
# Declare all supported attributes, across all plot types
direct_attrables = (
["base", "x", "y", "z", "a", "b", "c", "r", "theta", "size", "x_start", "x_end"]
+ ["hover_name", "text", "names", "values", "parents", "wide_cross"]
+ ["ids", "error_x", "error_x_minus", "error_y", "error_y_minus", "error_z"]
+ ["error_z_minus", "lat", "lon", "locations", "animation_group"]
)
array_attrables = ["dimensions", "custom_data", "hover_data", "path", "wide_variable"]
group_attrables = ["animation_frame", "facet_row", "facet_col", "line_group"]
renameable_group_attrables = [
"color", # renamed to marker.color or line.color in infer_config
"symbol", # renamed to marker.symbol in infer_config
"line_dash", # renamed to line.dash in infer_config
"pattern_shape", # renamed to marker.pattern.shape in infer_config
]
all_attrables = (
direct_attrables + array_attrables + group_attrables + renameable_group_attrables
)
cartesians = [go.Scatter, go.Scattergl, go.Bar, go.Funnel, go.Box, go.Violin]
cartesians += [go.Histogram, go.Histogram2d, go.Histogram2dContour]
class PxDefaults(object):
__slots__ = [
"template",
"width",
"height",
"color_discrete_sequence",
"color_discrete_map",
"color_continuous_scale",
"symbol_sequence",
"symbol_map",
"line_dash_sequence",
"line_dash_map",
"pattern_shape_sequence",
"pattern_shape_map",
"size_max",
"category_orders",
"labels",
]
def __init__(self):
self.reset()
def reset(self):
self.template = None
self.width = None
self.height = None
self.color_discrete_sequence = None
self.color_discrete_map = {}
self.color_continuous_scale = None
self.symbol_sequence = None
self.symbol_map = {}
self.line_dash_sequence = None
self.line_dash_map = {}
self.pattern_shape_sequence = None
self.pattern_shape_map = {}
self.size_max = 20
self.category_orders = {}
self.labels = {}
defaults = PxDefaults()
del PxDefaults
MAPBOX_TOKEN = None
def set_mapbox_access_token(token):
"""
Arguments:
token: A Mapbox token to be used in `plotly.express.scatter_mapbox` and \
`plotly.express.line_mapbox` figures. See \
https://docs.mapbox.com/help/how-mapbox-works/access-tokens/ for more details
"""
global MAPBOX_TOKEN
MAPBOX_TOKEN = token
def get_trendline_results(fig):
"""
Extracts fit statistics for trendlines (when applied to figures generated with
the `trendline` argument set to `"ols"`).
Arguments:
fig: the output of a `plotly.express` charting call
Returns:
A `pandas.DataFrame` with a column "px_fit_results" containing the `statsmodels`
results objects, along with columns identifying the subset of the data the
trendline was fit on.
"""
return fig._px_trendlines
Mapping = namedtuple(
"Mapping",
[
"show_in_trace_name",
"grouper",
"val_map",
"sequence",
"updater",
"variable",
"facet",
],
)
TraceSpec = namedtuple("TraceSpec", ["constructor", "attrs", "trace_patch", "marginal"])
def get_label(args, column):
try:
return args["labels"][column]
except Exception:
return column
def invert_label(args, column):
"""Invert mapping.
Find key corresponding to value column in dict args["labels"].
Returns `column` if the value does not exist.
"""
reversed_labels = {value: key for (key, value) in args["labels"].items()}
try:
return reversed_labels[column]
except Exception:
return column
def _is_continuous(df, col_name):
return df[col_name].dtype.kind in "ifc"
def get_decorated_label(args, column, role):
original_label = label = get_label(args, column)
if "histfunc" in args and (
(role == "z")
or (role == "x" and "orientation" in args and args["orientation"] == "h")
or (role == "y" and "orientation" in args and args["orientation"] == "v")
):
histfunc = args["histfunc"] or "count"
if histfunc != "count":
label = "%s of %s" % (histfunc, label)
else:
label = "count"
if "histnorm" in args and args["histnorm"] is not None:
if label == "count":
label = args["histnorm"]
else:
histnorm = args["histnorm"]
if histfunc == "sum":
if histnorm == "probability":
label = "%s of %s" % ("fraction", label)
elif histnorm == "percent":
label = "%s of %s" % (histnorm, label)
else:
label = "%s weighted by %s" % (histnorm, original_label)
elif histnorm == "probability":
label = "%s of sum of %s" % ("fraction", label)
elif histnorm == "percent":
label = "%s of sum of %s" % ("percent", label)
else:
label = "%s of %s" % (histnorm, label)
if "barnorm" in args and args["barnorm"] is not None:
label = "%s (normalized as %s)" % (label, args["barnorm"])
return label
def make_mapping(args, variable):
if variable == "line_group" or variable == "animation_frame":
return Mapping(
show_in_trace_name=False,
grouper=args[variable],
val_map={},
sequence=[""],
variable=variable,
updater=(lambda trace, v: v),
facet=None,
)
if variable == "facet_row" or variable == "facet_col":
letter = "x" if variable == "facet_col" else "y"
return Mapping(
show_in_trace_name=False,
variable=letter,
grouper=args[variable],
val_map={},
sequence=[i for i in range(1, 1000)],
updater=(lambda trace, v: v),
facet="row" if variable == "facet_row" else "col",
)
(parent, variable, *other_variables) = variable.split(".")
vprefix = variable
arg_name = variable
if variable == "color":
vprefix = "color_discrete"
if variable == "dash":
arg_name = "line_dash"
vprefix = "line_dash"
if variable == "pattern":
arg_name = "pattern_shape"
vprefix = "pattern_shape"
if args[vprefix + "_map"] == "identity":
val_map = IdentityMap()
else:
val_map = args[vprefix + "_map"].copy()
return Mapping(
show_in_trace_name=True,
variable=variable,
grouper=args[arg_name],
val_map=val_map,
sequence=args[vprefix + "_sequence"],
updater=lambda trace, v: trace.update(
{parent: {".".join([variable] + other_variables): v}}
),
facet=None,
)
def make_trace_kwargs(args, trace_spec, trace_data, mapping_labels, sizeref):
"""Populates a dict with arguments to update trace
Parameters
----------
args : dict
args to be used for the trace
trace_spec : NamedTuple
which kind of trace to be used (has constructor, marginal etc.
attributes)
trace_data : pandas DataFrame
data
mapping_labels : dict
to be used for hovertemplate
sizeref : float
marker sizeref
Returns
-------
trace_patch : dict
dict to be used to update trace
fit_results : dict
fit information to be used for trendlines
"""
if "line_close" in args and args["line_close"]:
trace_data = trace_data.append(trace_data.iloc[0])
trace_patch = trace_spec.trace_patch.copy() or {}
fit_results = None
hover_header = ""
for attr_name in trace_spec.attrs:
attr_value = args[attr_name]
attr_label = get_decorated_label(args, attr_value, attr_name)
if attr_name == "dimensions":
dims = [
(name, column)
for (name, column) in trace_data.iteritems()
if ((not attr_value) or (name in attr_value))
and (
trace_spec.constructor != go.Parcoords
or _is_continuous(args["data_frame"], name)
)
and (
trace_spec.constructor != go.Parcats
or (attr_value is not None and name in attr_value)
or len(args["data_frame"][name].unique())
<= args["dimensions_max_cardinality"]
)
]
trace_patch["dimensions"] = [
dict(label=get_label(args, name), values=column)
for (name, column) in dims
]
if trace_spec.constructor == go.Splom:
for d in trace_patch["dimensions"]:
d["axis"] = dict(matches=True)
mapping_labels["%{xaxis.title.text}"] = "%{x}"
mapping_labels["%{yaxis.title.text}"] = "%{y}"
elif attr_value is not None:
if attr_name == "size":
if "marker" not in trace_patch:
trace_patch["marker"] = dict()
trace_patch["marker"]["size"] = trace_data[attr_value]
trace_patch["marker"]["sizemode"] = "area"
trace_patch["marker"]["sizeref"] = sizeref
mapping_labels[attr_label] = "%{marker.size}"
elif attr_name == "marginal_x":
if trace_spec.constructor == go.Histogram:
mapping_labels["count"] = "%{y}"
elif attr_name == "marginal_y":
if trace_spec.constructor == go.Histogram:
mapping_labels["count"] = "%{x}"
elif attr_name == "trendline":
if (
args["x"]
and args["y"]
and len(trace_data[[args["x"], args["y"]]].dropna()) > 1
):
# sorting is bad but trace_specs with "trendline" have no other attrs
sorted_trace_data = trace_data.sort_values(by=args["x"])
y = sorted_trace_data[args["y"]].values
x = sorted_trace_data[args["x"]].values
if x.dtype.type == np.datetime64:
x = x.astype(int) / 10 ** 9 # convert to unix epoch seconds
elif x.dtype.type == np.object_:
try:
x = x.astype(np.float64)
except ValueError:
raise ValueError(
"Could not convert value of 'x' ('%s') into a numeric type. "
"If 'x' contains stringified dates, please convert to a datetime column."
% args["x"]
)
if y.dtype.type == np.object_:
try:
y = y.astype(np.float64)
except ValueError:
raise ValueError(
"Could not convert value of 'y' into a numeric type."
)
# preserve original values of "x" in case they're dates
# otherwise numpy/pandas can mess with the timezones
# NB this means trendline functions must output one-to-one with the input series
# i.e. we can't do resampling, because then the X values might not line up!
non_missing = np.logical_not(
np.logical_or(np.isnan(y), np.isnan(x))
)
trace_patch["x"] = sorted_trace_data[args["x"]][non_missing]
trendline_function = trendline_functions[attr_value]
y_out, hover_header, fit_results = trendline_function(
args["trendline_options"],
sorted_trace_data[args["x"]],
x,
y,
args["x"],
args["y"],
non_missing,
)
assert len(y_out) == len(
trace_patch["x"]
), "missing-data-handling failure in trendline code"
trace_patch["y"] = y_out
mapping_labels[get_label(args, args["x"])] = "%{x}"
mapping_labels[get_label(args, args["y"])] = "%{y} (trend)"
elif attr_name.startswith("error"):
error_xy = attr_name[:7]
arr = "arrayminus" if attr_name.endswith("minus") else "array"
if error_xy not in trace_patch:
trace_patch[error_xy] = {}
trace_patch[error_xy][arr] = trace_data[attr_value]
elif attr_name == "custom_data":
if len(attr_value) > 0:
# here we store a data frame in customdata, and it's serialized
# as a list of row lists, which is what we want
trace_patch["customdata"] = trace_data[attr_value]
elif attr_name == "hover_name":
if trace_spec.constructor not in [
go.Histogram,
go.Histogram2d,
go.Histogram2dContour,
]:
trace_patch["hovertext"] = trace_data[attr_value]
if hover_header == "":
hover_header = "%{hovertext}
"
elif attr_name == "hover_data":
if trace_spec.constructor not in [
go.Histogram,
go.Histogram2d,
go.Histogram2dContour,
]:
hover_is_dict = isinstance(attr_value, dict)
customdata_cols = args.get("custom_data") or []
for col in attr_value:
if hover_is_dict and not attr_value[col]:
continue
if col in [
args.get("x"),
args.get("y"),
args.get("z"),
args.get("base"),
]:
continue
try:
position = args["custom_data"].index(col)
except (ValueError, AttributeError, KeyError):
position = len(customdata_cols)
customdata_cols.append(col)
attr_label_col = get_decorated_label(args, col, None)
mapping_labels[attr_label_col] = "%%{customdata[%d]}" % (
position
)
if len(customdata_cols) > 0:
# here we store a data frame in customdata, and it's serialized
# as a list of row lists, which is what we want
trace_patch["customdata"] = trace_data[customdata_cols]
elif attr_name == "color":
if trace_spec.constructor in [go.Choropleth, go.Choroplethmapbox]:
trace_patch["z"] = trace_data[attr_value]
trace_patch["coloraxis"] = "coloraxis1"
mapping_labels[attr_label] = "%{z}"
elif trace_spec.constructor in [
go.Sunburst,
go.Treemap,
go.Icicle,
go.Pie,
go.Funnelarea,
]:
if "marker" not in trace_patch:
trace_patch["marker"] = dict()
if args.get("color_is_continuous"):
trace_patch["marker"]["colors"] = trace_data[attr_value]
trace_patch["marker"]["coloraxis"] = "coloraxis1"
mapping_labels[attr_label] = "%{color}"
else:
trace_patch["marker"]["colors"] = []
if args["color_discrete_map"] is not None:
mapping = args["color_discrete_map"].copy()
else:
mapping = {}
for cat in trace_data[attr_value]:
if mapping.get(cat) is None:
mapping[cat] = args["color_discrete_sequence"][
len(mapping) % len(args["color_discrete_sequence"])
]
trace_patch["marker"]["colors"].append(mapping[cat])
else:
colorable = "marker"
if trace_spec.constructor in [go.Parcats, go.Parcoords]:
colorable = "line"
if colorable not in trace_patch:
trace_patch[colorable] = dict()
trace_patch[colorable]["color"] = trace_data[attr_value]
trace_patch[colorable]["coloraxis"] = "coloraxis1"
mapping_labels[attr_label] = "%%{%s.color}" % colorable
elif attr_name == "animation_group":
trace_patch["ids"] = trace_data[attr_value]
elif attr_name == "locations":
trace_patch[attr_name] = trace_data[attr_value]
mapping_labels[attr_label] = "%{location}"
elif attr_name == "values":
trace_patch[attr_name] = trace_data[attr_value]
_label = "value" if attr_label == "values" else attr_label
mapping_labels[_label] = "%{value}"
elif attr_name == "parents":
trace_patch[attr_name] = trace_data[attr_value]
_label = "parent" if attr_label == "parents" else attr_label
mapping_labels[_label] = "%{parent}"
elif attr_name == "ids":
trace_patch[attr_name] = trace_data[attr_value]
_label = "id" if attr_label == "ids" else attr_label
mapping_labels[_label] = "%{id}"
elif attr_name == "names":
if trace_spec.constructor in [
go.Sunburst,
go.Treemap,
go.Icicle,
go.Pie,
go.Funnelarea,
]:
trace_patch["labels"] = trace_data[attr_value]
_label = "label" if attr_label == "names" else attr_label
mapping_labels[_label] = "%{label}"
else:
trace_patch[attr_name] = trace_data[attr_value]
else:
trace_patch[attr_name] = trace_data[attr_value]
mapping_labels[attr_label] = "%%{%s}" % attr_name
elif (trace_spec.constructor == go.Histogram and attr_name in ["x", "y"]) or (
trace_spec.constructor in [go.Histogram2d, go.Histogram2dContour]
and attr_name == "z"
):
# ensure that stuff like "count" gets into the hoverlabel
mapping_labels[attr_label] = "%%{%s}" % attr_name
if trace_spec.constructor not in [go.Parcoords, go.Parcats]:
# Modify mapping_labels according to hover_data keys
# if hover_data is a dict
mapping_labels_copy = OrderedDict(mapping_labels)
if args["hover_data"] and isinstance(args["hover_data"], dict):
for k, v in mapping_labels.items():
# We need to invert the mapping here
k_args = invert_label(args, k)
if k_args in args["hover_data"]:
formatter = args["hover_data"][k_args][0]
if formatter:
if isinstance(formatter, str):
mapping_labels_copy[k] = v.replace("}", "%s}" % formatter)
else:
_ = mapping_labels_copy.pop(k)
hover_lines = [k + "=" + v for k, v in mapping_labels_copy.items()]
trace_patch["hovertemplate"] = hover_header + "
".join(hover_lines)
trace_patch["hovertemplate"] += ""
return trace_patch, fit_results
def configure_axes(args, constructor, fig, orders):
configurators = {
go.Scatter3d: configure_3d_axes,
go.Scatterternary: configure_ternary_axes,
go.Scatterpolar: configure_polar_axes,
go.Scatterpolargl: configure_polar_axes,
go.Barpolar: configure_polar_axes,
go.Scattermapbox: configure_mapbox,
go.Choroplethmapbox: configure_mapbox,
go.Densitymapbox: configure_mapbox,
go.Scattergeo: configure_geo,
go.Choropleth: configure_geo,
}
for c in cartesians:
configurators[c] = configure_cartesian_axes
if constructor in configurators:
configurators[constructor](args, fig, orders)
def set_cartesian_axis_opts(args, axis, letter, orders):
log_key = "log_" + letter
range_key = "range_" + letter
if log_key in args and args[log_key]:
axis["type"] = "log"
if range_key in args and args[range_key]:
axis["range"] = [math.log(r, 10) for r in args[range_key]]
elif range_key in args and args[range_key]:
axis["range"] = args[range_key]
if args[letter] in orders:
axis["categoryorder"] = "array"
axis["categoryarray"] = (
orders[args[letter]]
if isinstance(axis, go.layout.XAxis)
else list(reversed(orders[args[letter]])) # top down for Y axis
)
def configure_cartesian_marginal_axes(args, fig, orders):
if "histogram" in [args["marginal_x"], args["marginal_y"]]:
fig.layout["barmode"] = "overlay"
nrows = len(fig._grid_ref)
ncols = len(fig._grid_ref[0])
# Set y-axis titles and axis options in the left-most column
for yaxis in fig.select_yaxes(col=1):
set_cartesian_axis_opts(args, yaxis, "y", orders)
# Set x-axis titles and axis options in the bottom-most row
for xaxis in fig.select_xaxes(row=1):
set_cartesian_axis_opts(args, xaxis, "x", orders)
# Configure axis ticks on marginal subplots
if args["marginal_x"]:
fig.update_yaxes(
showticklabels=False, showline=False, ticks="", range=None, row=nrows
)
if args["template"].layout.yaxis.showgrid is None:
fig.update_yaxes(showgrid=args["marginal_x"] == "histogram", row=nrows)
if args["template"].layout.xaxis.showgrid is None:
fig.update_xaxes(showgrid=True, row=nrows)
if args["marginal_y"]:
fig.update_xaxes(
showticklabels=False, showline=False, ticks="", range=None, col=ncols
)
if args["template"].layout.xaxis.showgrid is None:
fig.update_xaxes(showgrid=args["marginal_y"] == "histogram", col=ncols)
if args["template"].layout.yaxis.showgrid is None:
fig.update_yaxes(showgrid=True, col=ncols)
# Add axis titles to non-marginal subplots
y_title = get_decorated_label(args, args["y"], "y")
if args["marginal_x"]:
fig.update_yaxes(title_text=y_title, row=1, col=1)
else:
for row in range(1, nrows + 1):
fig.update_yaxes(title_text=y_title, row=row, col=1)
x_title = get_decorated_label(args, args["x"], "x")
if args["marginal_y"]:
fig.update_xaxes(title_text=x_title, row=1, col=1)
else:
for col in range(1, ncols + 1):
fig.update_xaxes(title_text=x_title, row=1, col=col)
# Configure axis type across all x-axes
if "log_x" in args and args["log_x"]:
fig.update_xaxes(type="log")
# Configure axis type across all y-axes
if "log_y" in args and args["log_y"]:
fig.update_yaxes(type="log")
# Configure matching and axis type for marginal y-axes
matches_y = "y" + str(ncols + 1)
if args["marginal_x"]:
for row in range(2, nrows + 1, 2):
fig.update_yaxes(matches=matches_y, type=None, row=row)
if args["marginal_y"]:
for col in range(2, ncols + 1, 2):
fig.update_xaxes(matches="x2", type=None, col=col)
def configure_cartesian_axes(args, fig, orders):
if ("marginal_x" in args and args["marginal_x"]) or (
"marginal_y" in args and args["marginal_y"]
):
configure_cartesian_marginal_axes(args, fig, orders)
return
# Set y-axis titles and axis options in the left-most column
y_title = get_decorated_label(args, args["y"], "y")
for yaxis in fig.select_yaxes(col=1):
yaxis.update(title_text=y_title)
set_cartesian_axis_opts(args, yaxis, "y", orders)
# Set x-axis titles and axis options in the bottom-most row
x_title = get_decorated_label(args, args["x"], "x")
for xaxis in fig.select_xaxes(row=1):
if "is_timeline" not in args:
xaxis.update(title_text=x_title)
set_cartesian_axis_opts(args, xaxis, "x", orders)
# Configure axis type across all x-axes
if "log_x" in args and args["log_x"]:
fig.update_xaxes(type="log")
# Configure axis type across all y-axes
if "log_y" in args and args["log_y"]:
fig.update_yaxes(type="log")
if "is_timeline" in args:
fig.update_xaxes(type="date")
if "ecdfmode" in args:
if args["orientation"] == "v":
fig.update_yaxes(rangemode="tozero")
else:
fig.update_xaxes(rangemode="tozero")
def configure_ternary_axes(args, fig, orders):
fig.update_ternaries(
aaxis=dict(title_text=get_label(args, args["a"])),
baxis=dict(title_text=get_label(args, args["b"])),
caxis=dict(title_text=get_label(args, args["c"])),
)
def configure_polar_axes(args, fig, orders):
patch = dict(
angularaxis=dict(direction=args["direction"], rotation=args["start_angle"]),
radialaxis=dict(),
)
for var, axis in [("r", "radialaxis"), ("theta", "angularaxis")]:
if args[var] in orders:
patch[axis]["categoryorder"] = "array"
patch[axis]["categoryarray"] = orders[args[var]]
radialaxis = patch["radialaxis"]
if args["log_r"]:
radialaxis["type"] = "log"
if args["range_r"]:
radialaxis["range"] = [math.log(x, 10) for x in args["range_r"]]
else:
if args["range_r"]:
radialaxis["range"] = args["range_r"]
if args["range_theta"]:
patch["sector"] = args["range_theta"]
fig.update_polars(patch)
def configure_3d_axes(args, fig, orders):
patch = dict(
xaxis=dict(title_text=get_label(args, args["x"])),
yaxis=dict(title_text=get_label(args, args["y"])),
zaxis=dict(title_text=get_label(args, args["z"])),
)
for letter in ["x", "y", "z"]:
axis = patch[letter + "axis"]
if args["log_" + letter]:
axis["type"] = "log"
if args["range_" + letter]:
axis["range"] = [math.log(x, 10) for x in args["range_" + letter]]
else:
if args["range_" + letter]:
axis["range"] = args["range_" + letter]
if args[letter] in orders:
axis["categoryorder"] = "array"
axis["categoryarray"] = orders[args[letter]]
fig.update_scenes(patch)
def configure_mapbox(args, fig, orders):
center = args["center"]
if not center and "lat" in args and "lon" in args:
center = dict(
lat=args["data_frame"][args["lat"]].mean(),
lon=args["data_frame"][args["lon"]].mean(),
)
fig.update_mapboxes(
accesstoken=MAPBOX_TOKEN,
center=center,
zoom=args["zoom"],
style=args["mapbox_style"],
)
def configure_geo(args, fig, orders):
fig.update_geos(
center=args["center"],
scope=args["scope"],
fitbounds=args["fitbounds"],
visible=args["basemap_visible"],
projection=dict(type=args["projection"]),
)
def configure_animation_controls(args, constructor, fig):
def frame_args(duration):
return {
"frame": {"duration": duration, "redraw": constructor != go.Scatter},
"mode": "immediate",
"fromcurrent": True,
"transition": {"duration": duration, "easing": "linear"},
}
if "animation_frame" in args and args["animation_frame"] and len(fig.frames) > 1:
fig.layout.updatemenus = [
{
"buttons": [
{
"args": [None, frame_args(500)],
"label": "▶",
"method": "animate",
},
{
"args": [[None], frame_args(0)],
"label": "◼",
"method": "animate",
},
],
"direction": "left",
"pad": {"r": 10, "t": 70},
"showactive": False,
"type": "buttons",
"x": 0.1,
"xanchor": "right",
"y": 0,
"yanchor": "top",
}
]
fig.layout.sliders = [
{
"active": 0,
"yanchor": "top",
"xanchor": "left",
"currentvalue": {
"prefix": get_label(args, args["animation_frame"]) + "="
},
"pad": {"b": 10, "t": 60},
"len": 0.9,
"x": 0.1,
"y": 0,
"steps": [
{
"args": [[f.name], frame_args(0)],
"label": f.name,
"method": "animate",
}
for f in fig.frames
],
}
]
def make_trace_spec(args, constructor, attrs, trace_patch):
if constructor in [go.Scatter, go.Scatterpolar]:
if "render_mode" in args and (
args["render_mode"] == "webgl"
or (
args["render_mode"] == "auto"
and len(args["data_frame"]) > 1000
and args["animation_frame"] is None
)
):
if constructor == go.Scatter:
constructor = go.Scattergl
if "orientation" in trace_patch:
del trace_patch["orientation"]
else:
constructor = go.Scatterpolargl
# Create base trace specification
result = [TraceSpec(constructor, attrs, trace_patch, None)]
# Add marginal trace specifications
for letter in ["x", "y"]:
if "marginal_" + letter in args and args["marginal_" + letter]:
trace_spec = None
axis_map = dict(
xaxis="x1" if letter == "x" else "x2",
yaxis="y1" if letter == "y" else "y2",
)
if args["marginal_" + letter] == "histogram":
trace_spec = TraceSpec(
constructor=go.Histogram,
attrs=[letter, "marginal_" + letter],
trace_patch=dict(opacity=0.5, bingroup=letter, **axis_map),
marginal=letter,
)
elif args["marginal_" + letter] == "violin":
trace_spec = TraceSpec(
constructor=go.Violin,
attrs=[letter, "hover_name", "hover_data"],
trace_patch=dict(scalegroup=letter),
marginal=letter,
)
elif args["marginal_" + letter] == "box":
trace_spec = TraceSpec(
constructor=go.Box,
attrs=[letter, "hover_name", "hover_data"],
trace_patch=dict(notched=True),
marginal=letter,
)
elif args["marginal_" + letter] == "rug":
symbols = {"x": "line-ns-open", "y": "line-ew-open"}
trace_spec = TraceSpec(
constructor=go.Box,
attrs=[letter, "hover_name", "hover_data"],
trace_patch=dict(
fillcolor="rgba(255,255,255,0)",
line={"color": "rgba(255,255,255,0)"},
boxpoints="all",
jitter=0,
hoveron="points",
marker={"symbol": symbols[letter]},
),
marginal=letter,
)
if "color" in attrs or "color" not in args:
if "marker" not in trace_spec.trace_patch:
trace_spec.trace_patch["marker"] = dict()
first_default_color = args["color_continuous_scale"][0]
trace_spec.trace_patch["marker"]["color"] = first_default_color
result.append(trace_spec)
# Add trendline trace specifications
if args.get("trendline") and args.get("trendline_scope", "trace") == "trace":
result.append(make_trendline_spec(args, constructor))
return result
def make_trendline_spec(args, constructor):
trace_spec = TraceSpec(
constructor=go.Scattergl
if constructor == go.Scattergl # could be contour
else go.Scatter,
attrs=["trendline"],
trace_patch=dict(mode="lines"),
marginal=None,
)
if args["trendline_color_override"]:
trace_spec.trace_patch["line"] = dict(color=args["trendline_color_override"])
return trace_spec
def one_group(x):
return ""
def apply_default_cascade(args):
# first we apply px.defaults to unspecified args
for param in defaults.__slots__:
if param in args and args[param] is None:
args[param] = getattr(defaults, param)
# load the default template if set, otherwise "plotly"
if args["template"] is None:
if pio.templates.default is not None:
args["template"] = pio.templates.default
else:
args["template"] = "plotly"
try:
# retrieve the actual template if we were given a name
args["template"] = pio.templates[args["template"]]
except Exception:
# otherwise try to build a real template
args["template"] = go.layout.Template(args["template"])
# if colors not set explicitly or in px.defaults, defer to a template
# if the template doesn't have one, we set some final fallback defaults
if "color_continuous_scale" in args:
if (
args["color_continuous_scale"] is None
and args["template"].layout.colorscale.sequential
):
args["color_continuous_scale"] = [
x[1] for x in args["template"].layout.colorscale.sequential
]
if args["color_continuous_scale"] is None:
args["color_continuous_scale"] = sequential.Viridis
if "color_discrete_sequence" in args:
if args["color_discrete_sequence"] is None and args["template"].layout.colorway:
args["color_discrete_sequence"] = args["template"].layout.colorway
if args["color_discrete_sequence"] is None:
args["color_discrete_sequence"] = qualitative.D3
# if symbol_sequence/line_dash_sequence not set explicitly or in px.defaults,
# see if we can defer to template. If not, set reasonable defaults
if "symbol_sequence" in args:
if args["symbol_sequence"] is None and args["template"].data.scatter:
args["symbol_sequence"] = [
scatter.marker.symbol for scatter in args["template"].data.scatter
]
if not args["symbol_sequence"] or not any(args["symbol_sequence"]):
args["symbol_sequence"] = ["circle", "diamond", "square", "x", "cross"]
if "line_dash_sequence" in args:
if args["line_dash_sequence"] is None and args["template"].data.scatter:
args["line_dash_sequence"] = [
scatter.line.dash for scatter in args["template"].data.scatter
]
if not args["line_dash_sequence"] or not any(args["line_dash_sequence"]):
args["line_dash_sequence"] = [
"solid",
"dot",
"dash",
"longdash",
"dashdot",
"longdashdot",
]
if "pattern_shape_sequence" in args:
if args["pattern_shape_sequence"] is None and args["template"].data.bar:
args["pattern_shape_sequence"] = [
bar.marker.pattern.shape for bar in args["template"].data.bar
]
if not args["pattern_shape_sequence"] or not any(
args["pattern_shape_sequence"]
):
args["pattern_shape_sequence"] = ["", "/", "\\", "x", "+", "."]
def _check_name_not_reserved(field_name, reserved_names):
if field_name not in reserved_names:
return field_name
else:
raise NameError(
"A name conflict was encountered for argument '%s'. "
"A column or index with name '%s' is ambiguous." % (field_name, field_name)
)
def _get_reserved_col_names(args):
"""
This function builds a list of columns of the data_frame argument used
as arguments, either as str/int arguments or given as columns
(pandas series type).
"""
df = args["data_frame"]
reserved_names = set()
for field in args:
if field not in all_attrables:
continue
names = args[field] if field in array_attrables else [args[field]]
if names is None:
continue
for arg in names:
if arg is None:
continue
elif isinstance(arg, str): # no need to add ints since kw arg are not ints
reserved_names.add(arg)
elif isinstance(arg, pd.Series):
arg_name = arg.name
if arg_name and hasattr(df, arg_name):
in_df = arg is df[arg_name]
if in_df:
reserved_names.add(arg_name)
elif arg is df.index and arg.name is not None:
reserved_names.add(arg.name)
return reserved_names
def _is_col_list(df_input, arg):
"""Returns True if arg looks like it's a list of columns or references to columns
in df_input, and False otherwise (in which case it's assumed to be a single column
or reference to a column).
"""
if arg is None or isinstance(arg, str) or isinstance(arg, int):
return False
if isinstance(arg, pd.MultiIndex):
return False # just to keep existing behaviour for now
try:
iter(arg)
except TypeError:
return False # not iterable
for c in arg:
if isinstance(c, str) or isinstance(c, int):
if df_input is None or c not in df_input.columns:
return False
else:
try:
iter(c)
except TypeError:
return False # not iterable
return True
def _isinstance_listlike(x):
"""Returns True if x is an iterable which can be transformed into a pandas Series,
False for the other types of possible values of a `hover_data` dict.
A tuple of length 2 is a special case corresponding to a (format, data) tuple.
"""
if (
isinstance(x, str)
or (isinstance(x, tuple) and len(x) == 2)
or isinstance(x, bool)
or x is None
):
return False
else:
return True
def _escape_col_name(df_input, col_name, extra):
while df_input is not None and (col_name in df_input.columns or col_name in extra):
col_name = "_" + col_name
return col_name
def to_unindexed_series(x):
"""
assuming x is list-like or even an existing pd.Series, return a new pd.Series with
no index, without extracting the data from an existing Series via numpy, which
seems to mangle datetime columns. Stripping the index from existing pd.Series is
required to get things to match up right in the new DataFrame we're building
"""
return pd.Series(x).reset_index(drop=True)
def process_args_into_dataframe(args, wide_mode, var_name, value_name):
"""
After this function runs, the `all_attrables` keys of `args` all contain only
references to columns of `df_output`. This function handles the extraction of data
from `args["attrable"]` and column-name-generation as appropriate, and adds the
data to `df_output` and then replaces `args["attrable"]` with the appropriate
reference.
"""
df_input = args["data_frame"]
df_provided = df_input is not None
df_output = pd.DataFrame()
constants = dict()
ranges = list()
wide_id_vars = set()
reserved_names = _get_reserved_col_names(args) if df_provided else set()
# Case of functions with a "dimensions" kw: scatter_matrix, parcats, parcoords
if "dimensions" in args and args["dimensions"] is None:
if not df_provided:
raise ValueError(
"No data were provided. Please provide data either with the `data_frame` or with the `dimensions` argument."
)
else:
df_output[df_input.columns] = df_input[df_input.columns]
# hover_data is a dict
hover_data_is_dict = (
"hover_data" in args
and args["hover_data"]
and isinstance(args["hover_data"], dict)
)
# If dict, convert all values of hover_data to tuples to simplify processing
if hover_data_is_dict:
for k in args["hover_data"]:
if _isinstance_listlike(args["hover_data"][k]):
args["hover_data"][k] = (True, args["hover_data"][k])
if not isinstance(args["hover_data"][k], tuple):
args["hover_data"][k] = (args["hover_data"][k], None)
if df_provided and args["hover_data"][k][1] is not None and k in df_input:
raise ValueError(
"Ambiguous input: values for '%s' appear both in hover_data and data_frame"
% k
)
# Loop over possible arguments
for field_name in all_attrables:
# Massaging variables
argument_list = (
[args.get(field_name)]
if field_name not in array_attrables
else args.get(field_name)
)
# argument not specified, continue
if argument_list is None or argument_list is [None]:
continue
# Argument name: field_name if the argument is not a list
# Else we give names like ["hover_data_0, hover_data_1"] etc.
field_list = (
[field_name]
if field_name not in array_attrables
else [field_name + "_" + str(i) for i in range(len(argument_list))]
)
# argument_list and field_list ready, iterate over them
# Core of the loop starts here
for i, (argument, field) in enumerate(zip(argument_list, field_list)):
length = len(df_output)
if argument is None:
continue
col_name = None
# Case of multiindex
if isinstance(argument, pd.MultiIndex):
raise TypeError(
"Argument '%s' is a pandas MultiIndex. "
"pandas MultiIndex is not supported by plotly express "
"at the moment." % field
)
# ----------------- argument is a special value ----------------------
if isinstance(argument, Constant) or isinstance(argument, Range):
col_name = _check_name_not_reserved(
str(argument.label) if argument.label is not None else field,
reserved_names,
)
if isinstance(argument, Constant):
constants[col_name] = argument.value
else:
ranges.append(col_name)
# ----------------- argument is likely a col name ----------------------
elif isinstance(argument, str) or not hasattr(argument, "__len__"):
if (
field_name == "hover_data"
and hover_data_is_dict
and args["hover_data"][str(argument)][1] is not None
):
# hover_data has onboard data
# previously-checked to have no name-conflict with data_frame
col_name = str(argument)
real_argument = args["hover_data"][col_name][1]
if length and len(real_argument) != length:
raise ValueError(
"All arguments should have the same length. "
"The length of hover_data key `%s` is %d, whereas the "
"length of previously-processed arguments %s is %d"
% (
argument,
len(real_argument),
str(list(df_output.columns)),
length,
)
)
df_output[col_name] = to_unindexed_series(real_argument)
elif not df_provided:
raise ValueError(
"String or int arguments are only possible when a "
"DataFrame or an array is provided in the `data_frame` "
"argument. No DataFrame was provided, but argument "
"'%s' is of type str or int." % field
)
# Check validity of column name
elif argument not in df_input.columns:
if wide_mode and argument in (value_name, var_name):
continue
else:
err_msg = (
"Value of '%s' is not the name of a column in 'data_frame'. "
"Expected one of %s but received: %s"
% (field, str(list(df_input.columns)), argument)
)
if argument == "index":
err_msg += "\n To use the index, pass it in directly as `df.index`."
raise ValueError(err_msg)
elif length and len(df_input[argument]) != length:
raise ValueError(
"All arguments should have the same length. "
"The length of column argument `df[%s]` is %d, whereas the "
"length of previously-processed arguments %s is %d"
% (
field,
len(df_input[argument]),
str(list(df_output.columns)),
length,
)
)
else:
col_name = str(argument)
df_output[col_name] = to_unindexed_series(df_input[argument])
# ----------------- argument is likely a column / array / list.... -------
else:
if df_provided and hasattr(argument, "name"):
if argument is df_input.index:
if argument.name is None or argument.name in df_input:
col_name = "index"
else:
col_name = argument.name
col_name = _escape_col_name(
df_input, col_name, [var_name, value_name]
)
else:
if (
argument.name is not None
and argument.name in df_input
and argument is df_input[argument.name]
):
col_name = argument.name
if col_name is None: # numpy array, list...
col_name = _check_name_not_reserved(field, reserved_names)
if length and len(argument) != length:
raise ValueError(
"All arguments should have the same length. "
"The length of argument `%s` is %d, whereas the "
"length of previously-processed arguments %s is %d"
% (field, len(argument), str(list(df_output.columns)), length)
)
df_output[str(col_name)] = to_unindexed_series(argument)
# Finally, update argument with column name now that column exists
assert col_name is not None, (
"Data-frame processing failure, likely due to a internal bug. "
"Please report this to "
"https://github.com/plotly/plotly.py/issues/new and we will try to "
"replicate and fix it."
)
if field_name not in array_attrables:
args[field_name] = str(col_name)
elif isinstance(args[field_name], dict):
pass
else:
args[field_name][i] = str(col_name)
if field_name != "wide_variable":
wide_id_vars.add(str(col_name))
for col_name in ranges:
df_output[col_name] = range(len(df_output))
for col_name in constants:
df_output[col_name] = constants[col_name]
return df_output, wide_id_vars
def build_dataframe(args, constructor):
"""
Constructs a dataframe and modifies `args` in-place.
The argument values in `args` can be either strings corresponding to
existing columns of a dataframe, or data arrays (lists, numpy arrays,
pandas columns, series).
Parameters
----------
args : OrderedDict
arguments passed to the px function and subsequently modified
constructor : graph_object trace class
the trace type selected for this figure
"""
# make copies of all the fields via dict() and list()
for field in args:
if field in array_attrables and args[field] is not None:
args[field] = (
dict(args[field])
if isinstance(args[field], dict)
else list(args[field])
)
# Cast data_frame argument to DataFrame (it could be a numpy array, dict etc.)
df_provided = args["data_frame"] is not None
if df_provided and not isinstance(args["data_frame"], pd.DataFrame):
args["data_frame"] = pd.DataFrame(args["data_frame"])
df_input = args["data_frame"]
# now we handle special cases like wide-mode or x-xor-y specification
# by rearranging args to tee things up for process_args_into_dataframe to work
no_x = args.get("x") is None
no_y = args.get("y") is None
wide_x = False if no_x else _is_col_list(df_input, args["x"])
wide_y = False if no_y else _is_col_list(df_input, args["y"])
wide_mode = False
var_name = None # will likely be "variable" in wide_mode
wide_cross_name = None # will likely be "index" in wide_mode
value_name = None # will likely be "value" in wide_mode
hist2d_types = [go.Histogram2d, go.Histogram2dContour]
hist1d_orientation = constructor == go.Histogram or "ecdfmode" in args
if constructor in cartesians:
if wide_x and wide_y:
raise ValueError(
"Cannot accept list of column references or list of columns for both `x` and `y`."
)
if df_provided and no_x and no_y:
wide_mode = True
if isinstance(df_input.columns, pd.MultiIndex):
raise TypeError(
"Data frame columns is a pandas MultiIndex. "
"pandas MultiIndex is not supported by plotly express "
"at the moment."
)
args["wide_variable"] = list(df_input.columns)
var_name = df_input.columns.name
if var_name in [None, "value", "index"] or var_name in df_input:
var_name = "variable"
if constructor == go.Funnel:
wide_orientation = args.get("orientation") or "h"
else:
wide_orientation = args.get("orientation") or "v"
args["orientation"] = wide_orientation
args["wide_cross"] = None
elif wide_x != wide_y:
wide_mode = True
args["wide_variable"] = args["y"] if wide_y else args["x"]
if df_provided and args["wide_variable"] is df_input.columns:
var_name = df_input.columns.name
if isinstance(args["wide_variable"], pd.Index):
args["wide_variable"] = list(args["wide_variable"])
if var_name in [None, "value", "index"] or (
df_provided and var_name in df_input
):
var_name = "variable"
if hist1d_orientation:
wide_orientation = "v" if wide_x else "h"
else:
wide_orientation = "v" if wide_y else "h"
args["y" if wide_y else "x"] = None
args["wide_cross"] = None
if not no_x and not no_y:
wide_cross_name = "__x__" if wide_y else "__y__"
if wide_mode:
value_name = _escape_col_name(df_input, "value", [])
var_name = _escape_col_name(df_input, var_name, [])
missing_bar_dim = None
if (
constructor in [go.Scatter, go.Bar, go.Funnel] + hist2d_types
and not hist1d_orientation
):
if not wide_mode and (no_x != no_y):
for ax in ["x", "y"]:
if args.get(ax) is None:
args[ax] = df_input.index if df_provided else Range()
if constructor == go.Bar:
missing_bar_dim = ax
else:
if args["orientation"] is None:
args["orientation"] = "v" if ax == "x" else "h"
if wide_mode and wide_cross_name is None:
if no_x != no_y and args["orientation"] is None:
args["orientation"] = "v" if no_x else "h"
if df_provided:
if isinstance(df_input.index, pd.MultiIndex):
raise TypeError(
"Data frame index is a pandas MultiIndex. "
"pandas MultiIndex is not supported by plotly express "
"at the moment."
)
args["wide_cross"] = df_input.index
else:
args["wide_cross"] = Range(
label=_escape_col_name(df_input, "index", [var_name, value_name])
)
no_color = False
if type(args.get("color")) == str and args["color"] == NO_COLOR:
no_color = True
args["color"] = None
# now that things have been prepped, we do the systematic rewriting of `args`
df_output, wide_id_vars = process_args_into_dataframe(
args, wide_mode, var_name, value_name
)
# now that `df_output` exists and `args` contains only references, we complete
# the special-case and wide-mode handling by further rewriting args and/or mutating
# df_output
count_name = _escape_col_name(df_output, "count", [var_name, value_name])
if not wide_mode and missing_bar_dim and constructor == go.Bar:
# now that we've populated df_output, we check to see if the non-missing
# dimension is categorical: if so, then setting the missing dimension to a
# constant 1 is a less-insane thing to do than setting it to the index by
# default and we let the normal auto-orientation-code do its thing later
other_dim = "x" if missing_bar_dim == "y" else "y"
if not _is_continuous(df_output, args[other_dim]):
args[missing_bar_dim] = count_name
df_output[count_name] = 1
else:
# on the other hand, if the non-missing dimension is continuous, then we
# can use this information to override the normal auto-orientation code
if args["orientation"] is None:
args["orientation"] = "v" if missing_bar_dim == "x" else "h"
if constructor in hist2d_types:
del args["orientation"]
if wide_mode:
# at this point, `df_output` is semi-long/semi-wide, but we know which columns
# are which, so we melt it and reassign `args` to refer to the newly-tidy
# columns, keeping track of various names and manglings set up above
wide_value_vars = [c for c in args["wide_variable"] if c not in wide_id_vars]
del args["wide_variable"]
if wide_cross_name == "__x__":
wide_cross_name = args["x"]
elif wide_cross_name == "__y__":
wide_cross_name = args["y"]
else:
wide_cross_name = args["wide_cross"]
del args["wide_cross"]
dtype = None
for v in wide_value_vars:
v_dtype = df_output[v].dtype.kind
v_dtype = "number" if v_dtype in ["i", "f", "u"] else v_dtype
if dtype is None:
dtype = v_dtype
elif dtype != v_dtype:
raise ValueError(
"Plotly Express cannot process wide-form data with columns of different type."
)
df_output = df_output.melt(
id_vars=wide_id_vars,
value_vars=wide_value_vars,
var_name=var_name,
value_name=value_name,
)
assert len(df_output.columns) == len(set(df_output.columns)), (
"Wide-mode name-inference failure, likely due to a internal bug. "
"Please report this to "
"https://github.com/plotly/plotly.py/issues/new and we will try to "
"replicate and fix it."
)
df_output[var_name] = df_output[var_name].astype(str)
orient_v = wide_orientation == "v"
if hist1d_orientation:
args["x" if orient_v else "y"] = value_name
args["y" if orient_v else "x"] = wide_cross_name
args["color"] = args["color"] or var_name
elif constructor in [go.Scatter, go.Funnel] + hist2d_types:
args["x" if orient_v else "y"] = wide_cross_name
args["y" if orient_v else "x"] = value_name
if constructor != go.Histogram2d:
args["color"] = args["color"] or var_name
if "line_group" in args:
args["line_group"] = args["line_group"] or var_name
elif constructor == go.Bar:
if _is_continuous(df_output, value_name):
args["x" if orient_v else "y"] = wide_cross_name
args["y" if orient_v else "x"] = value_name
args["color"] = args["color"] or var_name
else:
args["x" if orient_v else "y"] = value_name
args["y" if orient_v else "x"] = count_name
df_output[count_name] = 1
args["color"] = args["color"] or var_name
elif constructor in [go.Violin, go.Box]:
args["x" if orient_v else "y"] = wide_cross_name or var_name
args["y" if orient_v else "x"] = value_name
if hist1d_orientation and constructor == go.Scatter:
if args["x"] is not None and args["y"] is not None:
args["histfunc"] = "sum"
elif args["x"] is None:
args["histfunc"] = None
args["orientation"] = "h"
args["x"] = count_name
df_output[count_name] = 1
else:
args["histfunc"] = None
args["orientation"] = "v"
args["y"] = count_name
df_output[count_name] = 1
if no_color:
args["color"] = None
args["data_frame"] = df_output
return args
def _check_dataframe_all_leaves(df):
df_sorted = df.sort_values(by=list(df.columns))
null_mask = df_sorted.isnull()
df_sorted = df_sorted.astype(str)
null_indices = np.nonzero(null_mask.any(axis=1).values)[0]
for null_row_index in null_indices:
row = null_mask.iloc[null_row_index]
i = np.nonzero(row.values)[0][0]
if not row[i:].all():
raise ValueError(
"None entries cannot have not-None children",
df_sorted.iloc[null_row_index],
)
df_sorted[null_mask] = ""
row_strings = list(df_sorted.apply(lambda x: "".join(x), axis=1))
for i, row in enumerate(row_strings[:-1]):
if row_strings[i + 1] in row and (i + 1) in null_indices:
raise ValueError(
"Non-leaves rows are not permitted in the dataframe \n",
df_sorted.iloc[i + 1],
"is not a leaf.",
)
def process_dataframe_hierarchy(args):
"""
Build dataframe for sunburst, treemap, or icicle when the path argument is provided.
"""
df = args["data_frame"]
path = args["path"][::-1]
_check_dataframe_all_leaves(df[path[::-1]])
discrete_color = False
new_path = []
for col_name in path:
new_col_name = col_name + "_path_copy"
new_path.append(new_col_name)
df[new_col_name] = df[col_name]
path = new_path
# ------------ Define aggregation functions --------------------------------
def aggfunc_discrete(x):
uniques = x.unique()
if len(uniques) == 1:
return uniques[0]
else:
return "(?)"
agg_f = {}
aggfunc_color = None
if args["values"]:
try:
df[args["values"]] = pd.to_numeric(df[args["values"]])
except ValueError:
raise ValueError(
"Column `%s` of `df` could not be converted to a numerical data type."
% args["values"]
)
if args["color"]:
if args["color"] == args["values"]:
new_value_col_name = args["values"] + "_sum"
df[new_value_col_name] = df[args["values"]]
args["values"] = new_value_col_name
count_colname = args["values"]
else:
# we need a count column for the first groupby and the weighted mean of color
# trick to be sure the col name is unused: take the sum of existing names
count_colname = (
"count"
if "count" not in df.columns
else "".join([str(el) for el in list(df.columns)])
)
# we can modify df because it's a copy of the px argument
df[count_colname] = 1
args["values"] = count_colname
agg_f[count_colname] = "sum"
if args["color"]:
if not _is_continuous(df, args["color"]):
aggfunc_color = aggfunc_discrete
discrete_color = True
else:
def aggfunc_continuous(x):
return np.average(x, weights=df.loc[x.index, count_colname])
aggfunc_color = aggfunc_continuous
agg_f[args["color"]] = aggfunc_color
# Other columns (for color, hover_data, custom_data etc.)
cols = list(set(df.columns).difference(path))
for col in cols: # for hover_data, custom_data etc.
if col not in agg_f:
agg_f[col] = aggfunc_discrete
# Avoid collisions with reserved names - columns in the path have been copied already
cols = list(set(cols) - set(["labels", "parent", "id"]))
# ----------------------------------------------------------------------------
df_all_trees = pd.DataFrame(columns=["labels", "parent", "id"] + cols)
# Set column type here (useful for continuous vs discrete colorscale)
for col in cols:
df_all_trees[col] = df_all_trees[col].astype(df[col].dtype)
for i, level in enumerate(path):
df_tree = pd.DataFrame(columns=df_all_trees.columns)
dfg = df.groupby(path[i:]).agg(agg_f)
dfg = dfg.reset_index()
# Path label massaging
df_tree["labels"] = dfg[level].copy().astype(str)
df_tree["parent"] = ""
df_tree["id"] = dfg[level].copy().astype(str)
if i < len(path) - 1:
j = i + 1
while j < len(path):
df_tree["parent"] = (
dfg[path[j]].copy().astype(str) + "/" + df_tree["parent"]
)
df_tree["id"] = dfg[path[j]].copy().astype(str) + "/" + df_tree["id"]
j += 1
df_tree["parent"] = df_tree["parent"].str.rstrip("/")
if cols:
df_tree[cols] = dfg[cols]
df_all_trees = df_all_trees.append(df_tree, ignore_index=True)
# we want to make sure than (?) is the first color of the sequence
if args["color"] and discrete_color:
sort_col_name = "sort_color_if_discrete_color"
while sort_col_name in df_all_trees.columns:
sort_col_name += "0"
df_all_trees[sort_col_name] = df[args["color"]].astype(str)
df_all_trees = df_all_trees.sort_values(by=sort_col_name)
# Now modify arguments
args["data_frame"] = df_all_trees
args["path"] = None
args["ids"] = "id"
args["names"] = "labels"
args["parents"] = "parent"
if args["color"]:
if not args["hover_data"]:
args["hover_data"] = [args["color"]]
elif isinstance(args["hover_data"], dict):
if not args["hover_data"].get(args["color"]):
args["hover_data"][args["color"]] = (True, None)
else:
args["hover_data"].append(args["color"])
return args
def process_dataframe_timeline(args):
"""
Massage input for bar traces for px.timeline()
"""
args["is_timeline"] = True
if args["x_start"] is None or args["x_end"] is None:
raise ValueError("Both x_start and x_end are required")
try:
x_start = pd.to_datetime(args["data_frame"][args["x_start"]])
x_end = pd.to_datetime(args["data_frame"][args["x_end"]])
except (ValueError, TypeError):
raise TypeError(
"Both x_start and x_end must refer to data convertible to datetimes."
)
# note that we are not adding any columns to the data frame here, so no risk of overwrite
args["data_frame"][args["x_end"]] = (x_end - x_start).astype("timedelta64[ms]")
args["x"] = args["x_end"]
del args["x_end"]
args["base"] = args["x_start"]
del args["x_start"]
return args
def infer_config(args, constructor, trace_patch, layout_patch):
attrs = [k for k in direct_attrables + array_attrables if k in args]
grouped_attrs = []
# Compute sizeref
sizeref = 0
if "size" in args and args["size"]:
sizeref = args["data_frame"][args["size"]].max() / args["size_max"] ** 2
# Compute color attributes and grouping attributes
if "color" in args:
if "color_continuous_scale" in args:
if "color_discrete_sequence" not in args:
attrs.append("color")
else:
if args["color"] and _is_continuous(args["data_frame"], args["color"]):
attrs.append("color")
args["color_is_continuous"] = True
elif constructor in [go.Sunburst, go.Treemap, go.Icicle]:
attrs.append("color")
args["color_is_continuous"] = False
else:
grouped_attrs.append("marker.color")
elif "line_group" in args or constructor == go.Histogram2dContour:
grouped_attrs.append("line.color")
elif constructor in [go.Pie, go.Funnelarea]:
attrs.append("color")
if args["color"]:
if args["hover_data"] is None:
args["hover_data"] = []
args["hover_data"].append(args["color"])
else:
grouped_attrs.append("marker.color")
show_colorbar = bool(
"color" in attrs
and args["color"]
and constructor not in [go.Pie, go.Funnelarea]
and (
constructor not in [go.Treemap, go.Sunburst, go.Icicle]
or args.get("color_is_continuous")
)
)
else:
show_colorbar = False
if "line_dash" in args:
grouped_attrs.append("line.dash")
if "symbol" in args:
grouped_attrs.append("marker.symbol")
if "pattern_shape" in args:
grouped_attrs.append("marker.pattern.shape")
if "orientation" in args:
has_x = args["x"] is not None
has_y = args["y"] is not None
if args["orientation"] is None:
if constructor in [go.Histogram, go.Scatter]:
if has_y and not has_x:
args["orientation"] = "h"
elif constructor in [go.Violin, go.Box, go.Bar, go.Funnel]:
if has_x and not has_y:
args["orientation"] = "h"
if args["orientation"] is None and has_x and has_y:
x_is_continuous = _is_continuous(args["data_frame"], args["x"])
y_is_continuous = _is_continuous(args["data_frame"], args["y"])
if x_is_continuous and not y_is_continuous:
args["orientation"] = "h"
if y_is_continuous and not x_is_continuous:
args["orientation"] = "v"
if args["orientation"] is None:
args["orientation"] = "v"
if constructor == go.Histogram:
if has_x and has_y and args["histfunc"] is None:
args["histfunc"] = trace_patch["histfunc"] = "sum"
orientation = args["orientation"]
nbins = args["nbins"]
trace_patch["nbinsx"] = nbins if orientation == "v" else None
trace_patch["nbinsy"] = None if orientation == "v" else nbins
trace_patch["bingroup"] = "x" if orientation == "v" else "y"
trace_patch["orientation"] = args["orientation"]
if constructor in [go.Violin, go.Box]:
mode = "boxmode" if constructor == go.Box else "violinmode"
if layout_patch[mode] is None and args["color"] is not None:
if args["y"] == args["color"] and args["orientation"] == "h":
layout_patch[mode] = "overlay"
elif args["x"] == args["color"] and args["orientation"] == "v":
layout_patch[mode] = "overlay"
if layout_patch[mode] is None:
layout_patch[mode] = "group"
if (
constructor == go.Histogram2d
and args["z"] is not None
and args["histfunc"] is None
):
args["histfunc"] = trace_patch["histfunc"] = "sum"
if args.get("text_auto", False) is not False:
if constructor in [go.Histogram2d, go.Histogram2dContour]:
letter = "z"
elif constructor == go.Bar:
letter = "y" if args["orientation"] == "v" else "x"
else:
letter = "value"
if args["text_auto"] is True:
trace_patch["texttemplate"] = "%{" + letter + "}"
else:
trace_patch["texttemplate"] = "%{" + letter + ":" + args["text_auto"] + "}"
if constructor in [go.Histogram2d, go.Densitymapbox]:
show_colorbar = True
trace_patch["coloraxis"] = "coloraxis1"
if "opacity" in args:
if args["opacity"] is None:
if "barmode" in args and args["barmode"] == "overlay":
trace_patch["marker"] = dict(opacity=0.5)
elif constructor in [go.Densitymapbox, go.Pie, go.Funnel, go.Funnelarea]:
trace_patch["opacity"] = args["opacity"]
else:
trace_patch["marker"] = dict(opacity=args["opacity"])
if (
"line_group" in args or "line_dash" in args
): # px.line, px.line_*, px.area, px.ecdf
modes = set()
if args.get("lines", True):
modes.add("lines")
if args.get("text") or args.get("symbol") or args.get("markers"):
modes.add("markers")
if args.get("text"):
modes.add("text")
if len(modes) == 0:
modes.add("lines")
trace_patch["mode"] = "+".join(modes)
elif constructor != go.Splom and (
"symbol" in args or constructor == go.Scattermapbox
):
trace_patch["mode"] = "markers" + ("+text" if args["text"] else "")
if "line_shape" in args:
trace_patch["line"] = dict(shape=args["line_shape"])
elif "ecdfmode" in args:
trace_patch["line"] = dict(
shape="vh" if args["ecdfmode"] == "reversed" else "hv"
)
if "geojson" in args:
trace_patch["featureidkey"] = args["featureidkey"]
trace_patch["geojson"] = (
args["geojson"]
if not hasattr(args["geojson"], "__geo_interface__") # for geopandas
else args["geojson"].__geo_interface__
)
# Compute marginal attribute: copy to appropriate marginal_*
if "marginal" in args:
position = "marginal_x" if args["orientation"] == "v" else "marginal_y"
other_position = "marginal_x" if args["orientation"] == "h" else "marginal_y"
args[position] = args["marginal"]
args[other_position] = None
# If both marginals and faceting are specified, faceting wins
if args.get("facet_col") is not None and args.get("marginal_y") is not None:
args["marginal_y"] = None
if args.get("facet_row") is not None and args.get("marginal_x") is not None:
args["marginal_x"] = None
# facet_col_wrap only works if no marginals or row faceting is used
if (
args.get("marginal_x") is not None
or args.get("marginal_y") is not None
or args.get("facet_row") is not None
):
args["facet_col_wrap"] = 0
if "trendline" in args and args["trendline"] is not None:
if args["trendline"] not in trendline_functions:
raise ValueError(
"Value '%s' for `trendline` must be one of %s"
% (args["trendline"], trendline_functions.keys())
)
if "trendline_options" in args and args["trendline_options"] is None:
args["trendline_options"] = dict()
if "ecdfnorm" in args:
if args.get("ecdfnorm", None) not in [None, "percent", "probability"]:
raise ValueError(
"`ecdfnorm` must be one of None, 'percent' or 'probability'. "
+ "'%s' was provided." % args["ecdfnorm"]
)
args["histnorm"] = args["ecdfnorm"]
# Compute applicable grouping attributes
for k in group_attrables:
if k in args:
grouped_attrs.append(k)
# Create grouped mappings
grouped_mappings = [make_mapping(args, a) for a in grouped_attrs]
# Create trace specs
trace_specs = make_trace_spec(args, constructor, attrs, trace_patch)
return trace_specs, grouped_mappings, sizeref, show_colorbar
def get_orderings(args, grouper, grouped):
"""
`orders` is the user-supplied ordering with the remaining data-frame-supplied
ordering appended if the column is used for grouping. It includes anything the user
gave, for any variable, including values not present in the dataset. It's a dict
where the keys are e.g. "x" or "color"
`sorted_group_names` is the set of groups, ordered by the order above. It's a list
of tuples like [("value1", ""), ("value2", "")] where each tuple contains the name
of a single dimension-group
"""
orders = {} if "category_orders" not in args else args["category_orders"].copy()
for col in grouper:
if col != one_group:
uniques = list(args["data_frame"][col].unique())
if col not in orders:
orders[col] = uniques
else:
orders[col] = list(OrderedDict.fromkeys(list(orders[col]) + uniques))
sorted_group_names = []
for group_name in grouped.groups:
if len(grouper) == 1:
group_name = (group_name,)
sorted_group_names.append(group_name)
for i, col in reversed(list(enumerate(grouper))):
if col != one_group:
sorted_group_names = sorted(
sorted_group_names,
key=lambda g: orders[col].index(g[i]) if g[i] in orders[col] else -1,
)
return orders, sorted_group_names
def make_figure(args, constructor, trace_patch=None, layout_patch=None):
trace_patch = trace_patch or {}
layout_patch = layout_patch or {}
apply_default_cascade(args)
args = build_dataframe(args, constructor)
if constructor in [go.Treemap, go.Sunburst, go.Icicle] and args["path"] is not None:
args = process_dataframe_hierarchy(args)
if constructor == "timeline":
constructor = go.Bar
args = process_dataframe_timeline(args)
trace_specs, grouped_mappings, sizeref, show_colorbar = infer_config(
args, constructor, trace_patch, layout_patch
)
grouper = [x.grouper or one_group for x in grouped_mappings] or [one_group]
grouped = args["data_frame"].groupby(grouper, sort=False)
orders, sorted_group_names = get_orderings(args, grouper, grouped)
col_labels = []
row_labels = []
nrows = ncols = 1
for m in grouped_mappings:
if m.grouper not in orders:
m.val_map[""] = m.sequence[0]
else:
sorted_values = orders[m.grouper]
if m.facet == "col":
prefix = get_label(args, args["facet_col"]) + "="
col_labels = [prefix + str(s) for s in sorted_values]
ncols = len(col_labels)
if m.facet == "row":
prefix = get_label(args, args["facet_row"]) + "="
row_labels = [prefix + str(s) for s in sorted_values]
nrows = len(row_labels)
for val in sorted_values:
if val not in m.val_map: # always False if it's an IdentityMap
m.val_map[val] = m.sequence[len(m.val_map) % len(m.sequence)]
subplot_type = _subplot_type_for_trace_type(constructor().type)
trace_names_by_frame = {}
frames = OrderedDict()
trendline_rows = []
trace_name_labels = None
facet_col_wrap = args.get("facet_col_wrap", 0)
for group_name in sorted_group_names:
group = grouped.get_group(group_name if len(group_name) > 1 else group_name[0])
mapping_labels = OrderedDict()
trace_name_labels = OrderedDict()
frame_name = ""
for col, val, m in zip(grouper, group_name, grouped_mappings):
if col != one_group:
key = get_label(args, col)
if not isinstance(m.val_map, IdentityMap):
mapping_labels[key] = str(val)
if m.show_in_trace_name:
trace_name_labels[key] = str(val)
if m.variable == "animation_frame":
frame_name = val
trace_name = ", ".join(trace_name_labels.values())
if frame_name not in trace_names_by_frame:
trace_names_by_frame[frame_name] = set()
trace_names = trace_names_by_frame[frame_name]
for trace_spec in trace_specs:
# Create the trace
trace = trace_spec.constructor(name=trace_name)
if trace_spec.constructor not in [
go.Parcats,
go.Parcoords,
go.Choropleth,
go.Choroplethmapbox,
go.Densitymapbox,
go.Histogram2d,
go.Sunburst,
go.Treemap,
go.Icicle,
]:
trace.update(
legendgroup=trace_name,
showlegend=(trace_name != "" and trace_name not in trace_names),
)
if trace_spec.constructor in [go.Bar, go.Violin, go.Box, go.Histogram]:
trace.update(alignmentgroup=True, offsetgroup=trace_name)
trace_names.add(trace_name)
# Init subplot row/col
trace._subplot_row = 1
trace._subplot_col = 1
for i, m in enumerate(grouped_mappings):
val = group_name[i]
try:
m.updater(trace, m.val_map[val]) # covers most cases
except ValueError:
# this catches some odd cases like marginals
if (
trace_spec != trace_specs[0]
and (
trace_spec.constructor in [go.Violin, go.Box]
and m.variable in ["symbol", "pattern", "dash"]
)
or (
trace_spec.constructor in [go.Histogram]
and m.variable in ["symbol", "dash"]
)
):
pass
elif (
trace_spec != trace_specs[0]
and trace_spec.constructor in [go.Histogram]
and m.variable == "color"
):
trace.update(marker=dict(color=m.val_map[val]))
elif (
trace_spec.constructor in [go.Choropleth, go.Choroplethmapbox]
and m.variable == "color"
):
trace.update(
z=[1] * len(group),
colorscale=[m.val_map[val]] * 2,
showscale=False,
showlegend=True,
)
else:
raise
# Find row for trace, handling facet_row and marginal_x
if m.facet == "row":
row = m.val_map[val]
else:
if (
args.get("marginal_x") is not None # there is a marginal
and trace_spec.marginal != "x" # and we're not it
):
row = 2
else:
row = 1
# Find col for trace, handling facet_col and marginal_y
if m.facet == "col":
col = m.val_map[val]
if facet_col_wrap: # assumes no facet_row, no marginals
row = 1 + ((col - 1) // facet_col_wrap)
col = 1 + ((col - 1) % facet_col_wrap)
else:
if trace_spec.marginal == "y":
col = 2
else:
col = 1
if row > 1:
trace._subplot_row = row
if col > 1:
trace._subplot_col = col
if (
trace_specs[0].constructor == go.Histogram2dContour
and trace_spec.constructor == go.Box
and trace.line.color
):
trace.update(marker=dict(color=trace.line.color))
if "ecdfmode" in args:
base = args["x"] if args["orientation"] == "v" else args["y"]
var = args["x"] if args["orientation"] == "h" else args["y"]
ascending = args.get("ecdfmode", "standard") != "reversed"
group = group.sort_values(by=base, ascending=ascending)
group_sum = group[var].sum() # compute here before next line mutates
group[var] = group[var].cumsum()
if not ascending:
group = group.sort_values(by=base, ascending=True)
if args.get("ecdfmode", "standard") == "complementary":
group[var] = group_sum - group[var]
if args["ecdfnorm"] == "probability":
group[var] = group[var] / group_sum
elif args["ecdfnorm"] == "percent":
group[var] = 100.0 * group[var] / group_sum
patch, fit_results = make_trace_kwargs(
args, trace_spec, group, mapping_labels.copy(), sizeref
)
trace.update(patch)
if fit_results is not None:
trendline_rows.append(mapping_labels.copy())
trendline_rows[-1]["px_fit_results"] = fit_results
if frame_name not in frames:
frames[frame_name] = dict(data=[], name=frame_name)
frames[frame_name]["data"].append(trace)
frame_list = [f for f in frames.values()]
if len(frame_list) > 1:
frame_list = sorted(
frame_list, key=lambda f: orders[args["animation_frame"]].index(f["name"])
)
if show_colorbar:
colorvar = "z" if constructor in [go.Histogram2d, go.Densitymapbox] else "color"
range_color = args["range_color"] or [None, None]
colorscale_validator = ColorscaleValidator("colorscale", "make_figure")
layout_patch["coloraxis1"] = dict(
colorscale=colorscale_validator.validate_coerce(
args["color_continuous_scale"]
),
cmid=args["color_continuous_midpoint"],
cmin=range_color[0],
cmax=range_color[1],
colorbar=dict(
title_text=get_decorated_label(args, args[colorvar], colorvar)
),
)
for v in ["height", "width"]:
if args[v]:
layout_patch[v] = args[v]
layout_patch["legend"] = dict(tracegroupgap=0)
if trace_name_labels:
layout_patch["legend"]["title_text"] = ", ".join(trace_name_labels)
if args["title"]:
layout_patch["title_text"] = args["title"]
elif args["template"].layout.margin.t is None:
layout_patch["margin"] = {"t": 60}
if (
"size" in args
and args["size"]
and args["template"].layout.legend.itemsizing is None
):
layout_patch["legend"]["itemsizing"] = "constant"
if facet_col_wrap:
nrows = math.ceil(ncols / facet_col_wrap)
ncols = min(ncols, facet_col_wrap)
if args.get("marginal_x") is not None:
nrows += 1
if args.get("marginal_y") is not None:
ncols += 1
fig = init_figure(
args, subplot_type, frame_list, nrows, ncols, col_labels, row_labels
)
# Position traces in subplots
for frame in frame_list:
for trace in frame["data"]:
if isinstance(trace, go.Splom):
# Special case that is not compatible with make_subplots
continue
_set_trace_grid_reference(
trace,
fig.layout,
fig._grid_ref,
nrows - trace._subplot_row + 1,
trace._subplot_col,
)
# Add traces, layout and frames to figure
fig.add_traces(frame_list[0]["data"] if len(frame_list) > 0 else [])
fig.update_layout(layout_patch)
if "template" in args and args["template"] is not None:
fig.update_layout(template=args["template"], overwrite=True)
fig.frames = frame_list if len(frames) > 1 else []
if args.get("trendline") and args.get("trendline_scope", "trace") == "overall":
trendline_spec = make_trendline_spec(args, constructor)
trendline_trace = trendline_spec.constructor(
name="Overall Trendline", legendgroup="Overall Trendline", showlegend=False
)
if "line" not in trendline_spec.trace_patch: # no color override
for m in grouped_mappings:
if m.variable == "color":
next_color = m.sequence[len(m.val_map) % len(m.sequence)]
trendline_spec.trace_patch["line"] = dict(color=next_color)
patch, fit_results = make_trace_kwargs(
args, trendline_spec, args["data_frame"], {}, sizeref
)
trendline_trace.update(patch)
fig.add_trace(
trendline_trace, row="all", col="all", exclude_empty_subplots=True
)
fig.update_traces(selector=-1, showlegend=True)
if fit_results is not None:
trendline_rows.append(dict(px_fit_results=fit_results))
fig._px_trendlines = pd.DataFrame(trendline_rows)
configure_axes(args, constructor, fig, orders)
configure_animation_controls(args, constructor, fig)
return fig
def init_figure(args, subplot_type, frame_list, nrows, ncols, col_labels, row_labels):
# Build subplot specs
specs = [[dict(type=subplot_type or "domain")] * ncols for _ in range(nrows)]
# Default row/column widths uniform
column_widths = [1.0] * ncols
row_heights = [1.0] * nrows
facet_col_wrap = args.get("facet_col_wrap", 0)
# Build column_widths/row_heights
if subplot_type == "xy":
if args.get("marginal_x") is not None:
if args["marginal_x"] == "histogram" or ("color" in args and args["color"]):
main_size = 0.74
else:
main_size = 0.84
row_heights = [main_size] * (nrows - 1) + [1 - main_size]
vertical_spacing = 0.01
elif facet_col_wrap:
vertical_spacing = args.get("facet_row_spacing") or 0.07
else:
vertical_spacing = args.get("facet_row_spacing") or 0.03
if args.get("marginal_y") is not None:
if args["marginal_y"] == "histogram" or ("color" in args and args["color"]):
main_size = 0.74
else:
main_size = 0.84
column_widths = [main_size] * (ncols - 1) + [1 - main_size]
horizontal_spacing = 0.005
else:
horizontal_spacing = args.get("facet_col_spacing") or 0.02
else:
# Other subplot types:
# 'scene', 'geo', 'polar', 'ternary', 'mapbox', 'domain', None
#
# We can customize subplot spacing per type once we enable faceting
# for all plot types
if facet_col_wrap:
vertical_spacing = args.get("facet_row_spacing") or 0.07
else:
vertical_spacing = args.get("facet_row_spacing") or 0.03
horizontal_spacing = args.get("facet_col_spacing") or 0.02
if facet_col_wrap:
subplot_labels = [None] * nrows * ncols
while len(col_labels) < nrows * ncols:
col_labels.append(None)
for i in range(nrows):
for j in range(ncols):
subplot_labels[i * ncols + j] = col_labels[(nrows - 1 - i) * ncols + j]
def _spacing_error_translator(e, direction, facet_arg):
"""
Translates the spacing errors thrown by the underlying make_subplots
routine into one that describes an argument adjustable through px.
"""
if ("%s spacing" % (direction,)) in e.args[0]:
e.args = (
e.args[0]
+ """
Use the {facet_arg} argument to adjust this spacing.""".format(
facet_arg=facet_arg
),
)
raise e
# Create figure with subplots
try:
fig = make_subplots(
rows=nrows,
cols=ncols,
specs=specs,
shared_xaxes="all",
shared_yaxes="all",
row_titles=[] if facet_col_wrap else list(reversed(row_labels)),
column_titles=[] if facet_col_wrap else col_labels,
subplot_titles=subplot_labels if facet_col_wrap else [],
horizontal_spacing=horizontal_spacing,
vertical_spacing=vertical_spacing,
row_heights=row_heights,
column_widths=column_widths,
start_cell="bottom-left",
)
except ValueError as e:
_spacing_error_translator(e, "Horizontal", "facet_col_spacing")
_spacing_error_translator(e, "Vertical", "facet_row_spacing")
# Remove explicit font size of row/col titles so template can take over
for annot in fig.layout.annotations:
annot.update(font=None)
return fig