-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMCM.py
163 lines (131 loc) · 4.67 KB
/
MCM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
#
# MCM.py is a collection of functions for the Markov-chain
# Mixture Distribution model for forecasting.
#
# The model was published in:
# [1] J. Munkhammar&J. Widén, Probabilistic forecasting of high-resolution
# clear-sky index time-series using a Markov-chain mixture distribution
# model, Solar Energy vol XX 2019. (Available as preprint on Researchgate)
#
# Any use of this model should cite the reference and state that this model
# was used.
#
# This file contains the following functions:
#
# P = MCMFit(Data,N) which determines an NxN transition matrix P from a
# time-series in Data.
#
# X, Y = MCMForecast(P,a,b,obspoint) which delivers a piece-wise uniform
# distribution (X,Y) from the transition matrix P, minimum a and maximum b
# of the time-series and obspoint as the observation point to forecast
# from.
#
# NewSamples = MCMRnd(X,Y,Num) which delivers Num number of samples of the
# distribution X,Y (obtained in MCMForecast), which is the predictive
# distribution (the forecast).
#
# These functions can all be tested using MCMtest.py, which is accompanied
# by a test data set TestData.txt
import numpy as np
def MCMFit(data, n, timeSteps=1):
"""Estimates the transition probability to the future time-step.
Parameters
----------
data : (n,)
Numpy array containing the data. This array should be of width
1.
n : int
Number of states to be fitted in the transition matrix.
timeSteps : int, optional
The time-steps of the returned transition matrix (the default
is 1, which returns the transition matrix for the following time-
step).
Returns
-------
np.array(n,n)
The transition matrix for the time-steps.
"""
# Set up bins and limits
a = np.min(data)
b = np.max(data)
binWidth = (b - a) / n
# Identify the states for the Markov-chain in the data set
state = np.floor((data-a) / binWidth)
state[state > (n-1)] = n - 1
state = state.astype('int32')
# Generate the transition matrix
p = np.zeros((n,n))
for i in range(1,len(data)):
p[state[i-1], state[i]]= p[state[i-1], state[i]] + 1
# Normalizing the transition matrix
rowSums = p.sum(1)
rowSums[rowSums == 0] = 1.0 # do not divide by zero
p = p / rowSums[:, np.newaxis]
p = np.linalg.matrix_power(p, timeSteps)
# Return the transition matrix
return(p)
def MCMForecast(p, minValue, maxValue, obsPoint):
"""Returns the transition row and the bin stratring values.
Parameters
---------
p : np.arange(n,n)
The transition matrix.
minValue : float
The minimum value in the range of the data.
maxValue : float
The maximum value in the range of the data.
obsPoint : float
Observation from which to forecast.
Returns
-------
np.array(n,)
The bin starting values.
np.array(n,)
The row in the transition matrix p which represents starting
from the observation.
"""
assert minValue < maxValue, "minValue must be less than maxValue."
assert obsPoint >= minValue, "Observation has to be larger than " \
+ "minValue."
# The number of bins
n = p.shape[0]
# Bin starting values
binWidth = (maxValue - minValue) / n
# Calculate the range of the bins
binStarts = np.arange(n) * binWidth + minValue
# Identify which bin the obspoint belongs to
obsBin = np.where(obsPoint >= binStarts)[0][-1]
# Return the X and Y of the piece-wise uniform distribution
return(binStarts, p[obsBin, :])
def MCMRnd(binStarts, transProbs, count):
"""Generate random forecasts from a bin.
Parameters
----------
binStarts : np.array(n,)
An array contains the bin starting values.
transProbs : np.array(n,)
The transition probabilities from the forecast point.
count : int
Number of forecast samples.
Returns
-------
np.array(count,)
An array of forecast samples
"""
# Calculate the bin-width
binWidth = np.diff(binStarts)[0]
# Define the CDF (for later use of inverse CDF)
probsCDF = np.cumsum(transProbs)
# Calculating the Num samples from the distribution
# First setting initial conditions and a randomizer
randWithinECDF = np.random.uniform(0, 1, count)
# Setting the uniform random variable in each bin
randWithinBin = np.random.uniform(0, binWidth, count)
fcstSamples = np.zeros(count)
# Sampling from the CDF and then obtaining the inverse CDF
for i in range(count):
binIndex = np.where(randWithinECDF[i] <= probsCDF)[0][0]
fcstSamples[i] = binStarts[binIndex] + randWithinBin[i]
# Return the samples
return(fcstSamples)