1 module hunt.raft.Log; 2 3 import hunt.raft.Storage; 4 import hunt.raft.Logunstable; 5 import hunt.raft.Msg; 6 import hunt.raft.Util; 7 8 import hunt.logging; 9 10 import std.conv; 11 import std.format; 12 import std.algorithm; 13 14 class raftLog 15 { 16 Storage _storage; 17 unstable _unstable; 18 ulong _committed; 19 ulong _applied; 20 21 this(Storage storage) 22 { 23 if (storage is null) 24 { 25 logError("storage must not be nil"); 26 } 27 28 _storage = storage; 29 _unstable = new unstable(); 30 ulong firstIndex; 31 auto err = _storage.FirstIndex(firstIndex); 32 if(err != ErrNil) 33 { 34 logError(err); 35 } 36 37 ulong lastIndex; 38 err = _storage.LastIndex(lastIndex); 39 if(err != ErrNil) 40 { 41 logError(err); 42 } 43 44 _unstable._offset = lastIndex + 1; 45 _committed = firstIndex - 1; 46 _applied = firstIndex -1; 47 } 48 49 override string toString() 50 { 51 return format("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", 52 _committed, _applied, _unstable._offset, _unstable._entries.length); 53 } 54 55 bool maybeAppend(ulong index , ulong logTerm , ulong committed , Entry[] ents , 56 out ulong lastnewi ) 57 { 58 if(matchTerm(index , logTerm)){ 59 lastnewi = index + ents.length; 60 auto ci = findConflict(ents); 61 if( ci == 0){ } 62 else if(ci <= _committed) 63 { 64 logError(format("entry %d conflict with committed entry [committed(%d)]", ci, _committed)); 65 } 66 else 67 { 68 auto offset = index + 1; 69 append(ents[ cast(uint)(ci - offset) .. $ ]); 70 } 71 commitTo(min(committed , lastnewi)); 72 return true; 73 74 } 75 76 return false; 77 } 78 79 ulong append(Entry[] ents){ 80 if(ents.length == 0) 81 return lastIndex(); 82 83 auto after = ents[0].Index - 1; 84 if(after < _committed) 85 { 86 logError(format("after(%d) is out of range [committed(%d)]", after, _committed)); 87 } 88 89 _unstable.truncateAndAppend(ents); 90 return lastIndex(); 91 } 92 93 ulong findConflict(Entry[] ents) 94 { 95 foreach(ne ; ents){ 96 if(!matchTerm(ne.Index , ne.Term)){ 97 98 if(ne.Index <= lastIndex()){ 99 ulong t; 100 auto err = term(ne.Index , t); 101 logInfo("found conflict at index %d [existing term: %d, conflicting term: %d]", 102 ne.Index, zeroTermOnErrCompacted(t , err), ne.Term); 103 } 104 105 return ne.Index; 106 } 107 } 108 return 0; 109 } 110 111 Entry[] unstableEntries() 112 { 113 return _unstable._entries; 114 } 115 116 Entry[] nextEnts() 117 { 118 auto off = max(_applied + 1 , firstIndex()); 119 if( _committed + 1 > off) 120 { 121 Entry[] ents; 122 auto err = slice(off , _committed + 1, noLimit , ents); 123 if(err != ErrNil) 124 { 125 logError(format("unexpected error when getting unapplied entries (%s)", err)); 126 } 127 return ents; 128 } 129 return null; 130 } 131 132 bool hasNextEnts() { 133 134 auto off = max(_applied + 1, firstIndex()); 135 return _committed + 1 > off; 136 } 137 138 ErrString GetSnap(out Snapshot ss) { 139 if (_unstable._snap != null) { 140 ss = *_unstable._snap; 141 return ErrNil; 142 } 143 return _storage.GetSnap(ss); 144 } 145 146 147 148 149 ulong firstIndex() { 150 151 ulong index; 152 if(_unstable.maybeFirstIndex(index)) 153 return index; 154 155 auto err = _storage.FirstIndex(index); 156 if (err != ErrNil) { 157 logError(err); 158 } 159 return index; 160 } 161 162 163 ulong lastIndex() { 164 ulong index; 165 if(_unstable.maybeLastIndex(index)) 166 { 167 return index; 168 } 169 170 auto err = _storage.LastIndex(index); 171 if(err != ErrNil){ 172 logError(err); 173 } 174 return index; 175 } 176 177 void commitTo(ulong tocommit) { 178 179 if (_committed < tocommit) { 180 if (lastIndex() < tocommit) { 181 logError(format("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, lastIndex())); 182 } 183 _committed = tocommit; 184 } 185 } 186 187 void appliedTo(ulong i) { 188 if (i == 0) { 189 return; 190 } 191 if (_committed < i || i < _applied) { 192 logError(format("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, _applied, _committed)); 193 } 194 _applied = i; 195 } 196 197 void stableTo(ulong i , ulong t) { 198 _unstable.stableTo(i, t); 199 } 200 201 void stableSnapTo(ulong i) { 202 _unstable.stableSnapTo(i) ; 203 } 204 205 ulong lastTerm() { 206 207 ulong t; 208 auto err = term(lastIndex() , t); 209 if( err != ErrNil) { 210 logError(format("unexpected error when getting the last term (%s)", err)); 211 } 212 return t; 213 } 214 215 216 ErrString term(ulong i, out ulong term) { 217 218 auto dummyIndex = firstIndex() - 1; 219 if (i < dummyIndex || i > lastIndex()) { 220 term = 0; 221 return ErrNil; 222 } 223 224 225 if(_unstable.maybeTerm(i ,term)){ 226 227 return ErrNil; 228 } 229 230 auto err = _storage.Term(i , term); 231 if (err == ErrNil) { 232 233 return ErrNil; 234 } 235 if (err == ErrCompacted || err == ErrUnavailable) { 236 237 term = 0; 238 return err; 239 } 240 logError(err); 241 return ErrNil; 242 } 243 244 245 246 ErrString entries(ulong i,ulong maxsize , out Entry[] ents) { 247 if (i > lastIndex()) { 248 return ErrNil; 249 } 250 return slice(i, lastIndex()+1, maxsize , ents); 251 } 252 253 254 Entry[] allEntries() { 255 Entry[] ents; 256 auto err = entries(firstIndex(), noLimit , ents); 257 if (err == ErrNil) { 258 return ents; 259 } 260 261 if (err == ErrCompacted) { 262 return allEntries(); 263 } 264 265 logError(err); 266 return null; 267 } 268 269 270 bool isUpToDate(ulong lasti, ulong term) 271 { 272 return term > lastTerm() || (term == lastTerm() && lasti >= lastIndex()); 273 } 274 275 bool matchTerm(ulong i , ulong t) 276 { 277 ulong t0; 278 auto err = term(i , t0); 279 if(err != ErrNil) 280 return false; 281 return t0 == t; 282 } 283 284 bool maybeCommit(ulong maxIndex , ulong t) 285 { 286 ulong t0; 287 auto err = term(maxIndex ,t0); 288 if(maxIndex > _committed && zeroTermOnErrCompacted(t0 , err) == t) 289 { 290 commitTo(maxIndex); 291 return true; 292 } 293 294 return false; 295 } 296 297 void restore(Snapshot ss) 298 { 299 logInfo(format("log [%s] starts to restore snapshot [index: %d, term: %d]", 300 to!string(this.toHash()), ss.Metadata.Index, ss.Metadata.Term)); 301 _committed = ss.Metadata.Index; 302 _unstable.restore(ss); 303 } 304 305 ErrString slice(ulong lo , ulong hi , ulong maxSize , out Entry[] ents) 306 { 307 auto err = mustCheckOutOfBounds(lo , hi); 308 if( err != ErrNil) 309 { 310 return err; 311 } 312 313 if(lo == hi) 314 { 315 return ErrNil; 316 } 317 318 319 if( lo < _unstable._offset) 320 { 321 Entry[] store; 322 err = _storage.Entries(lo , min(hi , _unstable._offset) ,maxSize , store); 323 if(err == ErrCompacted) 324 { 325 return err; 326 } 327 else if(err == ErrUnavailable) 328 { 329 logError(format("entries[%d:%d) is unavailable from storage", 330 lo, min(hi, _unstable._offset))); 331 } 332 else if(err != ErrNil) 333 { 334 logError(err); 335 } 336 337 if(store.length < min(hi , _unstable._offset) - lo) 338 { 339 ents = store; 340 return ErrNil; 341 } 342 343 ents = store; 344 } 345 346 if(hi > _unstable._offset) 347 { 348 Entry[] uns = _unstable.slice(max(lo , _unstable._offset) , hi); 349 if( ents.length > 0) 350 ents ~= uns; 351 else 352 ents = uns; 353 } 354 355 ents = limitSize(ents , maxSize); 356 return ErrNil; 357 } 358 359 ErrString mustCheckOutOfBounds(ulong lo , ulong hi) 360 { 361 if(lo > hi) 362 { 363 logError(format("invalid slice %d > %d", lo, hi)); 364 } 365 366 ulong fi = firstIndex(); 367 if( lo < fi){ 368 return ErrCompacted; 369 } 370 371 auto length = lastIndex() + 1 - fi; 372 if(lo < fi || hi > fi + length) 373 { 374 logError(format("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, lastIndex())); 375 } 376 377 return ErrNil; 378 } 379 380 381 382 ulong zeroTermOnErrCompacted(ulong t, ErrString err){ 383 if(err == ErrNil) 384 { 385 return t; 386 } 387 if (err == ErrCompacted) { 388 return 0; 389 } 390 391 logError(format("unexpected error (%s)", err)); 392 return 0; 393 } 394 } 395 396 397