Source code for openseries.series

"""The OpenTimeSeries class."""

from __future__ import annotations

import datetime as dt
from copy import deepcopy
from logging import getLogger
from typing import TYPE_CHECKING, Any, Self, TypeVar, cast

if TYPE_CHECKING:  # pragma: no cover
    from numpy.typing import NDArray
    from pandas import Timestamp

from numpy import (
    append,
    array,
    asarray,
    cumprod,
    diff,
    float64,
    insert,
    isnan,
    log,
    sqrt,
    square,
)
from pandas import (
    DataFrame,
    DatetimeIndex,
    Index,
    MultiIndex,
    Series,
    date_range,
)
from pydantic import field_validator, model_validator
from scipy.stats import chi2, norm

from ._common_model import (
    _calculate_time_factor,
    _CommonModel,
    _demeaned_returns_for_autocorr,
)
from .datefixer import _do_resample_to_business_period_ends, date_fix
from .owntypes import (
    Countries,
    CountriesType,
    Currency,
    CurrencyStringType,
    DateAlignmentError,
    DateListType,
    DaysInYearType,
    IncorrectArgumentComboError,
    LiteralBizDayFreq,
    LiteralPandasReindexMethod,
    LiteralSeriesProps,
    MarketsNotStringNorListStrError,
    OpenTimeSeriesPropertiesList,
    ResampleDataLossError,
    ValueListType,
    ValueType,
)

logger = getLogger(__name__)

__all__ = ["OpenTimeSeries", "timeseries_chain"]

TypeOpenTimeSeries = TypeVar("TypeOpenTimeSeries", bound="OpenTimeSeries")


[docs] class OpenTimeSeries(_CommonModel[float]): """OpenTimeSeries objects are at the core of the openseries package. The intended use is to allow analyses of financial timeseries. It is only intended for daily or less frequent data samples. Args: timeseries_id: Database identifier of the timeseries. instrument_id: Database identifier of the instrument associated with the timeseries. name: String identifier of the timeseries and/or instrument. valuetype: Identifies if the series is a series of values or returns. dates: Dates of the individual timeseries items. These dates will not be altered by methods. values: The value or return values of the timeseries items. These values will not be altered by methods. local_ccy: Boolean flag indicating if timeseries is in local currency. tsdf: Pandas object holding dates and values that can be altered via methods. currency: ISO 4217 currency code of the timeseries. domestic: ISO 4217 currency code of the user's home currency. Defaults to "SEK". countries: (List of) country code(s) according to ISO 3166-1 alpha-2. Defaults to "SE". markets: (List of) markets code(s) supported by exchange_calendars. Optional. isin: ISO 6166 identifier code of the associated instrument. Optional. label: Placeholder for a name of the timeseries. Optional. """ timeseries_id: str instrument_id: str name: str valuetype: ValueType dates: DateListType values: ValueListType local_ccy: bool tsdf: DataFrame currency: CurrencyStringType domestic: CurrencyStringType = "SEK" countries: CountriesType = "SE" isin: str | None = None label: str | None = None @field_validator("domestic", mode="before") @classmethod def _validate_domestic(cls, value: CurrencyStringType) -> CurrencyStringType: """Pydantic validator to ensure domestic field is validated.""" Currency(ccy=value) return value @field_validator("countries", mode="before") @classmethod def _validate_countries(cls, value: CountriesType) -> CountriesType: """Pydantic validator to ensure countries field is validated.""" Countries(countryinput=value) return value @field_validator("markets", mode="before") @classmethod def _validate_markets( cls, value: list[str] | str | None, ) -> list[str] | str | None: """Pydantic validator to ensure markets field is validated. Raises: MarketsNotStringNorListStrError: If ``markets`` is neither a string nor a non-empty list of strings. """ msg = ( "'markets' must be a string or list of strings, " f"got {type(value).__name__!r}" ) if value is None or isinstance(value, str): return value if isinstance(value, list): if all(isinstance(item, str) for item in value) and len(value) != 0: return value item_msg = "All items in 'markets' must be strings." raise MarketsNotStringNorListStrError(item_msg) raise MarketsNotStringNorListStrError(msg) @model_validator(mode="after") def _dates_and_values_validate(self: Self) -> Self: """Pydantic validator to ensure dates and values are validated. Raises: ValueError: If dates are not unique or if numbers of dates and values do not match the shape of ``tsdf``. """ values_list_length = len(self.values) dates_list_length = len(self.dates) dates_set_length = len(set(self.dates)) if dates_list_length != dates_set_length: msg = "Dates are not unique" raise ValueError(msg) if ( (dates_list_length != values_list_length) or (len(self.tsdf.index) != self.tsdf.shape[0]) or (self.tsdf.shape[1] != 1) ): msg = "Number of dates and values passed do not match" raise ValueError(msg) return self def _coerce_result( self: Self, result: Series[float], name: str, ) -> float: _ = name return float(asarray(a=result, dtype=float64).squeeze())
[docs] @classmethod def from_arrays( cls, name: str, dates: DateListType, values: ValueListType, valuetype: ValueType = ValueType.PRICE, timeseries_id: str = "", instrument_id: str = "", isin: str | None = None, baseccy: CurrencyStringType = "SEK", *, local_ccy: bool = True, ) -> Self: """Create series from a list of dates and a list of values. Args: name: String identifier of the timeseries and/or instrument. dates: List of date strings as ISO 8601 YYYY-MM-DD. values: Array of float values. valuetype: Identifies if the series is a series of values or returns. Defaults to ValueType.PRICE. timeseries_id: Database identifier of the timeseries. Optional. instrument_id: Database identifier of the instrument associated with the timeseries. Optional. isin: ISO 6166 identifier code of the associated instrument. Optional. baseccy: ISO 4217 currency code of the timeseries. Defaults to "SEK". local_ccy: Boolean flag indicating if timeseries is in local currency. Defaults to True. Returns: An OpenTimeSeries object. """ return cls( name=name, label=name, dates=dates, values=values, valuetype=valuetype, timeseries_id=timeseries_id, instrument_id=instrument_id, isin=isin, currency=baseccy, local_ccy=local_ccy, tsdf=DataFrame( data=values, index=[deyt.date() for deyt in DatetimeIndex(dates)], columns=[[name], [valuetype]], dtype="float64", ), )
[docs] @classmethod def from_df( cls, dframe: Series | DataFrame | object, column_nmbr: int = 0, valuetype: ValueType = ValueType.PRICE, baseccy: CurrencyStringType = "SEK", *, local_ccy: bool = True, ) -> Self: """Create series from a Pandas DataFrame or Series. Args: dframe: Pandas DataFrame or Series. column_nmbr: Using iloc[:, column_nmbr] to pick column. Defaults to 0. valuetype: Identifies if the series is a series of values or returns. Defaults to ValueType.PRICE. baseccy: ISO 4217 currency code of the timeseries. Defaults to "SEK". local_ccy: Boolean flag indicating if timeseries is in local currency. Defaults to True. Returns: An OpenTimeSeries object. Raises: TypeError: If ``dframe`` is not a ``pandas.Series`` or a ``pandas.DataFrame``. """ msg = "Argument dframe must be pandas Series or DataFrame." values: list[float] pandas_obj: Series | DataFrame if isinstance(dframe, Series): pandas_obj = dframe if isinstance(dframe.name, tuple): label, _ = dframe.name else: label = dframe.name values = dframe.to_numpy().tolist() elif isinstance(dframe, DataFrame): pandas_obj = dframe values = dframe.iloc[:, column_nmbr].to_list() if isinstance(dframe.columns, MultiIndex): if _check_if_none( dframe.columns.get_level_values(0).to_numpy()[column_nmbr], ): label = "Series" msg = f"Label missing. Adding: {label}" logger.warning(msg) else: label = dframe.columns.get_level_values(0).to_numpy()[column_nmbr] if _check_if_none( dframe.columns.get_level_values(1).to_numpy()[column_nmbr], ): valuetype = ValueType.PRICE msg = f"valuetype missing. Adding: {valuetype.value}" logger.warning(msg) else: valuetype = dframe.columns.get_level_values(1).to_numpy()[ column_nmbr ] else: label = dframe.columns.to_numpy()[column_nmbr] else: raise TypeError(msg) dates = [date_fix(d).strftime("%Y-%m-%d") for d in pandas_obj.index] return cls( timeseries_id="", instrument_id="", currency=baseccy, dates=dates, name=label, label=label, valuetype=valuetype, values=values, local_ccy=local_ccy, tsdf=DataFrame( data=values, index=[deyt.date() for deyt in DatetimeIndex(dates)], columns=[[label], [valuetype]], dtype="float64", ), )
[docs] @classmethod def from_fixed_rate( cls, rate: float, d_range: DatetimeIndex | None = None, days: int | None = None, end_dt: dt.date | None = None, label: str = "Series", valuetype: ValueType = ValueType.PRICE, baseccy: CurrencyStringType = "SEK", *, local_ccy: bool = True, ) -> Self: """Create series from values accruing with a given fixed rate return. Providing a date_range of type Pandas DatetimeIndex takes priority over providing a combination of days and an end date. Args: rate: The accrual rate. d_range: A given range of dates. Optional. days: Number of days to generate when date_range not provided. Must be combined with end_dt. Optional. end_dt: End date of date range to generate when date_range not provided. Must be combined with days. Optional. label: Placeholder for a name of the timeseries. valuetype: Identifies if the series is a series of values or returns. Defaults to ValueType.PRICE. baseccy: The currency of the timeseries. Defaults to "SEK". local_ccy: Boolean flag indicating if timeseries is in local currency. Defaults to True. Returns: An OpenTimeSeries object. Raises: IncorrectArgumentComboError: If ``d_range`` is not provided and the combination of ``days`` and ``end_dt`` is incomplete. """ if d_range is None: if days is not None and end_dt is not None: d_range = DatetimeIndex( [d.date() for d in date_range(periods=days, end=end_dt, freq="D")], ) else: msg = "If d_range is not provided both days and end_dt must be." raise IncorrectArgumentComboError(msg) deltas = array([i.days for i in d_range[1:] - d_range[:-1]]) arr: list[float] = list(cumprod(insert(1 + deltas * rate / 365, 0, 1.0))) dates = [d.strftime("%Y-%m-%d") for d in d_range] return cls( timeseries_id="", instrument_id="", currency=baseccy, dates=dates, name=label, label=label, valuetype=valuetype, values=arr, local_ccy=local_ccy, tsdf=DataFrame( data=arr, index=[d.date() for d in DatetimeIndex(dates)], columns=[[label], [valuetype]], dtype="float64", ), )
[docs] def from_deepcopy(self: Self) -> Self: """Create copy of OpenTimeSeries object. Returns: An OpenTimeSeries object. """ return deepcopy(self)
[docs] def pandas_df(self: Self) -> Self: """Populate .tsdf Pandas DataFrame from the .dates and .values lists. Returns: An OpenTimeSeries object. """ dframe = DataFrame( data=self.values, index=[d.date() for d in DatetimeIndex(self.dates)], columns=[[self.label], [self.valuetype]], dtype="float64", ) self.tsdf = dframe return self
[docs] def all_properties( self: Self, properties: list[LiteralSeriesProps] | None = None, ) -> DataFrame: """Calculate chosen properties. Args: properties: The properties to calculate. Defaults to calculating all available. Optional. Returns: Properties of the OpenTimeSeries. """ if not properties: properties = cast( "list[LiteralSeriesProps]", OpenTimeSeriesPropertiesList.allowed_strings, ) props = OpenTimeSeriesPropertiesList(*properties) def _prop_value(name: str) -> float | int | dt.date | Series[float]: attr = getattr(self, name) return cast( "float | int | dt.date | Series[float]", attr() if callable(attr) else attr, ) pdf = DataFrame.from_dict( {x: _prop_value(x) for x in props}, orient="index", ) pdf.columns = self.tsdf.columns return pdf
[docs] def value_to_ret(self: Self) -> Self: """Convert series of values into series of returns. Returns: The returns of the values in the series. """ returns = self.tsdf.ffill().pct_change() returns.iloc[0] = 0 self.valuetype = ValueType.RTRN arrays = cast("Any", [[self.label], [self.valuetype]]) returns.columns = MultiIndex.from_arrays(arrays) self.tsdf = returns.copy() return self
[docs] def value_to_diff(self: Self, periods: int = 1) -> Self: """Convert series of values to series of their period differences. Args: periods: The number of periods between observations over which difference is calculated. Defaults to 1. Returns: An OpenTimeSeries object. """ self.tsdf = self.tsdf.diff(periods=periods) self.tsdf.iloc[0] = 0 self.valuetype = ValueType.RTRN self.tsdf.columns = MultiIndex.from_arrays( [ [self.label], [self.valuetype], ], ) return self
[docs] def to_cumret(self: Self) -> Self: """Convert series of returns into cumulative series of values. Returns: An OpenTimeSeries object. """ if self.valuetype == ValueType.PRICE: self.value_to_ret() self.tsdf = self.tsdf.add(1.0) self.tsdf = self.tsdf.cumprod(axis=0) / self.tsdf.iloc[0] self.valuetype = ValueType.PRICE self.tsdf.columns = MultiIndex.from_arrays( [ [self.label], [self.valuetype], ], ) return self
[docs] def from_1d_rate_to_cumret( self: Self, days_in_year: int = 365, divider: float = 1.0, ) -> Self: """Convert series of 1-day rates into series of cumulative values. Args: days_in_year: Calendar days per year used as divisor. Defaults to 365. divider: Convenience divider for when the 1-day rate is not scaled correctly. Defaults to 1.0. Returns: An OpenTimeSeries object. """ arr: NDArray[float64] = array(self.values) / divider deltas = array([i.days for i in self.tsdf.index[1:] - self.tsdf.index[:-1]]) arr = cast( "NDArray[float64]", cumprod( a=insert( arr=1.0 + deltas * arr[:-1] / days_in_year, obj=0, values=1.0 ), ), ) self.dates = [d.strftime("%Y-%m-%d") for d in self.tsdf.index] self.values = list(arr) self.valuetype = ValueType.PRICE self.tsdf = DataFrame( data=self.values, index=[d.date() for d in DatetimeIndex(self.dates)], columns=[[self.label], [self.valuetype]], dtype="float64", ) return self
[docs] def resample( self: Self, freq: LiteralBizDayFreq | str = "BME", ) -> Self: """Resamples the timeseries frequency. Args: freq: The date offset string that sets the resampled frequency. Defaults to "BME". Returns: An OpenTimeSeries object. """ self.tsdf.index = DatetimeIndex(self.tsdf.index) if self.valuetype == ValueType.RTRN: self.tsdf = self.tsdf.resample(freq).sum() else: self.tsdf = self.tsdf.resample(freq).last() self.tsdf.index = Index(DatetimeIndex(self.tsdf.index).date) return self
[docs] def resample_to_business_period_ends( self: Self, freq: LiteralBizDayFreq = "BME", method: LiteralPandasReindexMethod = "nearest", ) -> Self: """Resamples timeseries frequency to the business calendar month end dates. Stubs left in place. Stubs will be aligned to the shortest stub. Args: freq: The date offset string that sets the resampled frequency. Defaults to BME. method: Controls the method used to align values across columns. Defaults to nearest. Returns: An OpenTimeSeries object. Raises: ResampleDataLossError: If called on a return series (``valuetype`` is ``ValueType.RTRN``), since summation across sparser frequency would be required to avoid data loss. """ if self.valuetype == ValueType.RTRN: msg = ( "Do not run resample_to_business_period_ends on return series. " "The operation will pick the last data point in the sparser series. " "It will not sum returns and therefore data will be lost." ) raise ResampleDataLossError(msg) dates = _do_resample_to_business_period_ends( data=self.tsdf, freq=freq, countries=self.countries, markets=self.markets, ) self.tsdf = self.tsdf.reindex([deyt.date() for deyt in dates], method=method) return self
[docs] def ewma_vol_func( self: Self, lmbda: float = 0.94, day_chunk: int = 11, dlta_degr_freedms: int = 0, months_from_last: int | None = None, from_date: dt.date | None = None, to_date: dt.date | None = None, periods_in_a_year_fixed: DaysInYearType | None = None, ) -> Series[float]: """Exponentially Weighted Moving Average Model for Volatility. Reference: https://www.investopedia.com/articles/07/ewma.asp. Args: lmbda: Scaling factor to determine weighting. Defaults to 0.94. day_chunk: Sampling the data which is assumed to be daily. Defaults to 11. dlta_degr_freedms: Variance bias factor taking the value 0 or 1. Defaults to 0. months_from_last: Number of months offset as positive integer. Overrides use of from_date and to_date. Optional. from_date: Specific from date. Optional. to_date: Specific to date. Optional. periods_in_a_year_fixed: Allows locking the periods-in-a-year to simplify test cases and comparisons. Optional. Returns: Series EWMA volatility. """ earlier, later = self.calc_range( months_offset=months_from_last, from_dt=from_date, to_dt=to_date, ) time_factor = _calculate_time_factor( data=self.tsdf.loc[ cast("Timestamp", earlier) : cast("Timestamp", later) ].iloc[:, 0], earlier=earlier, later=later, periods_in_a_year_fixed=periods_in_a_year_fixed, ) data = self.tsdf.loc[ cast("Timestamp", earlier) : cast("Timestamp", later) ].copy() data.loc[:, (self.label, ValueType.RTRN)] = log( data.loc[:, self.tsdf.columns.to_numpy()[0]], ).diff() rawdata = [ data[(self.label, ValueType.RTRN)] .iloc[1:day_chunk] .std(ddof=dlta_degr_freedms) * sqrt(time_factor), ] for item in data[(self.label, ValueType.RTRN)].iloc[1:]: prev = rawdata[-1] rawdata.append( sqrt( square(item) * time_factor * (1 - lmbda) + square(prev) * lmbda, ), ) return Series( data=rawdata, index=data.index, name=(self.label, ValueType.EWMA_VOL), dtype="float64", )
[docs] def ewma_var_func( self: Self, lmbda: float = 0.94, day_chunk: int = 11, level: float = 0.95, dlta_degr_freedms: int = 0, months_from_last: int | None = None, from_date: dt.date | None = None, to_date: dt.date | None = None, periods_in_a_year_fixed: DaysInYearType | None = None, ) -> Series[float]: """Exponentially Weighted Moving Average Model for Value At Risk (VaR). Reference: https://www.investopedia.com/articles/07/ewma.asp. Args: lmbda: Scaling factor to determine weighting. Defaults to 0.94. day_chunk: Sampling the data which is assumed to be daily. Defaults to 11. level: The sought VaR level. Defaults to 0.95. dlta_degr_freedms: Variance bias factor taking the value 0 or 1. Defaults to 0. months_from_last: Number of months offset as positive integer. Overrides use of from_date and to_date. Optional. from_date: Specific from date. Optional. to_date: Specific to date. Optional. periods_in_a_year_fixed: Allows locking the periods-in-a-year to simplify test cases and comparisons. Optional. Returns: Series EWMA VaR. """ earlier, later = self.calc_range( months_offset=months_from_last, from_dt=from_date, to_dt=to_date, ) time_factor = _calculate_time_factor( data=self.tsdf.loc[ cast("Timestamp", earlier) : cast("Timestamp", later) ].iloc[:, 0], earlier=earlier, later=later, periods_in_a_year_fixed=periods_in_a_year_fixed, ) data = self.tsdf.loc[ cast("Timestamp", earlier) : cast("Timestamp", later) ].copy() data.loc[:, (self.label, ValueType.RTRN)] = log( data.loc[:, self.tsdf.columns.to_numpy()[0]], ).diff() rawdata = [ data[(self.label, ValueType.RTRN)] .iloc[1:day_chunk] .std(ddof=dlta_degr_freedms) * sqrt(time_factor), ] for item in data[(self.label, ValueType.RTRN)].iloc[1:]: prev = rawdata[-1] rawdata.append( sqrt( square(item) * time_factor * (1 - lmbda) + square(prev) * lmbda, ), ) return Series( data=array(rawdata) * norm.ppf(1 - level), index=data.index, name=(self.label, ValueType.EWMA_VAR), dtype="float64", )
[docs] def running_adjustment( self: Self, adjustment: float, days_in_year: int = 365, ) -> Self: """Add or subtract a fee from the timeseries return. Args: adjustment: Fee to add or subtract. days_in_year: The calculation divisor and assumed number of days in a calendar year. Defaults to 365. Returns: An OpenTimeSeries object. """ if self.valuetype == ValueType.RTRN: ra_df = self.tsdf.copy() initial_value = 1.0 returns_input = True else: initial_value = cast("float", self.tsdf.iloc[0, 0]) ra_df = self.tsdf.ffill().pct_change() returns_input = False ra_df = ra_df.dropna() dates_index = DatetimeIndex(ra_df.index) dates_list = [self.first_idx] + [d.date() for d in dates_index] dates_np = array( [dt.datetime.combine(d, dt.time()) for d in dates_list], dtype="datetime64[D]", ) date_diffs = cast( "NDArray[float64]", diff(dates_np).astype("timedelta64[D]").astype(float64), ) returns_array = cast( "NDArray[float64]", ra_df.iloc[:, 0].to_numpy(), ) adjustment_factors = ( 1.0 + returns_array + adjustment * date_diffs / days_in_year ) values_array = cumprod(insert(adjustment_factors, 0, initial_value)) values = list(values_array) self.tsdf = DataFrame(data=values, index=dates_list) self.valuetype = ValueType.PRICE self.tsdf.columns = MultiIndex.from_arrays( [ [self.label], [self.valuetype], ], ) self.tsdf.index = Index(DatetimeIndex(self.tsdf.index).date) if returns_input: self.value_to_ret() return self
[docs] def set_new_label( self: Self, lvl_zero: str | None = None, lvl_one: ValueType | None = None, *, delete_lvl_one: bool = False, ) -> Self: """Set the column labels of the .tsdf Pandas Dataframe. Args: lvl_zero: New level zero label. Optional. lvl_one: New level one label. Optional. delete_lvl_one: If True the level one label is deleted. Defaults to False. Returns: An OpenTimeSeries object. """ if lvl_zero is None and lvl_one is None: self.tsdf.columns = MultiIndex.from_arrays( [[self.label], [self.valuetype]], ) elif lvl_zero is not None and lvl_one is None: self.tsdf.columns = MultiIndex.from_arrays([[lvl_zero], [self.valuetype]]) self.label = lvl_zero elif lvl_zero is None and lvl_one is not None: self.tsdf.columns = MultiIndex.from_arrays([[self.label], [lvl_one]]) self.valuetype = lvl_one else: self.tsdf.columns = MultiIndex.from_arrays([[lvl_zero], [lvl_one]]) self.label, self.valuetype = lvl_zero, cast("ValueType", lvl_one) if delete_lvl_one: self.tsdf.columns = self.tsdf.columns.droplevel(level=1) return self
def _returns_series(self: Self, *, squared: bool = False) -> Series[float]: """Return demeaned return series for autocorrelation analysis.""" data: Series[float] = self.tsdf.iloc[:, 0] return _demeaned_returns_for_autocorr( series=data, valuetype=self.valuetype, squared=squared )
[docs] def acf( self: Self, lags: int | list[int], *, squared: bool = False, ) -> Series[float]: """Calculate autocorrelation function for specified lags. Args: lags: If int, compute ACF from lag 0 to this value (inclusive). If list, compute ACF at lag 0 plus each lag in the list. squared: If True, compute ACF of squared returns. Defaults to False. Returns: Series of autocorrelations indexed by lag. """ rets = self._returns_series(squared=squared) if isinstance(lags, int): lag_list = list(range(lags + 1)) else: lag_list = sorted({0} | set(lags)) values: list[float] = [] for lag in lag_list: if lag == 0: values.append(1.0) else: values.append(float(rets.autocorr(lag=lag))) return Series( data=values, index=lag_list, name="ACF", dtype="float64", )
[docs] def partial_autocorr(self: Self, lag: int = 1, *, squared: bool = False) -> float: """Calculate partial autocorrelation at a given lag. Args: lag: The lag at which to compute partial autocorrelation. Defaults to 1. squared: If True, compute partial autocorrelation of squared returns. Defaults to False. Returns: Partial autocorrelation at the specified lag. """ pacf_series = self.pacf(lags=lag, squared=squared) return float(pacf_series.loc[lag])
[docs] def pacf( self: Self, lags: int | list[int], *, squared: bool = False, ) -> Series[float]: """Calculate partial autocorrelation function for specified lags. Args: lags: If int, compute PACF from lag 0 to this value (inclusive). If list, compute PACF at lag 0 plus each lag in the list. squared: If True, compute PACF of squared returns. Defaults to False. Returns: Series of partial autocorrelations indexed by lag. """ if isinstance(lags, int): lag_list = list(range(lags + 1)) else: lag_list = sorted({0} | set(lags)) max_lag = max(lag_list) if lag_list else 0 acf_vals = self.acf(lags=max_lag, squared=squared) acf_arr = array([acf_vals.loc[k] for k in range(max_lag + 1)]) pacf_values: list[float] = [1.0] phi: list[list[float]] = [] for k in range(1, max_lag + 1): if k == 1: phi_kk = acf_arr[1] else: numer = acf_arr[k] denom = 1.0 for j in range(k - 1): numer -= phi[k - 2][j] * acf_arr[k - 1 - j] denom -= phi[k - 2][j] * acf_arr[j + 1] phi_kk = numer / denom phi_row = [0.0] * k for j in range(k - 1): phi_row[j] = phi[k - 2][j] - phi_kk * phi[k - 2][k - 2 - j] phi_row[k - 1] = phi_kk phi.append(phi_row) pacf_values.append(phi_kk) result = {lag: pacf_values[lag] for lag in lag_list} return Series( data=[result[lag] for lag in lag_list], index=lag_list, name="PACF", dtype="float64", )
[docs] def ljung_box( self: Self, lags: int | list[int], *, squared: bool = False, ) -> tuple[float, float, list[int]]: """Compute Ljung-Box test for autocorrelation. Args: lags: If int, use lags 1 through this value. If list, use the given lags (lag 0 excluded from test). squared: If True, test autocorrelation of squared returns. Defaults to False. Returns: Tuple of (statistic, pvalue, lags) where statistic is the Ljung-Box Q statistic, pvalue is the chi-squared p-value, and lags is the list of lags used. """ rets = self._returns_series(squared=squared) n = len(rets) if isinstance(lags, int): lag_list = list(range(1, lags + 1)) else: lag_list = sorted({k for k in lags if k > 0}) if not lag_list: return 0.0, 1.0, [] r_k_sq_sum = 0.0 for k in lag_list: if k < n: r_k = float(rets.autocorr(lag=k)) r_k_sq_sum += r_k**2 / (n - k) q_stat = n * (n + 2) * r_k_sq_sum df = len(lag_list) pval = float(1.0 - chi2.cdf(q_stat, df)) return q_stat, pval, lag_list
[docs] def timeseries_chain( front: TypeOpenTimeSeries, back: TypeOpenTimeSeries, old_fee: float = 0.0, ) -> TypeOpenTimeSeries: """Chain two timeseries together. The function assumes that the two series have at least one date in common. Args: front: Earlier series to chain with. back: Later series to chain with. old_fee: Fee to apply to earlier series. Defaults to 0.0. Returns: An OpenTimeSeries object or a subclass thereof. """ old = front.from_deepcopy() old.running_adjustment(old_fee) new = back.from_deepcopy() idx = 0 first = new.tsdf.index[idx] if old.last_idx < first: msg = "Timeseries dates must overlap to allow them to be chained." raise DateAlignmentError(msg) while first not in old.tsdf.index: idx += 1 first = new.tsdf.index[idx] if first > old.tsdf.index[-1]: msg = "Failed to find a matching date between series" raise DateAlignmentError(msg) dates: list[str] = [x.strftime("%Y-%m-%d") for x in old.tsdf.index if x < first] old_values = Series(old.tsdf.iloc[: len(dates), 0]) old_values = old_values.mul( Series(new.tsdf.iloc[:, 0]).loc[first] / Series(old.tsdf.iloc[:, 0]).loc[first], ) values = append(old_values, new.tsdf.iloc[:, 0]) dates.extend([x.strftime("%Y-%m-%d") for x in new.tsdf.index]) return back.__class__( timeseries_id=new.timeseries_id, instrument_id=new.instrument_id, currency=new.currency, dates=dates, name=new.name, label=new.name, valuetype=new.valuetype, values=list(values), local_ccy=new.local_ccy, tsdf=DataFrame( data=values, index=[d.date() for d in DatetimeIndex(dates)], columns=[[new.label], [new.valuetype]], dtype="float64", ), )
def _check_if_none(item: object) -> bool: """Check if a variable is None or equivalent. Args: item: Variable to be checked. Returns: Answer to whether the variable is None or equivalent. """ if item is None: return True try: return cast("bool", isnan(cast("float", item))) except (TypeError, ValueError): return len(str(item)) == 0