1 module hunt.raft.Progress;
2
3 import hunt.logging;
4
5 import std.algorithm;
6 import std.format;
7
8 enum ProgressStateType
9 {
10 ProgressStateProbe = 0,
11 ProgressStateReplicate,
12 ProgressStateSnapshot,
13 }
14
15 /*
16 immutable string[] prstmap =
17 [
18 "ProgressStateProbe",
19 "ProgressStateReplicate",
20 "ProgressStateSnapshot"
21 ];
22
23 string toString(ProgressStateType st)
24 {
25 return prstmap[st];
26 }
27 */
28
29 class inflights
30 {
31 int _start;
32 int _count;
33 int _size;
34 ulong[] _bufffer;
35
36 this(int size)
37 {
38 _size = size;
39 }
40
41 void add(ulong inflight)
42 {
43 if(full())
44 logError("cannot add into a full inflights");
45
46 auto next = _start + _count;
47 auto size = _size;
48 if(next >= size)
49 next -= size;
50
51 if(next >= _bufffer.length)
52 growBuf();
53
54 _bufffer[next] = inflight;
55 _count++;
56 }
57
58 void growBuf()
59 {
60 auto newSize = _bufffer.length * 2;
61 if(newSize == 0)
62 newSize = 1;
63 else if( newSize > _size)
64 newSize = _size;
65
66 _bufffer.length = newSize;
67 }
68
69 void freeTo(ulong to)
70 {
71 if(_count == 0 || to < _bufffer[_start])
72 return;
73
74 auto i = 0;
75 auto idx = _start;
76 auto size = _size;
77
78 for( ; i < _count ; i++)
79 {
80 if( to < _bufffer[idx])
81 break;
82
83 idx++;
84 if(idx >= size)
85 idx -= size;
86 }
87
88 _count -= i;
89 _start = idx;
90
91 if(_count == 0)
92 _start = 0;
93 }
94
95
96 void freeFirstOne()
97 {
98 freeTo(_bufffer[_start]);
99 }
100
101
102 bool full()
103 {
104 return _count == _size;
105 }
106
107 void reset()
108 {
109 _count = 0;
110 _start = 0;
111 }
112
113 }
114
115
116 class Progress
117 {
118 ulong _Match;
119 ulong _Next;
120
121 ProgressStateType _State;
122 bool _Paused;
123 ulong _PendingSnapshot;
124
125 bool _RecentActive;
126
127 inflights _ins;
128
129
130 void resetState(ProgressStateType state)
131 {
132 _Paused = false;
133 _PendingSnapshot = 0;
134 _State = state;
135 _ins.reset();
136 }
137
138 void becomeProbe()
139 {
140 if(_State == ProgressStateType.ProgressStateSnapshot)
141 {
142 auto PendingSnapshot = _PendingSnapshot;
143 resetState(ProgressStateType.ProgressStateProbe);
144 _Next = max(_Match + 1 , PendingSnapshot + 1);
145 }
146 else
147 {
148 resetState(ProgressStateType.ProgressStateProbe);
149 _Next = _Match + 1;
150 }
151 }
152
153
154 void becomeReplicate()
155 {
156 resetState(ProgressStateType.ProgressStateReplicate);
157 _Next = _Match + 1;
158 }
159
160 void becomeSnapshot(ulong snapshoti)
161 {
162 resetState(ProgressStateType.ProgressStateSnapshot);
163 _PendingSnapshot = snapshoti;
164 }
165
166 bool maybeUpdate(ulong n)
167 {
168 bool updated = false;
169 if(_Match < n)
170 {
171 _Match = n;
172 updated = true;
173 resume();
174 }
175 if(_Next < n + 1)
176 {
177 _Next = n + 1;
178 }
179
180 return updated;
181 }
182
183 void optimisticUpdate(ulong n)
184 {
185 _Next = n + 1;
186 }
187
188 bool maybeDecrTo(ulong rejected , ulong last)
189 {
190 if(_State == ProgressStateType.ProgressStateReplicate)
191 {
192 if( rejected <= _Match)
193 return false;
194
195 _Next = _Match + 1;
196 return true;
197 }
198
199 if(_Next - 1 != rejected)
200 {
201 return false;
202 }
203
204 _Next = min(rejected , last + 1);
205 if(_Next < 1)
206 _Next = 1;
207
208 resume();
209
210 return true;
211 }
212
213 void pause() { _Paused = true;}
214 void resume(){ _Paused = false;}
215
216 bool IsPaused()
217 {
218 switch(_State)
219 {
220 case ProgressStateType.ProgressStateProbe:
221 return _Paused;
222 case ProgressStateType.ProgressStateReplicate:
223 return _ins.full();
224 case ProgressStateType.ProgressStateSnapshot:
225 return true;
226 default:
227 return false;
228 }
229 }
230
231 void snapshotFailure()
232 {
233 _PendingSnapshot = 0;
234 }
235
236 bool needSnapshotAbort()
237 {
238 return _State == ProgressStateType.ProgressStateSnapshot && _Match >= _PendingSnapshot;
239 }
240
241 override string toString() {
242 return format("next = %d, match = %d, state = %s, waiting = %d, pendingSnapshot = %d", _Next, _Match, _State, IsPaused(), _PendingSnapshot);
243 }
244
245 }
246
247