1 module hunt.raft.Progress; 2 3 import hunt.logging; 4 5 import std.algorithm; 6 import std.format; 7 8 enum ProgressStateType 9 { 10 ProgressStateProbe = 0, 11 ProgressStateReplicate, 12 ProgressStateSnapshot, 13 } 14 15 /* 16 immutable string[] prstmap = 17 [ 18 "ProgressStateProbe", 19 "ProgressStateReplicate", 20 "ProgressStateSnapshot" 21 ]; 22 23 string toString(ProgressStateType st) 24 { 25 return prstmap[st]; 26 } 27 */ 28 29 class inflights 30 { 31 int _start; 32 int _count; 33 int _size; 34 ulong[] _bufffer; 35 36 this(int size) 37 { 38 _size = size; 39 } 40 41 void add(ulong inflight) 42 { 43 if(full()) 44 logError("cannot add into a full inflights"); 45 46 auto next = _start + _count; 47 auto size = _size; 48 if(next >= size) 49 next -= size; 50 51 if(next >= _bufffer.length) 52 growBuf(); 53 54 _bufffer[next] = inflight; 55 _count++; 56 } 57 58 void growBuf() 59 { 60 auto newSize = _bufffer.length * 2; 61 if(newSize == 0) 62 newSize = 1; 63 else if( newSize > _size) 64 newSize = _size; 65 66 _bufffer.length = newSize; 67 } 68 69 void freeTo(ulong to) 70 { 71 if(_count == 0 || to < _bufffer[_start]) 72 return; 73 74 auto i = 0; 75 auto idx = _start; 76 auto size = _size; 77 78 for( ; i < _count ; i++) 79 { 80 if( to < _bufffer[idx]) 81 break; 82 83 idx++; 84 if(idx >= size) 85 idx -= size; 86 } 87 88 _count -= i; 89 _start = idx; 90 91 if(_count == 0) 92 _start = 0; 93 } 94 95 96 void freeFirstOne() 97 { 98 freeTo(_bufffer[_start]); 99 } 100 101 102 bool full() 103 { 104 return _count == _size; 105 } 106 107 void reset() 108 { 109 _count = 0; 110 _start = 0; 111 } 112 113 } 114 115 116 class Progress 117 { 118 ulong _Match; 119 ulong _Next; 120 121 ProgressStateType _State; 122 bool _Paused; 123 ulong _PendingSnapshot; 124 125 bool _RecentActive; 126 127 inflights _ins; 128 129 130 void resetState(ProgressStateType state) 131 { 132 _Paused = false; 133 _PendingSnapshot = 0; 134 _State = state; 135 _ins.reset(); 136 } 137 138 void becomeProbe() 139 { 140 if(_State == ProgressStateType.ProgressStateSnapshot) 141 { 142 auto PendingSnapshot = _PendingSnapshot; 143 resetState(ProgressStateType.ProgressStateProbe); 144 _Next = max(_Match + 1 , PendingSnapshot + 1); 145 } 146 else 147 { 148 resetState(ProgressStateType.ProgressStateProbe); 149 _Next = _Match + 1; 150 } 151 } 152 153 154 void becomeReplicate() 155 { 156 resetState(ProgressStateType.ProgressStateReplicate); 157 _Next = _Match + 1; 158 } 159 160 void becomeSnapshot(ulong snapshoti) 161 { 162 resetState(ProgressStateType.ProgressStateSnapshot); 163 _PendingSnapshot = snapshoti; 164 } 165 166 bool maybeUpdate(ulong n) 167 { 168 bool updated = false; 169 if(_Match < n) 170 { 171 _Match = n; 172 updated = true; 173 resume(); 174 } 175 if(_Next < n + 1) 176 { 177 _Next = n + 1; 178 } 179 180 return updated; 181 } 182 183 void optimisticUpdate(ulong n) 184 { 185 _Next = n + 1; 186 } 187 188 bool maybeDecrTo(ulong rejected , ulong last) 189 { 190 if(_State == ProgressStateType.ProgressStateReplicate) 191 { 192 if( rejected <= _Match) 193 return false; 194 195 _Next = _Match + 1; 196 return true; 197 } 198 199 if(_Next - 1 != rejected) 200 { 201 return false; 202 } 203 204 _Next = min(rejected , last + 1); 205 if(_Next < 1) 206 _Next = 1; 207 208 resume(); 209 210 return true; 211 } 212 213 void pause() { _Paused = true;} 214 void resume(){ _Paused = false;} 215 216 bool IsPaused() 217 { 218 switch(_State) 219 { 220 case ProgressStateType.ProgressStateProbe: 221 return _Paused; 222 case ProgressStateType.ProgressStateReplicate: 223 return _ins.full(); 224 case ProgressStateType.ProgressStateSnapshot: 225 return true; 226 default: 227 return false; 228 } 229 } 230 231 void snapshotFailure() 232 { 233 _PendingSnapshot = 0; 234 } 235 236 bool needSnapshotAbort() 237 { 238 return _State == ProgressStateType.ProgressStateSnapshot && _Match >= _PendingSnapshot; 239 } 240 241 override string toString() { 242 return format("next = %d, match = %d, state = %s, waiting = %d, pendingSnapshot = %d", _Next, _Match, _State, IsPaused(), _PendingSnapshot); 243 } 244 245 } 246 247