module hunt.raft.Raft;

import hunt.raft.Storage;
import hunt.raft.Readonly;
import hunt.raft.Log;
import hunt.raft.Progress;
import hunt.raft.Msg;
import hunt.raft.Util;
import hunt.raft.Node;

import hunt.logging;

import std.algorithm;
import std.algorithm.sorting;
import std.experimental.allocator;
import std.conv;
import std.format;

import core.stdc.stdlib;

/// Role a Raft node can be in. `numStates` is a count sentinel, not a role.
enum StateType
{
	StateFollower = 0,
	StateCandidate,
	StateLeader,
	StatePreCandidate,
	numStates,
}

/// How the leader answers MsgReadIndex requests (see Raft.stepLeader).
enum ReadOnlyOption
{
	ReadOnlySafe = 0,
	ReadOnlyLeaseBased,
}

alias CampaignType = string;

enum campaignPreElection = "CampaignPreElection";
enum campaignElection = "CampaignElection";
enum campaignTransfer = "CampaignTransfer";

/// Parameters used to construct a Raft instance.
class Config
{
	ulong _ID;
	ulong[] _peers;
	int _ElectionTick;
	int _HeartbeatTick;

	Storage _storage;
	ulong _Applied;
	ulong _MaxSizePerMsg;
	int _MaxInflightMsgs;
	bool _CheckQuorum;
	bool _PreVote;
	ReadOnlyOption _ROOption;
	bool _DisProForw;

	/**
	 * Checks the configuration for obviously invalid values.
	 *
	 * Returns: ErrNil when the configuration is usable, otherwise a message
	 *          describing the first problem found.
	 */
	ErrString validate()
	{
		if (_ID == None)
			return "cannot use none as id";

		if (_HeartbeatTick <= 0)
			return "heartbeat tick must be greater than 0";

		if (_ElectionTick <= _HeartbeatTick)
			return "election tick must be greater than heartbeat tick";

		if (_storage is null)
			return "storage cannot be nil";

		if (_MaxInflightMsgs <= 0)
			return "max inflight messages must be greater than 0";

		return ErrNil;
	}
}

/// Core Raft state machine: holds term/vote/log state, per-peer progress,
/// and the outbox of messages (`_msgs`) produced while stepping.
class Raft
{
	ulong _id;
	ulong _Term;
	ulong _Vote;
	ReadState[] _readStates;
	raftLog _raftLog;
	int _maxInflight;
	ulong _maxMsgSize;

	Progress[ulong] _prs;
	StateType _state;
	bool[ulong] _votes;

	Message[] _msgs;
	ulong _lead;
	ulong _leadTransferee;
	bool _pendingConf;
	readOnly _readOnly;

	int _electionElapsed;
	int _heartbeatElapsed;
	bool _checkQuorum;
	bool _preVote;

	int _heartbeatTimeout;
	int _electionTimeout;

	int _randomizedElectionTimeout;
	bool _disProForw;
	void delegate(Message) _step;
	void delegate() _tick;

	/**
	 * Builds a node from `c`, loading hard state and membership from
	 * `c._storage`, and starts it as a follower.
	 *
	 * Validation and storage errors are only logged (this file reports fatal
	 * conditions through logError rather than throwing).
	 */
	this(Config c)
	{
		auto err = c.validate();
		if (err != ErrNil)
			logError(err);

		_raftLog = new raftLog(c._storage);
		HardState hs;
		ConfState cs;
		err = c._storage.InitalState(hs, cs);
		if (err != ErrNil)
			logError(err);

		auto peers = c._peers;
		if (cs.Nodes.length > 0)
		{
			if (peers.length > 0)
			{
				logError("cannot specify both newRaft(peers) and ConfState.Nodes)");
			}
			peers = cs.Nodes;
		}

		_id = c._ID;
		_lead = None;
		_maxMsgSize = c._MaxSizePerMsg;
		_maxInflight = c._MaxInflightMsgs;
		_electionTimeout = c._ElectionTick;
		_heartbeatTimeout = c._HeartbeatTick;

		_checkQuorum = c._CheckQuorum;
		_preVote = c._PreVote;
		_readOnly = new readOnly(c._ROOption);
		_disProForw = c._DisProForw;

		foreach (p; peers)
		{
			auto prs = new Progress();
			prs._Next = 1;
			prs._ins = new inflights(_maxInflight);
			_prs[p] = prs;
		}

		if (!isHardStateEqual(hs, emptyState))
		{
			loadState(hs);
		}

		if (c._Applied > 0)
		{
			_raftLog.appliedTo(c._Applied);
		}

		// Seed BEFORE becomeFollower: becomeFollower -> reset ->
		// resetRandomizedElectionTimeout draws from rand(). Seeding afterwards
		// would give every node the same first election timeout.
		srand(cast(uint)(_id * _id));

		becomeFollower(_Term, None);

		// Comma-separate the ids so "1,23" and "12,3" are distinguishable.
		string nodestr;
		foreach (n; nodes())
		{
			if (nodestr.length > 0)
				nodestr ~= ",";
			nodestr ~= to!string(n);
		}

		logInfo(format("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
				_id, nodestr, _Term, _raftLog._committed, _raftLog._applied,
				_raftLog.lastIndex(), _raftLog.lastTerm()));
	}

	/// True when this node currently knows a leader.
	bool hasLeader()
	{
		return _lead != None;
	}

	/// Allocates (via theAllocator) a SoftState snapshot of leader and role.
	SoftState* softState()
	{
		return theAllocator.make!SoftState(_lead, _state);
	}

	/// Returns the current term, vote and commit index as a HardState.
	HardState hardState()
	{
		HardState hard = {Term: _Term, Vote: _Vote, Commit: _raftLog._committed};
		return hard;
	}

	/// Majority size for the current peer set.
	int quorum()
	{
		return cast(int) _prs.length / 2 + 1;
	}

	/// Sorted ids of all peers tracked in `_prs`.
	ulong[] nodes()
	{
		ulong[] ns = _prs.keys();
		ns.sort();
		return ns;
	}

	/**
	 * Stamps `m` with this node's id (and, for most types, the current term)
	 * and queues it in `_msgs`.
	 *
	 * Vote/pre-vote messages and their responses must arrive with Term set;
	 * every other type must arrive with Term == 0. Violations are logged.
	 * MsgProp and MsgReadIndex keep Term == 0.
	 */
	void send(Message m)
	{
		m.From = _id;

		const isVoteTraffic = m.Type == MessageType.MsgVote
			|| m.Type == MessageType.MsgVoteResp
			|| m.Type == MessageType.MsgPreVote
			|| m.Type == MessageType.MsgPreVoteResp;

		if (isVoteTraffic)
		{
			if (m.Term == 0)
			{
				logError(format("term should be set when sending %s", m.Type));
			}
		}
		else
		{
			if (m.Term != 0)
			{
				logError(format("term should not be set when sending %s (was %d)", m.Type, m.Term));
			}

			if (m.Type != MessageType.MsgProp && m.Type != MessageType.MsgReadIndex)
			{
				m.Term = _Term;
			}
		}
		_msgs ~= m;
	}

	/**
	 * Sends an append (MsgApp) to peer `to`, or a snapshot (MsgSnap) when the
	 * term or entries at the peer's `_Next` cannot be read from the log.
	 * Does nothing when the peer's progress is paused.
	 */
	void sendAppend(ulong to)
	{
		auto pr = _prs[to];
		if (pr.IsPaused())
			return;

		Message m;
		m.To = to;

		ulong term;
		Entry[] ents;
		auto errt = _raftLog.term(pr._Next - 1, term);
		auto erre = _raftLog.entries(pr._Next, _maxMsgSize, ents);

		if (errt != ErrNil || erre != ErrNil)
		{
			// Could not read term/entries: fall back to a snapshot.
			if (!pr._RecentActive)
			{
				logDebug(format("ignore sending snapshot to %x since it is not recently active", to));
				return;
			}

			m.Type = MessageType.MsgSnap;
			Snapshot snap;
			auto err = _raftLog.GetSnap(snap);
			if (err != ErrNil)
			{
				if (err == ErrSnapshotTemporarilyUnavailable)
				{
					logDebug(format("%x failed to send snapshot to %x because snapshot is temporarily unavailable", _id, to));
					return;
				}

				logError(err);
			}

			if (IsEmptySnap(snap))
			{
				logError("need non-empty snapshot");
			}

			m.snap = snap;
			auto sindex = snap.Metadata.Index;
			auto sterm = snap.Metadata.Term;

			logDebug(format("%d [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
					_id, _raftLog.firstIndex(), _raftLog._committed, sindex, sterm, to, pr));

			pr.becomeSnapshot(sindex);
			logDebug(format("%x paused sending replication messages to %x [%s]", _id, to, pr));
		}
		else
		{
			m.Type = MessageType.MsgApp;
			m.Index = pr._Next - 1;
			m.LogTerm = term;
			m.Entries = ents;
			m.Commit = _raftLog._committed;

			int n = cast(int) m.Entries.length;
			if (n != 0)
			{
				switch (pr._State)
				{
				case ProgressStateType.ProgressStateReplicate:
					// Optimistically advance Next and track the in-flight message.
					auto last = m.Entries[n - 1].Index;
					pr.optimisticUpdate(last);
					pr._ins.add(last);
					break;
				case ProgressStateType.ProgressStateProbe:
					pr.pause();
					break;
				default:
					logError(format("%x is sending append in unhandled state %s", _id, pr._State));
				}
			}
		}

		send(m);
	}

	/// Sends a heartbeat to `to` carrying min(peer match, committed) and `ctx`.
	void sendHeartbeat(ulong to, string ctx)
	{
		auto commit = min(_prs[to]._Match, _raftLog._committed);
		Message m = {To: to, Type: MessageType.MsgHeartbeat, Commit: commit, Context: ctx};
		send(m);
	}

	/// Calls sendAppend for every peer except this node.
	void bcastAppend()
	{
		foreach (id; _prs.keys())
		{
			if (id == _id)
				continue;
			sendAppend(id);
		}
	}

	/// Broadcasts a heartbeat carrying the last pending read-only request
	/// context, if there is one.
	void bcastHeartbeat()
	{
		auto lastCtx = _readOnly.lastPendingRequestCtx();
		if (lastCtx.length == 0)
			bcastHeartbeatWithCtx(ErrNil);
		else
			bcastHeartbeatWithCtx(lastCtx);
	}

	/// Sends a heartbeat with context `ctx` to every peer except this node.
	void bcastHeartbeatWithCtx(string ctx)
	{
		foreach (id; _prs.keys())
		{
			if (id == _id)
				continue;
			sendHeartbeat(id, ctx);
		}
	}

	/**
	 * Tries to advance the commit index to the highest index replicated on a
	 * quorum of peers.
	 *
	 * Returns: whatever `_raftLog.maybeCommit` reports for that index and the
	 *          current term; false when there are no peers.
	 */
	bool maybeCommit()
	{
		ulong[] matches;
		foreach (id; _prs.keys())
		{
			matches ~= _prs[id]._Match;
		}

		if (matches.length == 0)
			return false;

		// Descending order: element quorum()-1 is then the largest index that
		// at least quorum() peers have reached. An ascending sort would pick
		// the quorum-th smallest, which over-commits in even-sized clusters.
		matches.sort!((a, b) => a > b);
		auto mci = matches[quorum() - 1];
		return _raftLog.maybeCommit(mci, _Term);
	}

	/**
	 * Resets per-term state: clears leader, timers, votes, leader transfer and
	 * pending-conf flag, rebuilds every peer's Progress, and recreates the
	 * read-only tracker. When `term` differs from the current term the vote is
	 * cleared too.
	 */
	void reset(ulong term)
	{
		if (_Term != term)
		{
			logWarning(_id, " reset term ", term);
			_Term = term;
			_Vote = None;
		}

		_lead = None;
		_electionElapsed = 0;
		_heartbeatElapsed = 0;
		resetRandomizedElectionTimeout();
		abortLeaderTransfer();
		_votes.clear();

		foreach (id; _prs.keys())
		{
			auto prs = new Progress();
			prs._Next = _raftLog.lastIndex() + 1;
			prs._ins = new inflights(_maxInflight);
			_prs[id] = prs;

			if (id == _id)
			{
				prs._Match = _raftLog.lastIndex();
			}
		}

		_pendingConf = false;
		_readOnly = new readOnly(_readOnly._option);
	}

	/// Stamps `es` with the current term and consecutive indexes, appends them
	/// to the log, updates this node's own progress and tries to commit.
	void appendEntry(Entry[] es)
	{
		auto li = _raftLog.lastIndex();
		for (auto i = 0; i < es.length; i++)
		{
			es[i].Term = _Term;
			es[i].Index = li + 1 + i;
		}

		_raftLog.append(es);
		_prs[_id].maybeUpdate(_raftLog.lastIndex());
		maybeCommit();
	}

	/// Tick handler for followers and candidates: steps MsgHup once the
	/// randomized election timeout has passed and this node is promotable.
	void tickElection()
	{
		_electionElapsed++;

		if (promotable() && pastElectionTimeout())
		{
			_electionElapsed = 0;
			Message msg = {From: _id, Type: MessageType.MsgHup};
			Step(msg);
		}
	}

	/// Tick handler for the leader: runs the periodic quorum check, aborts a
	/// stalled leader transfer, and steps MsgBeat every heartbeat timeout.
	void tickHeartbeat()
	{
		_heartbeatElapsed++;
		_electionElapsed++;

		if (_electionElapsed >= _electionTimeout)
		{
			_electionElapsed = 0;
			if (_checkQuorum)
			{
				Message msg = {From: _id, Type: MessageType.MsgCheckQuorum};
				Step(msg);
			}

			// A transfer that did not finish within one election timeout is abandoned.
			if (_state == StateType.StateLeader && _leadTransferee != None)
			{
				abortLeaderTransfer();
			}
		}

		// The quorum check above may have demoted us.
		if (_state != StateType.StateLeader)
			return;

		if (_heartbeatElapsed >= _heartbeatTimeout)
		{
			_heartbeatElapsed = 0;
			Message msg = {From: _id, Type: MessageType.MsgBeat};
			Step(msg);
		}
	}

	/// Switches to follower of `lead` at `term`.
	void becomeFollower(ulong term, ulong lead)
	{
		_step = &stepFollower;
		reset(term);
		_tick = &tickElection;
		_lead = lead;
		_state = StateType.StateFollower;
		logInfo(format("%x became follower at term %d", _id, _Term));
	}

	/// Switches to candidate: increments the term and votes for itself.
	void becomeCandidate()
	{
		if (_state == StateType.StateLeader)
		{
			logError("invalid transition [leader -> candidate]");
		}

		_step = &stepCandidate;
		reset(_Term + 1);
		_tick = &tickElection;
		_Vote = _id;
		_state = StateType.StateCandidate;
		logInfo(format("%x became candidate at term %d", _id, _Term));
	}

	/**
	 * Switches to pre-candidate. Only the step/tick handlers, the vote tally
	 * and the state change; the term and `_Vote` are deliberately left alone
	 * so a failed pre-vote does not disturb the cluster.
	 */
	void becomePreCandidate()
	{
		if (_state == StateType.StateLeader)
		{
			logError("invalid transition [leader -> pre-candidate]");
		}

		_step = &stepCandidate;
		_votes.clear();
		_tick = &tickElection;
		_state = StateType.StatePreCandidate;
		logInfo(format("%x became pre-candidate at term %d", _id, _Term));
	}

	/// Switches to leader for the current term and appends an empty entry.
	void becomeLeader()
	{
		if (_state == StateType.StateFollower)
		{
			logError("invalid transition [follower -> leader]");
		}

		_step = &stepLeader;
		reset(_Term);
		_tick = &tickHeartbeat;
		_lead = _id;
		_state = StateType.StateLeader;

		Entry[] ents;
		auto err = _raftLog.entries(_raftLog._committed + 1, noLimit, ents);
		if (err != ErrNil)
		{
			logError("unexpected error getting uncommitted entries ", err);
		}

		auto nconf = numOfPendingConf(ents);
		if (nconf > 1)
		{
			logError("unexpected multiple uncommitted config entry");
		}

		if (nconf == 1)
			_pendingConf = true;

		// Heap-allocated rather than a stack Entry[1]: the slice is handed to
		// the log, which may retain it (NOTE(review): confirm raftLog.append
		// copies; if it does this is merely harmless).
		Entry[] noop = new Entry[](1);
		appendEntry(noop);
		logInfo(format("%x became leader at term %d", _id, _Term));
	}

	/**
	 * Starts a (pre-)election of kind `t`: transitions state, records the
	 * self-vote, and sends vote requests to every other peer. When the
	 * self-vote alone is a quorum the next phase is entered immediately.
	 */
	void campaign(CampaignType t)
	{
		ulong term;
		MessageType voteMsg;
		if (t == campaignPreElection)
		{
			becomePreCandidate();
			voteMsg = MessageType.MsgPreVote;
			// Pre-vote requests are sent for the next term without bumping ours.
			term = _Term + 1;
		}
		else
		{
			becomeCandidate();
			voteMsg = MessageType.MsgVote;
			term = _Term;
		}

		if (quorum() == poll(_id, voteRespMsgType(voteMsg), true))
		{
			// Single-node cluster: our own vote already wins.
			if (t == campaignPreElection)
			{
				campaign(campaignElection);
			}
			else
			{
				becomeLeader();
			}
			return;
		}

		foreach (id; _prs.keys)
		{
			if (id == _id)
				continue;

			logInfo(format("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
					_id, _raftLog.lastTerm(), _raftLog.lastIndex(), voteMsg, id, _Term));

			string ctx;
			if (t == campaignTransfer)
				ctx = t;

			Message msg = {Term: term, To: id, Type: voteMsg, Index: _raftLog.lastIndex(),
				LogTerm: _raftLog.lastTerm(), Context: ctx};
			send(msg);
		}
	}

	/**
	 * Records the vote `v` from `id` (first vote from a peer wins; later ones
	 * are ignored) and returns the number of granted votes so far.
	 */
	int poll(ulong id, MessageType t, bool v)
	{
		if (v)
		{
			logInfo(format("%x received %s from %x at term %d", _id, t, id, _Term));
		}
		else
		{
			logInfo(format("%x received %s rejection from %x at term %d", _id, t, id, _Term));
		}

		if ((id in _votes) is null)
			_votes[id] = v;

		int granted = 0;
		foreach (vote; _votes)
		{
			if (vote)
				granted++;
		}

		return granted;
	}

	/**
	 * Entry point for every message. Handles term comparison, MsgHup and vote
	 * requests itself and delegates everything else to the role-specific
	 * `_step` handler.
	 *
	 * Returns: ErrNil in all visible paths.
	 */
	ErrString Step(Message m)
	{
		// Term 0 marks a local message; no term handling applies.
		if (m.Term != 0)
		{
			if (m.Term > _Term)
			{
				auto lead = m.From;
				if (m.Type == MessageType.MsgVote || m.Type == MessageType.MsgPreVote)
				{
					auto force = (m.Context == campaignTransfer);
					auto inLease = _checkQuorum && _lead != None && _electionElapsed < _electionTimeout;
					if (!force && inLease)
					{
						// Heard from a leader within the election timeout:
						// neither update the term nor grant the vote.
						logInfo(format("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
								_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index,
								_Term, _electionTimeout - _electionElapsed));
						return ErrNil;
					}
					lead = None;
				}

				// A pre-vote request, or a granted pre-vote response, carries
				// a future term and must not change ours.
				const keepTerm = m.Type == MessageType.MsgPreVote
					|| (m.Type == MessageType.MsgPreVoteResp && !m.Reject);
				if (!keepTerm)
				{
					logInfo(format("%x [term: %d] received a %s message with higher term from %x [term: %d]",
							_id, _Term, m.Type, m.From, m.Term));
					becomeFollower(m.Term, lead);
				}
			}
			else if (m.Term < _Term)
			{
				if (_checkQuorum && (m.Type == MessageType.MsgHeartbeat || m.Type == MessageType.MsgApp))
				{
					// Reply so a stale leader learns about the newer term.
					Message msg = {To: m.From, Type: MessageType.MsgAppResp};
					send(msg);
				}
				else
				{
					logInfo(format("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
							_id, _Term, m.Type, m.From, m.Term));
				}

				return ErrNil;
			}
		}

		switch (m.Type)
		{
		case MessageType.MsgHup:
			if (_state != StateType.StateLeader)
			{
				Entry[] ents;
				auto err = _raftLog.slice(_raftLog._applied + 1, _raftLog._committed + 1, noLimit, ents);
				if (err != ErrNil)
				{
					logError("unexpected error getting unapplied entries", err);
				}

				auto n = numOfPendingConf(ents);
				if (n != 0 && _raftLog._committed > _raftLog._applied)
				{
					logWarning(format("%x cannot campaign at term %d since there are still %d pending configuration changes to apply",
							_id, _Term, n));
					return ErrNil;
				}

				logInfo(format("%x is starting a new election at term %d", _id, _Term));

				if (_preVote)
					campaign(campaignPreElection);
				else
					campaign(campaignElection);
			}
			else
			{
				logDebug(format("%x ignoring MsgHup because already leader", _id));
			}
			break;

		case MessageType.MsgVote, MessageType.MsgPreVote:
			// `m.Term > _Term` can only hold for MsgPreVote here.
			if ((_Vote == None || m.Term > _Term || _Vote == m.From)
					&& _raftLog.isUpToDate(m.Index, m.LogTerm))
			{
				logInfo(format("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
						_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index, _Term));

				Message msg = {To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)};
				send(msg);
				if (m.Type == MessageType.MsgVote)
				{
					// Only a real vote is recorded.
					_electionElapsed = 0;
					_Vote = m.From;
				}
			}
			else
			{
				logInfo(format("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
						_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index, _Term));

				Message msg = {To: m.From,
					Term: _Term, Type: voteRespMsgType(m.Type), Reject: true};
				send(msg);
			}
			break;

		default:
			_step(m);
		}

		return ErrNil;
	}

	/// Step handler while leader.
	void stepLeader(Message m)
	{
		// Messages that do not need the sender's progress.
		switch (m.Type)
		{
		case MessageType.MsgBeat:
			bcastHeartbeat();
			return;

		case MessageType.MsgCheckQuorum:
			if (!checkQuorumActive())
			{
				logWarning(_id, " stepped down to follower since quorum is not active");
				becomeFollower(_Term, None);
			}
			return;

		case MessageType.MsgProp:
			if (m.Entries.length == 0)
			{
				logError(_id, " stepped empty MsgProp");
			}

			// This node was removed from the configuration: drop the proposal.
			if ((_id in _prs) is null)
				return;

			if (_leadTransferee != None)
			{
				logDebug(format("%x [term %d] transfer leadership to %x is in progress; dropping proposal",
						_id, _Term, _leadTransferee));
				return;
			}

			foreach (i, e; m.Entries)
			{
				if (e.Type == EntryType.EntryConfChange)
				{
					if (_pendingConf)
					{
						// Only one conf change may be pending: neutralise extras.
						logInfo(format("propose conf %s ignored since pending unapplied configuration", e));
						Entry en = {Type: EntryType.EntryNormal};
						m.Entries[i] = en;
					}
					_pendingConf = true;
				}
			}

			appendEntry(m.Entries);
			bcastAppend();
			return;

		case MessageType.MsgReadIndex:
			if (quorum() > 1)
			{
				ulong term;
				auto err = _raftLog.term(_raftLog._committed, term);
				if (_raftLog.zeroTermOnErrCompacted(term, err) != _Term)
				{
					// Nothing committed in this term yet: reject the read.
					return;
				}

				switch (_readOnly._option)
				{
				case ReadOnlyOption.ReadOnlySafe:
					_readOnly.addRequest(_raftLog._committed, m);
					bcastHeartbeatWithCtx(m.Entries[0].Data);
					break;
				case ReadOnlyOption.ReadOnlyLeaseBased:
					{
						ulong ri = 0;
						if (_checkQuorum)
						{
							ri = _raftLog._committed;
						}

						if (m.From == None || m.From == _id)
						{
							ReadState rs = {Index: _raftLog._committed, RequestCtx: m.Entries[0].Data};
							_readStates ~= rs;
						}
						else
						{
							Message msg = {To: m.From, Type: MessageType.MsgReadIndexResp, Index: ri, Entries: m.Entries};
							send(msg);
						}
					}
					break;
				default:
					assert(0);
				}
			}
			else
			{
				ReadState rs = {Index: _raftLog._committed, RequestCtx: m.Entries[0].Data};
				_readStates ~= rs;
			}
			return;

		default:
			break;
		}

		// Everything below needs the sender's progress.
		auto found = m.From in _prs;
		if (found is null)
		{
			logDebug(format("%x no progress available for %x", _id, m.From));
			return;
		}
		Progress pr = *found;

		switch (m.Type)
		{
		case MessageType.MsgAppResp:
			pr._RecentActive = true;

			if (m.Reject)
			{
				logDebug(format("%x received msgApp rejection(lastindex: %d) from %x for index %d",
						_id, m.RejectHint, m.From, m.Index));
				if (pr.maybeDecrTo(m.Index, m.RejectHint))
				{
					logDebug(format("%x decreased progress of %x to [%s]", _id, m.From, pr));
					if (pr._State == ProgressStateType.ProgressStateReplicate)
					{
						pr.becomeProbe();
					}
					sendAppend(m.From);
				}
			}
			else
			{
				auto oldPaused = pr.IsPaused();
				if (pr.maybeUpdate(m.Index))
				{
					if (pr._State == ProgressStateType.ProgressStateProbe)
					{
						pr.becomeReplicate();
					}
					else if (pr._State == ProgressStateType.ProgressStateSnapshot
							&& pr.needSnapshotAbort())
					{
						logDebug(format("%x snapshot aborted, resumed sending replication messages to %x [%s]", _id, m.From, pr));
						pr.becomeProbe();
					}
					else if (pr._State == ProgressStateType.ProgressStateReplicate)
					{
						pr._ins.freeTo(m.Index);
					}

					if (maybeCommit())
					{
						bcastAppend();
					}
					else if (oldPaused)
					{
						// The peer was paused before this ack; resume sending.
						sendAppend(m.From);
					}

					if (m.From == _leadTransferee && pr._Match == _raftLog.lastIndex())
					{
						logInfo(format("%x sent MsgTimeoutNow to %x after received MsgAppResp", _id, m.From));
						sendTimeoutNow(m.From);
					}
				}
			}
			break;

		case MessageType.MsgHeartbeatResp:
			pr._RecentActive = true;
			pr.resume();

			// Free one inflight slot so a full window can make progress.
			if (pr._State == ProgressStateType.ProgressStateReplicate && pr._ins.full())
			{
				pr._ins.freeFirstOne();
			}

			if (pr._Match < _raftLog.lastIndex())
			{
				sendAppend(m.From);
			}

			if (_readOnly._option != ReadOnlyOption.ReadOnlySafe || m.Context.length == 0)
				return;

			auto ackCount = _readOnly.recvAck(m);
			if (ackCount < quorum())
			{
				return;
			}

			auto rss = _readOnly.advance(m);
			foreach (rs; rss)
			{
				auto req = rs.req;
				if (req.From == None || req.From == _id)
				{
					ReadState rs0 = {Index: rs.index, RequestCtx: req.Entries[0].Data};
					_readStates ~= rs0;
				}
				else
				{
					Message msg = {To: req.From, Type: MessageType.MsgReadIndexResp,
						Index: rs.index, Entries: req.Entries};
					send(msg);
				}
			}
			break;

		case MessageType.MsgSnapStatus:
			if (pr._State != ProgressStateType.ProgressStateSnapshot)
				return;

			if (!m.Reject)
			{
				pr.becomeProbe();
				logDebug(format("%x snapshot succeeded, resumed sending replication messages to %x [%s]",
						_id, m.From, pr));
			}
			else
			{
				pr.snapshotFailure();
				pr.becomeProbe();
				logDebug(format("%x snapshot failed, resumed sending replication messages to %x [%s]", _id, m.From, pr));
			}

			// Hold further appends until the peer responds.
			pr.pause();
			break;

		case MessageType.MsgUnreachable:
			if (pr._State == ProgressStateType.ProgressStateReplicate)
			{
				pr.becomeProbe();
			}
			logDebug(format("%x failed to send message to %x because it is unreachable [%s]",
					_id, m.From, pr));
			break;

		case MessageType.MsgTransferLeader:
			auto leadTransferee = m.From;
			auto lastLeadTransferee = _leadTransferee;
			if (lastLeadTransferee != None)
			{
				if (lastLeadTransferee == leadTransferee)
				{
					logInfo(format("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
							_id, _Term, leadTransferee, leadTransferee));
					return;
				}
				abortLeaderTransfer();
				logInfo(format("%x [term %d] abort previous transferring leadership to %x", _id, _Term, lastLeadTransferee));
			}

			if (leadTransferee == _id)
			{
				logDebug(_id, " is already leader. Ignored transferring leadership to self");
				return;
			}

			logInfo(format("%x [term %d] starts to transfer leadership to %x", _id, _Term, leadTransferee));
			// The transfer gets one full election timeout to complete.
			_electionElapsed = 0;
			_leadTransferee = leadTransferee;
			if (pr._Match == _raftLog.lastIndex())
			{
				sendTimeoutNow(leadTransferee);
				logInfo(format("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log",
						_id, leadTransferee, leadTransferee));
			}
			else
			{
				sendAppend(leadTransferee);
			}
			break;

		default:
			// logInfo(m.Type);
			break;
		}
	}

	/// Step handler while candidate or pre-candidate.
	void stepCandidate(Message m)
	{
		// Only responses matching the current campaign phase are tallied.
		MessageType myVoteRespType;

		if (_state == StateType.StatePreCandidate)
			myVoteRespType = MessageType.MsgPreVoteResp;
		else
			myVoteRespType = MessageType.MsgVoteResp;

		switch (m.Type)
		{
		case MessageType.MsgProp:
			logInfo(format("%x no leader at term %d; dropping proposal", _id, _Term));
			return;
		case MessageType.MsgApp:
			becomeFollower(_Term, m.From);
			handleAppendEntries(m);
			break;
		case MessageType.MsgHeartbeat:
			becomeFollower(_Term, m.From);
			handleHeartbeat(m);
			break;
		case MessageType.MsgSnap:
			becomeFollower(_Term, m.From);
			handleSnapshot(m);
			break;
		case MessageType.MsgTimeoutNow:
			logDebug(format("%x [term %d state %s] ignored MsgTimeoutNow from %x", _id, _Term, _state, m.From));
			break;
		default:
			break;
		}

		if (myVoteRespType == m.Type)
		{
			auto gr = poll(m.From, m.Type, !m.Reject);
			logInfo(format("%x [quorum:%d] has received %d %s votes and %d vote rejections",
					_id, quorum(), gr, m.Type, _votes.length - gr));
			auto qr = quorum();

			if (gr == qr)
			{
				if (_state == StateType.StatePreCandidate)
				{
					campaign(campaignElection);
				}
				else
				{
					becomeLeader();
					bcastAppend();
				}
			}
			else if (_votes.length - gr == qr)
			{
				// A quorum rejected us.
				becomeFollower(_Term, None);
			}
		}
	}

	/// Step handler while follower.
	void stepFollower(Message m)
	{
		switch (m.Type)
		{
		case MessageType.MsgProp:
			if (_lead == None)
			{
				logInfo(format("%x no leader at term %d; dropping proposal", _id, _Term));
				return;
			}
			else if (_disProForw)
			{
				logInfo(format("%x not forwarding to leader %x at term %d; dropping proposal", _id, _lead, _Term));
				return;
			}

			m.To = _lead;
			send(m);
			break;
		case MessageType.MsgApp:
			_electionElapsed = 0;
			_lead = m.From;
			handleAppendEntries(m);
			break;
		case MessageType.MsgHeartbeat:
			_electionElapsed = 0;
			_lead = m.From;
			handleHeartbeat(m);
			break;
		case MessageType.MsgSnap:
			_electionElapsed = 0;
			_lead = m.From;
			handleSnapshot(m);
			break;
		case MessageType.MsgTransferLeader:
			if (_lead == None)
			{
				logInfo(format("%x no leader at term %d; dropping leader transfer msg",
						_id, _Term));
				return;
			}
			m.To = _lead;
			send(m);
			break;
		case MessageType.MsgTimeoutNow:
			if (promotable())
			{
				logInfo(format("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.",
						_id, _Term, m.From));

				// Leader transfer skips pre-vote: we know we are not recovering
				// from a partition.
				campaign(campaignTransfer);
			}
			else
			{
				logInfo(format("%x received MsgTimeoutNow from %x but is not promotable",
						_id, m.From));
			}
			break;
		case MessageType.MsgReadIndex:
			if (_lead == None)
			{
				logInfo(format("%x no leader at term %d; dropping index reading msg",
						_id, _Term));
				return;
			}
			m.To = _lead;
			send(m);
			break;
		case MessageType.MsgReadIndexResp:
			if (m.Entries.length != 1)
			{
				logError(format("%x invalid format of MsgReadIndexResp from %x, entries count: %d",
						_id, m.From, m.Entries.length));
				return;
			}
			ReadState rs = {Index: m.Index, RequestCtx: m.Entries[0].Data};
			_readStates ~= rs;
			break;
		default:
			break;
		}
	}

	/// Applies a MsgApp to the log and replies with MsgAppResp (accept,
	/// reject with hint, or the committed index for a stale append).
	void handleAppendEntries(Message m)
	{
		if (m.Index < _raftLog._committed)
		{
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog._committed};
			send(msg);
			return;
		}

		ulong mlastIndex;
		auto ok = _raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries, mlastIndex);

		if (ok)
		{
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: mlastIndex};
			send(msg);
		}
		else
		{
			ulong term;
			auto err = _raftLog.term(m.Index, term);

			logDebug(format("%s %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
					err, _id, _raftLog.zeroTermOnErrCompacted(term, err), m.Index, m.LogTerm, m.Index, m.From));

			Message msg = {To: m.From, Type: MessageType.MsgAppResp,
				Index: m.Index, Reject: true, RejectHint: _raftLog.lastIndex()};

			send(msg);
		}
	}

	/// Advances the commit index to `m.Commit` and echoes the context back.
	void handleHeartbeat(Message m)
	{
		_raftLog.commitTo(m.Commit);
		Message msg = {To: m.From, Type: MessageType.MsgHeartbeatResp, Context: m.Context};
		send(msg);
	}

	/// Tries to restore from the snapshot in `m` and acknowledges with the
	/// resulting last index (restored) or committed index (ignored).
	void handleSnapshot(Message m)
	{
		auto sindex = m.snap.Metadata.Index;
		auto sterm = m.snap.Metadata.Term;

		if (restore(m.snap))
		{
			logInfo(format("%x [commit: %d] restored snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, sindex, sterm));
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog.lastIndex()};
			send(msg);
		}
		else
		{
			logInfo(format("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, sindex, sterm));
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog._committed};
			send(msg);
		}
	}

	/**
	 * Restores log and membership from snapshot `s`.
	 *
	 * Returns: false when the snapshot is not newer than the commit index, or
	 *          when the log already contains the snapshot's entry (commit is
	 *          fast-forwarded instead); true when the snapshot was applied.
	 */
	bool restore(Snapshot s)
	{
		if (s.Metadata.Index <= _raftLog._committed)
		{
			return false;
		}
		if (_raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term))
		{
			logInfo(format("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, _raftLog.lastIndex(), _raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term));
			_raftLog.commitTo(s.Metadata.Index);
			return false;
		}

		logInfo(format("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
				_id, _raftLog._committed, _raftLog.lastIndex(), _raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term));

		_raftLog.restore(s);

		// The snapshot's membership replaces ours; drop peers it no longer lists.
		_prs.clear();
		foreach (n; s.Metadata.CS.Nodes)
		{
			ulong match = 0;
			auto next = _raftLog.lastIndex() + 1;
			if (n == _id)
			{
				match = next - 1;
			}

			setProgress(n, match, next);
			logInfo(format("%x restored progress of %x [%s]", _id, n, _prs[n]));
		}
		return true;
	}

	/// True when this node is part of the current peer set.
	bool promotable()
	{
		return (_id in _prs) !is null;
	}

	/// Adds peer `id` (no-op when already present) and clears the
	/// pending-conf flag.
	void addNode(ulong id)
	{
		_pendingConf = false;
		if (id in _prs)
			return;
		setProgress(id, 0, _raftLog.lastIndex() + 1);
		// A freshly added node counts as recently active.
		_prs[id]._RecentActive = true;
	}

	/// Removes peer `id`, clears the pending-conf flag, and re-evaluates the
	/// commit index for the smaller quorum.
	void removeNode(ulong id)
	{
		delProgress(id);
		_pendingConf = false;

		// Nothing left to commit against once the last peer is gone.
		if (_prs.length == 0)
		{
			return;
		}

		if (maybeCommit())
		{
			bcastAppend();
		}

		if (_state == StateType.StateLeader && _leadTransferee == id)
		{
			abortLeaderTransfer();
		}
	}

	/// Clears the pending configuration-change flag.
	void resetPendingConf()
	{
		_pendingConf = false;
	}

	/// Installs a fresh Progress for `id` with the given match/next indexes.
	void setProgress(ulong id, ulong match, ulong next)
	{
		auto prs = new Progress();
		prs._Next = next;
		prs._Match = match;
		prs._ins = new inflights(_maxInflight);
		_prs[id] = prs;
	}

	/// Forgets the progress of peer `id`.
	void delProgress(ulong id)
	{
		_prs.remove(id);
	}

	/// Loads commit index, term and vote from `state`; an out-of-range commit
	/// index is logged as an error.
	void loadState(HardState state)
	{
		if (state.Commit < _raftLog._committed || state.Commit > _raftLog.lastIndex())
		{
			logError(format("%x state.commit %d is out of range [%d, %d]",
					_id, state.Commit, _raftLog._committed, _raftLog.lastIndex()));
		}
		_raftLog._committed = state.Commit;
		logWarning(_id, " loadState term ", _Term);
		_Term = state.Term;
		_Vote = state.Vote;
	}

	/// True once the randomized election timeout has elapsed.
	bool pastElectionTimeout()
	{
		return _electionElapsed >= _randomizedElectionTimeout;
	}

	/// Picks a new timeout in [_electionTimeout, 2 * _electionTimeout - 1].
	void resetRandomizedElectionTimeout()
	{
		_randomizedElectionTimeout = _electionTimeout + rand() % _electionTimeout;
		logInfo(_id, " resetRandomizedElectionTimeout ", _randomizedElectionTimeout);
	}

	/**
	 * Reports whether a quorum of peers was active since the last call, and
	 * clears every other peer's `_RecentActive` flag for the next round.
	 */
	bool checkQuorumActive()
	{
		int act;

		foreach (id; _prs.keys())
		{
			if (id == _id)
			{ // self is always active
				act++;
				continue;
			}

			if (_prs[id]._RecentActive)
			{
				act++;
			}

			_prs[id]._RecentActive = false;
		}

		return act >= quorum();
	}

	/// Sends MsgTimeoutNow to `to`.
	void sendTimeoutNow(ulong to)
	{
		Message msg = {To: to, Type: MessageType.MsgTimeoutNow};
		send(msg);
	}

	/// Cancels any in-progress leadership transfer.
	void abortLeaderTransfer()
	{
		_leadTransferee = None;
	}

	/// Counts the EntryConfChange entries in `ents`.
	int numOfPendingConf(Entry[] ents)
	{
		int n = 0;
		foreach (e; ents)
		{
			if (e.Type == EntryType.EntryConfChange)
				n++;
		}
		return n;
	}
}