1 module hunt.raft.Raft;
2 
3 import hunt.raft.Storage;
4 import hunt.raft.Readonly;
5 import hunt.raft.Log;
6 import hunt.raft.Progress;
7 import hunt.raft.Msg;
8 import hunt.raft.Util;
9 import hunt.raft.Node;
10 
11 import hunt.logging;
12 
13 import std.algorithm;
14 import std.algorithm.sorting;
15 import std.experimental.allocator;
16 import std.conv;
17 import std.format;
18 
19 import core.stdc.stdlib;
20 
/// Role a `Raft` node is currently playing; selected by the `becomeXxx` transitions.
enum StateType
{
	StateFollower     = 0, /// set by becomeFollower; the initial role
	StateCandidate    = 1, /// set by becomeCandidate
	StateLeader       = 2, /// set by becomeLeader
	StatePreCandidate = 3, /// pre-vote phase (checked by stepCandidate)
	numStates         = 4, /// number of members above, not a real state
}
28 
/// How a leader answers read-index (MsgReadIndex) requests.
enum ReadOnlyOption
{
	/// Record the request and confirm leadership with a heartbeat round before answering.
	ReadOnlySafe       = 0,
	/// Answer immediately from the leader's committed index.
	ReadOnlyLeaseBased = 1,
}
34 
/// Kind of election a node starts; passed to `Raft.campaign`.
alias CampaignType = string;

/// Campaign tags. `campaignTransfer` is also sent as the vote Context so that
/// receivers bypass the leader-lease check.
enum : string
{
	campaignPreElection = "CampaignPreElection", /// pre-vote round (when PreVote is enabled)
	campaignElection    = "CampaignElection",    /// ordinary election
	campaignTransfer    = "CampaignTransfer",    /// election forced by leadership transfer
}
40 
41 
42 
/**
 * Construction parameters for a `Raft` node.
 *
 * `Raft`'s constructor calls `validate` and logs any problem it reports.
 */
class Config
{
	ulong			_ID;				/// id of the local node; must not be `None`
	ulong[]			_peers;				/// initial member ids (leave empty when storage already has ConfState.Nodes)
	int				_ElectionTick;		/// election timeout in ticks; must exceed `_HeartbeatTick`
	int				_HeartbeatTick;		/// heartbeat interval in ticks; must be positive

	Storage			_storage;			/// backing log storage; must not be null
	ulong			_Applied;			/// if > 0, handed to `raftLog.appliedTo` at startup
	ulong			_MaxSizePerMsg;		/// size limit used when collecting entries for one MsgApp
	int				_MaxInflightMsgs;	/// per-peer inflight window; must be positive
	bool			_CheckQuorum;		/// leader steps down when a quorum is not recently active
	bool			_PreVote;			/// run a pre-vote round before a real election
	ReadOnlyOption	_ROOption;			/// how read-index requests are served
	bool			_DisProForw;		/// followers drop proposals instead of forwarding them

	/**
	 * Checks for settings a Raft node cannot work with.
	 *
	 * Returns: `ErrNil` when the configuration is usable. Otherwise it returns a
	 * description of the first problem found. The checks run in this order: id,
	 * heartbeat tick, election tick, storage, inflight limit.
	 */
	ErrString validate()
	{
		ErrString problem = ErrNil;

		if (_ID == None)
			problem = "cannot use none as id";
		else if (!(_HeartbeatTick > 0))
			problem = "heartbeat tick must be greater than 0";
		else if (!(_HeartbeatTick < _ElectionTick))
			problem = "election tick must be greater than heartbeat tick";
		else if (_storage is null)
			problem = "storage cannot be nil";
		else if (!(_MaxInflightMsgs > 0))
			problem = "max inflight messages must be greater than 0";

		return problem;
	}
}
81 
82 
/**
 * A single Raft consensus participant (port of etcd's raft state machine).
 *
 * The node owns its term/vote, a `raftLog`, and one `Progress` entry per
 * cluster member (itself included). Inputs arrive through `Step` and the
 * `_tick` delegate. Outputs are queued on `_msgs` and `_readStates` for the
 * caller to drain. The current role determines the `_step`/`_tick`
 * delegates, which the `becomeXxx` methods swap.
 */
class Raft
{
	ulong			_id;				/// id of this node
	ulong			_Term;				/// current term
	ulong			_Vote;				/// id voted for in `_Term`, or `None`
	ReadState[]		_readStates;		/// completed read-index requests for the application
	raftLog			_raftLog;
	int				_maxInflight;		/// capacity of each peer's inflight window
	ulong			_maxMsgSize;		/// size limit passed to `raftLog.entries` per MsgApp

	Progress[ulong]	_prs;				/// replication progress per member id (self included)
	StateType		_state;
	bool[ulong]		_votes;				/// responses collected by the current campaign

	Message[]		_msgs;				/// outbound messages queued by `send`
	ulong			_lead;				/// known leader, or `None`
	ulong			_leadTransferee;	/// target of an in-flight leadership transfer, or `None`
	bool			_pendingConf;		/// a conf-change entry may still be unapplied
	readOnly		_readOnly;

	int				_electionElapsed;	/// ticks since the election timer was last reset
	int				_heartbeatElapsed;	/// ticks counted by tickHeartbeat since the last MsgBeat
	bool			_checkQuorum;
	bool			_preVote;

	int				_heartbeatTimeout;
	int				_electionTimeout;

	int				_randomizedElectionTimeout;	/// drawn from [_electionTimeout, 2 * _electionTimeout)
	bool			_disProForw;		/// followers drop proposals instead of forwarding them
	void	delegate(Message) _step;	/// role-specific message handler
	void	delegate() _tick;			/// role-specific tick handler

	/**
	 * Creates a node from `c`.
	 *
	 * Membership comes from `c._peers`, or from the storage's `ConfState` when
	 * it already lists nodes (specifying both is reported as an error).
	 * Persisted hard state is loaded and `c._Applied` is applied to the log.
	 * The node then starts as a follower with no known leader.
	 *
	 * Validation and storage errors are reported through `logError`; the
	 * constructor itself does not throw for them.
	 */
	this(Config c)
	{
		auto err = c.validate();
		if(err != ErrNil)
			logError(err);

		_raftLog = new raftLog(c._storage);
		HardState hs;
		ConfState cs;
		err = c._storage.InitalState(hs, cs);
		if(err != ErrNil)
			logError(err);

		auto peers = c._peers;
		if(cs.Nodes.length > 0)
		{
			if(peers.length > 0)
				logError("cannot specify both newRaft(peers) and ConfState.Nodes)");
			peers = cs.Nodes;
		}

		_id = c._ID;
		_lead = None;
		_maxMsgSize = c._MaxSizePerMsg;
		_maxInflight = c._MaxInflightMsgs;
		_electionTimeout = c._ElectionTick;
		_heartbeatTimeout = c._HeartbeatTick;

		_checkQuorum = c._CheckQuorum;
		_preVote = c._PreVote;
		_readOnly = new readOnly(c._ROOption);
		_disProForw = c._DisProForw;

		foreach(p; peers)
		{
			auto prs = new Progress();
			prs._Next = 1;
			prs._ins = new inflights(_maxInflight);
			_prs[p] = prs;
		}

		if(!isHardStateEqual(hs, emptyState))
			loadState(hs);

		if(c._Applied > 0)
			_raftLog.appliedTo(c._Applied);

		// Seed the C PRNG *before* becomeFollower(). That call draws the first
		// randomized election timeout, which otherwise comes from the unseeded
		// generator and is the same in every freshly started process.
		srand(cast(uint)(_id * _id));
		becomeFollower(_Term, None);

		string nodestr;
		foreach(i, n; nodes())
		{
			if(i > 0)
				nodestr ~= ",";
			nodestr ~= format("%x", n);
		}

		logInfo(format("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
				_id, nodestr, _Term, _raftLog._committed, _raftLog._applied,
				_raftLog.lastIndex(), _raftLog.lastTerm()));
	}

	/// Returns: whether a leader is currently known.
	bool hasLeader()
	{
		return _lead != None;
	}

	/// Returns: a newly allocated (via `theAllocator`) snapshot of the leader id and role.
	SoftState* softState()
	{
		return theAllocator.make!SoftState(_lead, _state);
	}

	/// Returns: the state that must be persisted: term, vote and commit index.
	HardState hardState()
	{
		HardState hard = { Term : _Term , Vote : _Vote , Commit :_raftLog._committed};
		return hard;
	}

	/// Returns: the number of members that forms a majority of `_prs`.
	int quorum()
	{
		return cast(int)_prs.length / 2 + 1;
	}

	/// Returns: the member ids, sorted ascending.
	ulong[] nodes()
	{
		ulong[] ns = _prs.keys();
		ns.sort();
		return ns;
	}

	/**
	 * Stamps `m` with our id and queues it on `_msgs`.
	 *
	 * Vote-related messages must already carry a term. All other messages
	 * must not carry one, and receive `_Term`. The exceptions are MsgProp and
	 * MsgReadIndex, which are sent without a term.
	 */
	void send(Message m)
	{
		m.From = _id;
		if( m.Type == MessageType.MsgVote ||
			m.Type == MessageType.MsgVoteResp ||
			m.Type == MessageType.MsgPreVote ||
			m.Type == MessageType.MsgPreVoteResp)
		{
			if(m.Term == 0)
			{
				logError(format("term should be set when sending %s", m.Type));
			}
		}
		else
		{
			if(m.Term != 0)
			{
				logError(format("term should not be set when sending %s (was %d)", m.Type, m.Term));
			}

			// Proposals and read-index requests are forwarded to the leader
			// and are treated as local messages, so they carry no term.
			if( m.Type != MessageType.MsgProp && m.Type != MessageType.MsgReadIndex)
			{
				m.Term = _Term;
			}
		}
		_msgs ~= m;
	}

	/**
	 * Sends `to` the entries it is missing (MsgApp). If the needed term or
	 * entries are unavailable from the log, sends a snapshot (MsgSnap) instead.
	 * Does nothing while that peer's progress is paused.
	 */
	void sendAppend(ulong to)
	{
		auto pr = _prs[to];
		if(pr.IsPaused())
			return;

		Message m;
		m.To = to;

		ulong term;
		Entry[] ents;
		auto errt = _raftLog.term(pr._Next - 1, term);
		auto erre = _raftLog.entries(pr._Next, _maxMsgSize, ents);

		if(errt != ErrNil || erre != ErrNil)
		{
			if(!pr._RecentActive)
			{
				logDebug(format("ignore sending snapshot to %x since it is not recently active", to));
				return;
			}

			m.Type = MessageType.MsgSnap;
			Snapshot snap;
			auto err = _raftLog.GetSnap(snap);
			if(err != ErrNil)
			{
				if(err == ErrSnapshotTemporarilyUnavailable)
				{
					logDebug(format("%x failed to send snapshot to %x because snapshot is temporarily unavailable", _id, to));
					return;
				}

				logError(err);
			}

			if(IsEmptySnap(snap))
			{
				logError("need non-empty snapshot");
			}

			m.snap = snap;
			auto sindex = snap.Metadata.Index;
			auto sterm = snap.Metadata.Term;

			logDebug(format("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
					_id, _raftLog.firstIndex(), _raftLog._committed, sindex, sterm, to, pr));

			pr.becomeSnapshot(sindex);
			logDebug(format("%x paused sending replication messages to %x [%s]", _id, to, pr));
		}
		else
		{
			m.Type = MessageType.MsgApp;
			m.Index = pr._Next - 1;
			m.LogTerm = term;
			m.Entries = ents;
			m.Commit = _raftLog._committed;

			auto n = m.Entries.length;
			if(n != 0)
			{
				switch(pr._State)
				{
					case ProgressStateType.ProgressStateReplicate:
						// Optimistically advance Next while replicating.
						auto last = m.Entries[n - 1].Index;
						pr.optimisticUpdate(last);
						pr._ins.add(last);
						break;
					case ProgressStateType.ProgressStateProbe:
						pr.pause();
						break;
					default:
						logError(format("%x is sending append in unhandled state %s", _id, pr._State));
						break;
				}
			}
		}

		send(m);
	}

	/// Sends a heartbeat to `to`, carrying commit = min(peer match, our commit) and `ctx`.
	void sendHeartbeat(ulong to, string ctx)
	{
		auto commit = min(_prs[to]._Match, _raftLog._committed);
		Message m = {To:to , Type:MessageType.MsgHeartbeat , Commit:commit , Context:ctx};
		send(m);
	}

	/// Calls `sendAppend` for every member except this node.
	void bcastAppend()
	{
		foreach(id; _prs.keys())
		{
			if(id == _id)
				continue;
			sendAppend(id);
		}
	}

	/// Broadcasts heartbeats, attaching the newest pending read-only context if any.
	void bcastHeartbeat()
	{
		auto lastCtx = _readOnly.lastPendingRequestCtx();
		// No pending read: send an empty context. `null` is used rather than
		// the error sentinel `ErrNil`, which has no business being a context.
		if(lastCtx.length == 0)
			bcastHeartbeatWithCtx(null);
		else
			bcastHeartbeatWithCtx(lastCtx);
	}

	/// Sends a heartbeat carrying `ctx` to every member except this node.
	void bcastHeartbeatWithCtx(string ctx)
	{
		foreach(id; _prs.keys())
		{
			if(id == _id)
				continue;
			sendHeartbeat(id, ctx);
		}
	}

	/**
	 * Computes the highest index replicated on a quorum of members. Passes it
	 * with `_Term` to `raftLog.maybeCommit`, which decides whether to commit.
	 *
	 * Returns: the result of `raftLog.maybeCommit` (`false` when there are no members).
	 */
	bool maybeCommit()
	{
		if(_prs.length == 0)
			return false;

		ulong[] mis;
		mis.reserve(_prs.length);
		foreach(pr; _prs.byValue)
			mis ~= pr._Match;

		// Sort DESCENDING. Then mis[k] is matched by at least k+1 members, so
		// mis[quorum()-1] is the largest index a quorum has replicated.
		// An ascending sort with the same index over-commits when the member
		// count is even (e.g. 2 members would commit the leader-only index).
		mis.sort!"a > b"();
		auto mci = mis[quorum() - 1];
		return _raftLog.maybeCommit(mci, _Term);
	}

	/**
	 * Resets volatile state for a role change at `term`. Clears the vote if
	 * the term changes, forgets the leader, and restarts the timers with a
	 * new randomized election timeout. Aborts any leadership transfer, clears
	 * collected votes, recreates every `Progress`, and resets read-only state.
	 */
	void reset(ulong term)
	{
		if(_Term != term)
		{
			logWarning(_id, " reset term ", term);
			_Term = term;
			_Vote = None;
		}

		_lead = None;
		_electionElapsed = 0;
		_heartbeatElapsed = 0;
		resetRandomizedElectionTimeout();
		abortLeaderTransfer();
		_votes.clear();
		foreach(id; _prs.keys())
		{
			auto prs = new Progress();
			prs._Next = _raftLog.lastIndex() + 1;
			prs._ins = new inflights(_maxInflight);
			_prs[id] = prs;

			if(id == _id)
			{
				prs._Match = _raftLog.lastIndex();
			}
		}

		_pendingConf = false;
		_readOnly = new readOnly(_readOnly._option);
	}

	/// Assigns the current term and consecutive indexes to `es` (in place),
	/// appends them to the log, updates our own progress and tries to commit.
	void appendEntry(Entry[] es)
	{
		auto li = _raftLog.lastIndex();
		foreach(i, ref e; es)
		{
			e.Term = _Term;
			e.Index = li + 1 + i;
		}

		_raftLog.append(es);
		_prs[_id].maybeUpdate(_raftLog.lastIndex());
		maybeCommit();
	}

	/// Tick handler for followers and (pre-)candidates: starts an election
	/// (MsgHup) once the randomized election timeout has elapsed.
	void tickElection()
	{
		_electionElapsed++;

		if(promotable() && pastElectionTimeout())
		{
			_electionElapsed = 0;
			Message msg = {From:_id , Type:MessageType.MsgHup};
			Step(msg);
		}
	}

	/**
	 * Tick handler for leaders. Every election timeout it runs a quorum
	 * check (when enabled) and abandons a stalled leadership transfer. Every
	 * heartbeat timeout it triggers a heartbeat broadcast (MsgBeat).
	 */
	void tickHeartbeat()
	{
		_heartbeatElapsed++;
		_electionElapsed++;

		if(_electionElapsed >= _electionTimeout)
		{
			_electionElapsed = 0;
			if(_checkQuorum)
			{
				Message msg = {From:_id , Type:MessageType.MsgCheckQuorum};
				Step(msg);
			}

			// A transfer that did not finish within one election timeout is abandoned.
			if(_state == StateType.StateLeader && _leadTransferee != None)
			{
				abortLeaderTransfer();
			}
		}

		if(_state != StateType.StateLeader)
			return;

		if(_heartbeatElapsed >= _heartbeatTimeout)
		{
			_heartbeatElapsed = 0;
			Message msg = {From:_id , Type:MessageType.MsgBeat};
			Step(msg);
		}
	}

	/// Transitions to follower at `term`, recording `lead` as the leader.
	void becomeFollower(ulong term, ulong lead)
	{
		_step = &stepFollower;
		reset(term);
		_tick = &tickElection;
		_lead = lead;
		_state = StateType.StateFollower;
		logInfo(format("%x became follower at term %d", _id, _Term));
	}

	/// Transitions to candidate: bumps the term and votes for itself.
	void becomeCandidate()
	{
		if(_state == StateType.StateLeader)
		{
			logError("invalid transition [leader -> candidate]");
		}

		_step = &stepCandidate;
		reset(_Term + 1);
		_tick = &tickElection;
		_Vote = _id;
		_state = StateType.StateCandidate;
		logInfo(format("%x became candidate at term %d", _id, _Term));
	}

	/**
	 * Transitions to pre-candidate for the pre-vote round.
	 *
	 * Only the step/tick handlers, the role, the collected votes and the known
	 * leader change. `_Term` and `_Vote` are deliberately left untouched, so a
	 * failed pre-vote does not disturb the cluster.
	 */
	void becomePreCandidate()
	{
		if(_state == StateType.StateLeader)
		{
			logError("invalid transition [leader -> pre-candidate]");
		}

		_step = &stepCandidate;
		_votes = null;
		_tick = &tickElection;
		// Forget the old leader. Otherwise Step's lease check (`_lead != None`)
		// would keep ignoring other nodes' vote requests while we campaign.
		_lead = None;
		_state = StateType.StatePreCandidate;
		logInfo(format("%x became pre-candidate at term %d", _id, _Term));
	}

	/// Transitions to leader and appends an empty entry for the new term.
	void becomeLeader()
	{
		if(_state == StateType.StateFollower)
		{
			logError("invalid transition [follower -> leader]");
		}

		_step = &stepLeader;
		reset(_Term);
		_tick = &tickHeartbeat;
		_lead = _id;
		_state = StateType.StateLeader;
		Entry[] ents;
		auto err = _raftLog.entries(_raftLog._committed + 1, noLimit, ents);
		if(err != ErrNil)
		{
			logError("unexpected error getting uncommitted entries ", err);
		}

		// An uncommitted conf change from a previous leader still counts as pending.
		auto nconf = numOfPendingConf(ents);
		if(nconf > 1)
		{
			logError("unexpected multiple uncommitted config entry");
		}

		if(nconf == 1)
			_pendingConf = true;

		// One empty entry from the new term. It is heap-allocated because the
		// slice is handed to the log, which may keep it.
		appendEntry(new Entry[1]);
		logInfo(format("%x became leader at term %d", _id, _Term));
	}

	/**
	 * Starts a campaign of kind `t` and requests votes from every other member.
	 * For `campaignPreElection` this is a pre-vote for the next term, without
	 * changing our own term. A campaign that our own vote already wins
	 * (single-member cluster) advances immediately.
	 */
	void campaign(CampaignType t)
	{
		ulong term;
		MessageType voteMsg;
		if(t == campaignPreElection)
		{
			becomePreCandidate();
			voteMsg = MessageType.MsgPreVote;
			// Pre-votes are requested for the term we would move to;
			// becomePreCandidate() leaves _Term unchanged.
			term = _Term + 1;
		}
		else
		{
			becomeCandidate();
			voteMsg = MessageType.MsgVote;
			term = _Term;
		}

		if(quorum() == poll(_id, voteRespMsgType(voteMsg), true))
		{
			if(t == campaignPreElection)
			{
				campaign(campaignElection);
			}
			else
			{
				becomeLeader();
			}
			return;
		}

		foreach(id; _prs.keys)
		{
			if(id == _id)
				continue;

			logInfo(format("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
					_id, _raftLog.lastTerm(), _raftLog.lastIndex(), voteMsg, id, _Term));

			// Transfer campaigns tag their requests so receivers bypass the lease check.
			string ctx;
			if(t == campaignTransfer)
				ctx = t;

			Message msg = {Term : term , To : id , Type : voteMsg , Index:_raftLog.lastIndex(),
			LogTerm:_raftLog.lastTerm() , Context:ctx};
			send(msg);
		}
	}

	/**
	 * Records the vote `v` from `id` (only the first response per node counts).
	 *
	 * Returns: the number of granted votes collected so far.
	 */
	int poll(ulong id, MessageType t, bool v)
	{
		if(v)
			logInfo(format("%x received %s from %x at term %d", _id, t, id, _Term));
		else
			logInfo(format("%x received %s rejection from %x at term %d", _id, t, id, _Term));

		if(id !in _votes)
			_votes[id] = v;

		int granted = 0;
		foreach(vote; _votes.byValue)
		{
			if(vote)
				granted++;
		}
		return granted;
	}

	/**
	 * Entry point for every message. Handles term comparison, elections
	 * (MsgHup) and vote requests here. Everything else is passed to the
	 * role-specific `_step`.
	 *
	 * Returns: always `ErrNil`.
	 */
	ErrString Step(Message m)
	{
		if(m.Term == 0)
		{
			// Local message; no term handling.
		}
		else if(m.Term > _Term)
		{
			auto lead = m.From;
			if(m.Type == MessageType.MsgVote || m.Type == MessageType.MsgPreVote)
			{
				auto force = (m.Context == campaignTransfer);
				auto inLease = _checkQuorum && _lead != None && _electionElapsed < _electionTimeout;
				if(!force && inLease)
				{
					// We heard from a live leader within the election timeout.
					// Do not update our term or grant the vote.
					logInfo(format("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
							_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index,
							_Term, _electionTimeout - _electionElapsed));
					return ErrNil;
				}
				lead = None;
			}

			if(m.Type == MessageType.MsgPreVote)
			{
				// Never change our term in response to a pre-vote request.
			}
			else if(m.Type == MessageType.MsgPreVoteResp && !m.Reject)
			{
				// Granted pre-votes carry our future term; the term is only
				// adopted once the real election starts.
			}
			else
			{
				logInfo(format("%x [term: %d] received a %s message with higher term from %x [term: %d]",
						_id, _Term, m.Type, m.From, m.Term));
				becomeFollower(m.Term, lead);
			}
		}
		else if(m.Term < _Term)
		{
			if(_checkQuorum && (m.Type == MessageType.MsgHeartbeat || m.Type == MessageType.MsgApp))
			{
				// Answer a stale leader so it learns about the newer term.
				Message msg = {To:m.From , Type:MessageType.MsgAppResp};
				send(msg);
			}
			else
			{
				logInfo(format("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
						_id, _Term, m.Type, m.From, m.Term));
			}

			return ErrNil;
		}

		switch(m.Type)
		{
			case MessageType.MsgHup:
				if(_state != StateType.StateLeader)
				{
					Entry[] ents;
					auto err = _raftLog.slice(_raftLog._applied + 1, _raftLog._committed + 1, noLimit, ents);
					if(err != ErrNil)
					{
						logError("unexpected error getting unapplied entries", err);
					}

					auto n = numOfPendingConf(ents);
					if(n != 0 && _raftLog._committed > _raftLog._applied)
					{
						logWarning(format("%x cannot campaign at term %d since there are still %d pending configuration changes to apply",
							_id, _Term, n));
						return ErrNil;
					}

					logInfo(format("%x is starting a new election at term %d", _id, _Term));

					if(_preVote)
						campaign(campaignPreElection);
					else
						campaign(campaignElection);
				}
				else
				{
					logDebug(format("%x ignoring MsgHup because already leader", _id));
				}
				break;
			case MessageType.MsgVote, MessageType.MsgPreVote:
				// `m.Term > _Term` admits pre-votes for a future term; for a real
				// MsgVote the term has already been equalised above.
				if((_Vote == None || m.Term > _Term || _Vote == m.From) &&
					_raftLog.isUpToDate(m.Index, m.LogTerm))
				{
					logInfo(format("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
							_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index, _Term));

					Message msg = {To:m.From , Term:m.Term , Type : voteRespMsgType(m.Type)};
					send(msg);
					if(m.Type == MessageType.MsgVote)
					{
						_electionElapsed = 0;
						_Vote = m.From;
					}
				}
				else
				{
					logInfo(format("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
							_id, _raftLog.lastTerm(), _raftLog.lastIndex(), _Vote, m.Type, m.From, m.LogTerm, m.Index, _Term));

					Message msg = {To: m.From ,
					Term : _Term , Type : voteRespMsgType(m.Type) , Reject:true};
					send(msg);
				}
				break;
			default:
				_step(m);
				break;
		}

		return ErrNil;
	}

	/**
	 * Step handler while leader. Local messages (MsgBeat, MsgCheckQuorum,
	 * MsgProp, MsgReadIndex) are handled first. Everything else must come
	 * from a member with a `Progress` entry and is dropped otherwise.
	 */
	void stepLeader(Message m)
	{
		switch(m.Type)
		{
			case MessageType.MsgBeat:
				bcastHeartbeat();
				return;
			case MessageType.MsgCheckQuorum:
				if(!checkQuorumActive())
				{
					logWarning(_id, " stepped down to follower since quorum is not active");
					becomeFollower(_Term, None);
				}
				return;
			case MessageType.MsgProp:
				if(m.Entries.length == 0)
				{
					logError(_id, " stepped empty MsgProp");
				}

				// Drop proposals once this node is no longer a member.
				if((_id in _prs) is null)
					return;

				if(_leadTransferee != None)
				{
					logDebug(format("%x [term %d] transfer leadership to %x is in progress; dropping proposal",
							_id, _Term, _leadTransferee));
					return;
				}

				foreach(i, e; m.Entries)
				{
					if(e.Type == EntryType.EntryConfChange)
					{
						// Only one conf change may be pending; later ones are
						// replaced by empty normal entries.
						if(_pendingConf)
						{
							logInfo(format("propose conf %s ignored since pending unapplied configuration", e));
							Entry en = {Type : EntryType.EntryNormal};
							m.Entries[i] = en;
						}
						_pendingConf = true;
					}
				}

				appendEntry(m.Entries);
				bcastAppend();
				return;
			case MessageType.MsgReadIndex:
				if(quorum() > 1)
				{
					// Refuse reads until this leader has committed an entry in its own term.
					ulong term;
					auto err = _raftLog.term(_raftLog._committed, term);
					if(_raftLog.zeroTermOnErrCompacted(term, err) != _Term)
					{
						return;
					}

					switch(_readOnly._option)
					{
						case ReadOnlyOption.ReadOnlySafe:
							_readOnly.addRequest(_raftLog._committed, m);
							bcastHeartbeatWithCtx(m.Entries[0].Data);
							break;
						case ReadOnlyOption.ReadOnlyLeaseBased:
						{
							ulong ri = 0;
							if(_checkQuorum)
							{
								ri = _raftLog._committed;
							}

							if(m.From == None || m.From == _id)
							{
								ReadState rs = {Index:_raftLog._committed , RequestCtx: m.Entries[0].Data};
								_readStates ~= rs;
							}
							else
							{
								Message msg = {To : m.From ,  Type:MessageType.MsgReadIndexResp , Index:ri , Entries:m.Entries};
								send(msg);
							}
						}
						break;
						default:
							assert(0);
					}
				}
				else
				{
					// Single-member cluster: the committed index is already safe to read.
					ReadState rs = {Index:_raftLog._committed , RequestCtx:m.Entries[0].Data};
					_readStates ~= rs;
				}
				return;
			default:
				break;
		}

		auto prp = m.From in _prs;
		if(prp is null)
		{
			logDebug(format("%x no progress available for %x", _id, m.From));
			return;
		}
		auto pr = *prp;

		switch(m.Type)
		{
			case MessageType.MsgAppResp:
				pr._RecentActive = true;

				if(m.Reject)
				{
					logDebug(format("%x received msgApp rejection(lastindex: %d) from %x for index %d",
						_id, m.RejectHint, m.From, m.Index));
					if(pr.maybeDecrTo(m.Index, m.RejectHint))
					{
						logDebug(format("%x decreased progress of %x to [%s]", _id, m.From, pr));
						if(pr._State == ProgressStateType.ProgressStateReplicate)
						{
							pr.becomeProbe();
						}
						sendAppend(m.From);
					}
				}
				else
				{
					auto oldPaused = pr.IsPaused();
					if(pr.maybeUpdate(m.Index))
					{
						if(pr._State == ProgressStateType.ProgressStateProbe)
							pr.becomeReplicate();
						else if(pr._State == ProgressStateType.ProgressStateSnapshot &&
							pr.needSnapshotAbort())
						{
							logDebug(format("%x snapshot aborted, resumed sending replication messages to %x [%s]", _id, m.From, pr));
							pr.becomeProbe();
						}
						else if(pr._State == ProgressStateType.ProgressStateReplicate)
							pr._ins.freeTo(m.Index);

						if(maybeCommit())
						{
							bcastAppend();
						}
						else if(oldPaused)
						{
							// The update un-paused this peer; send what was held back.
							sendAppend(m.From);
						}

						if(m.From == _leadTransferee && pr._Match == _raftLog.lastIndex())
						{
							logInfo(format("%x sent MsgTimeoutNow to %x after received MsgAppResp", _id, m.From));
							sendTimeoutNow(m.From);
						}
					}
				}
				break;
			case MessageType.MsgHeartbeatResp:
				pr._RecentActive = true;
				pr.resume();

				// Free one inflight slot so a full window can make progress again.
				if(pr._State == ProgressStateType.ProgressStateReplicate && pr._ins.full())
				{
					pr._ins.freeFirstOne();
				}

				if(pr._Match < _raftLog.lastIndex())
				{
					sendAppend(m.From);
				}

				if(_readOnly._option != ReadOnlyOption.ReadOnlySafe || m.Context.length == 0)
					return;

				auto ackCount = _readOnly.recvAck(m);
				if(ackCount < quorum())
				{
					return;
				}

				auto rss = _readOnly.advance(m);
				foreach(rs; rss)
				{
					auto req = rs.req;
					if(req.From == None || req.From == _id)
					{
						ReadState rs0 = { Index:rs.index , RequestCtx:req.Entries[0].Data};
						_readStates ~= rs0;
					}
					else
					{
						Message msg = {To : req.From , Type : MessageType.MsgReadIndexResp ,
						Index:rs.index ,Entries:req.Entries};
						send(msg);
					}
				}
				break;
			case MessageType.MsgSnapStatus:
				if(pr._State != ProgressStateType.ProgressStateSnapshot)
					return;

				if(!m.Reject)
				{
					pr.becomeProbe();
					logDebug(format("%x snapshot succeeded, resumed sending replication messages to %x [%s]",
							_id, m.From, pr));
				}
				else
				{
					pr.snapshotFailure();
					pr.becomeProbe();
					logDebug(format("%x snapshot failed, resumed sending replication messages to %x [%s]", _id, m.From, pr));
				}

				// Wait for the next MsgAppResp / heartbeat before sending more.
				pr.pause();
				break;
			case MessageType.MsgUnreachable:
				if(pr._State == ProgressStateType.ProgressStateReplicate)
				{
					pr.becomeProbe();
				}
				logDebug(format("%x failed to send message to %x because it is unreachable [%s]",
						_id, m.From, pr));
				break;
			case MessageType.MsgTransferLeader:
				auto leadTransferee = m.From;
				auto lastLeadTransferee = _leadTransferee;
				if(lastLeadTransferee != None)
				{
					if(lastLeadTransferee == leadTransferee)
					{
						logInfo(format("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
								_id, _Term, leadTransferee, leadTransferee));
						return;
					}
					abortLeaderTransfer();
					logInfo(format("%x [term %d] abort previous transferring leadership to %x", _id, _Term, lastLeadTransferee));
				}

				if(leadTransferee == _id)
				{
					logDebug(_id, " is already leader. Ignored transferring leadership to self");
					return;
				}

				logInfo(format("%x [term %d] starts to transfer leadership to %x", _id, _Term, leadTransferee));
				// The transfer must finish within one election timeout (see tickHeartbeat).
				_electionElapsed = 0;
				_leadTransferee = leadTransferee;
				if(pr._Match == _raftLog.lastIndex())
				{
					sendTimeoutNow(leadTransferee);
					logInfo(format("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log",
						_id, leadTransferee, leadTransferee));
				}
				else
				{
					sendAppend(leadTransferee);
				}
				break;
			default:
				break;
		}
	}

	/**
	 * Step handler for candidates and pre-candidates. Traffic from a leader
	 * makes us a follower. Vote responses of the kind matching our current
	 * campaign are tallied; stale MsgPreVoteResp seen while a real candidate
	 * are ignored.
	 */
	void stepCandidate(Message m)
	{
		auto myVoteRespType = (_state == StateType.StatePreCandidate)
			? MessageType.MsgPreVoteResp
			: MessageType.MsgVoteResp;

		switch(m.Type)
		{
			case MessageType.MsgProp:
				logInfo(format("%x no leader at term %d; dropping proposal", _id, _Term));
				return;
			case MessageType.MsgApp:
				becomeFollower(_Term, m.From);
				handleAppendEntries(m);
				break;
			case MessageType.MsgHeartbeat:
				becomeFollower(_Term, m.From);
				handleHeartbeat(m);
				break;
			case MessageType.MsgSnap:
				becomeFollower(_Term, m.From);
				handleSnapshot(m);
				break;
			case MessageType.MsgTimeoutNow:
				logDebug(format("%x [term %d state %s] ignored MsgTimeoutNow from %x", _id, _Term, _state, m.From));
				break;
			default:
				break;
		}

		if(m.Type == myVoteRespType)
		{
			auto gr = poll(m.From, m.Type, !m.Reject);
			logInfo(format("%x [quorum:%d] has received %d %s votes and %d vote rejections",
					_id, quorum(), gr, m.Type, _votes.length - gr));
			auto qr = quorum();

			if(gr == qr)
			{
				if(_state == StateType.StatePreCandidate)
				{
					campaign(campaignElection);
				}
				else
				{
					becomeLeader();
					bcastAppend();
				}
			}
			else if(_votes.length - gr == qr)
			{
				becomeFollower(_Term, None);
			}
		}
	}

	/**
	 * Step handler for followers. Forwards proposals, leadership-transfer and
	 * read-index requests to the known leader. Replication traffic resets the
	 * election timer and records the sender as leader.
	 */
	void stepFollower(Message m)
	{
		switch(m.Type)
		{
			case MessageType.MsgProp:
				if(_lead == None)
				{
					logInfo(format("%x no leader at term %d; dropping proposal", _id, _Term));
					return;
				}
				else if(_disProForw)
				{
					logInfo(format("%x not forwarding to leader %x at term %d; dropping proposal", _id, _lead, _Term));
					return;
				}

				m.To = _lead;
				send(m);
				break;
			case MessageType.MsgApp:
				_electionElapsed = 0;
				_lead = m.From;
				handleAppendEntries(m);
				break;
			case MessageType.MsgHeartbeat:
				_electionElapsed = 0;
				_lead = m.From;
				handleHeartbeat(m);
				break;
			case MessageType.MsgSnap:
				_electionElapsed = 0;
				_lead = m.From;
				handleSnapshot(m);
				break;
			case MessageType.MsgTransferLeader:
				if(_lead == None)
				{
					logInfo(format("%x no leader at term %d; dropping leader transfer msg",
							_id, _Term));
					return;
				}
				m.To = _lead;
				send(m);
				break;
			case MessageType.MsgTimeoutNow:
				if(promotable())
				{
					logInfo(format("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.",
							_id, _Term, m.From));
					// Leadership transfer skips pre-vote and bypasses leases.
					campaign(campaignTransfer);
				}
				else
				{
					logInfo(format("%x received MsgTimeoutNow from %x but is not promotable",
							_id, m.From));
				}
				break;
			case MessageType.MsgReadIndex:
				if(_lead == None)
				{
					logInfo(format("%x no leader at term %d; dropping index reading msg",
							_id, _Term));
					return;
				}
				m.To = _lead;
				send(m);
				break;
			case MessageType.MsgReadIndexResp:
				if(m.Entries.length != 1)
				{
					logError(format("%x invalid format of MsgReadIndexResp from %x, entries count: %d",
									_id, m.From, m.Entries.length));
					return;
				}
				ReadState rs = { Index:m.Index , RequestCtx:m.Entries[0].Data};
				_readStates ~= rs;
				break;
			default:
				break;
		}
	}

	/**
	 * Applies a MsgApp to the log and answers with MsgAppResp. The reply
	 * carries the new last index on success. On mismatch it is a rejection
	 * whose RejectHint is our last index.
	 */
	void handleAppendEntries(Message m)
	{
		// Entries at or below our commit index are already known to match.
		if(m.Index < _raftLog._committed)
		{
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog._committed};
			send(msg);
			return;
		}

		ulong mlastIndex;
		auto ok = _raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries, mlastIndex);

		if(ok)
		{
			Message msg = { To:m.From , Type:MessageType.MsgAppResp , Index:mlastIndex};
			send(msg);
		}
		else
		{
			ulong term;
			auto err = _raftLog.term(m.Index, term);

			logDebug(format("%s %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
					err, _id, _raftLog.zeroTermOnErrCompacted(term, err), m.Index, m.LogTerm, m.Index, m.From));

			Message msg = {To: m.From, Type: MessageType.MsgAppResp,
			Index: m.Index, Reject: true, RejectHint: _raftLog.lastIndex()};
			send(msg);
		}
	}

	/// Advances the commit index from a heartbeat and echoes its context back.
	void handleHeartbeat(Message m)
	{
		_raftLog.commitTo(m.Commit);
		Message msg = {To: m.From, Type: MessageType.MsgHeartbeatResp, Context: m.Context};
		send(msg);
	}

	/// Tries to restore from the snapshot in `m` and reports our resulting
	/// log position back to the sender via MsgAppResp.
	void handleSnapshot(Message m)
	{
		auto sindex = m.snap.Metadata.Index;
		auto sterm = m.snap.Metadata.Term;

		if(restore(m.snap))
		{
			logInfo(format("%x [commit: %d] restored snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, sindex, sterm));
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog.lastIndex()};
			send(msg);
		}
		else
		{
			logInfo(format("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, sindex, sterm));
			Message msg = {To: m.From, Type: MessageType.MsgAppResp, Index: _raftLog._committed};
			send(msg);
		}
	}

	/**
	 * Restores the log and membership from snapshot `s`.
	 *
	 * Returns: `false` if the snapshot is not newer than our commit index.
	 * Also returns `false` if our log already contains its last entry; the
	 * commit index is then fast-forwarded instead. Returns `true` after a restore.
	 */
	bool restore(Snapshot s)
	{
		if(s.Metadata.Index <= _raftLog._committed)
		{
			return false;
		}
		if(_raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term))
		{
			logInfo(format("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
					_id, _raftLog._committed, _raftLog.lastIndex(), _raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term));
			_raftLog.commitTo(s.Metadata.Index);
			return false;
		}

		logInfo(format("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
				_id, _raftLog._committed, _raftLog.lastIndex(), _raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term));

		_raftLog.restore(s);

		// Membership is replaced wholesale by the snapshot's ConfState;
		// members missing from it must not survive.
		_prs = null;
		foreach(n; s.Metadata.CS.Nodes)
		{
			ulong match = 0;
			auto next = _raftLog.lastIndex() + 1;
			if(n == _id)
			{
				match = next - 1;
			}

			setProgress(n, match, next);
			logInfo(format("%x restored progress of %x [%s]", _id, n, _prs[n]));
		}
		return true;
	}

	/// Returns: whether this node is a member (has a `Progress` entry) and may campaign.
	bool promotable()
	{
		return (_id in _prs) !is null;
	}

	/// Adds member `id` (no-op if already present), marking it recently active
	/// so a quorum check right after the change does not count it as dead.
	void addNode(ulong id)
	{
		_pendingConf = false;
		if(id in _prs)
			return;
		setProgress(id, 0, _raftLog.lastIndex() + 1);
		_prs[id]._RecentActive = true;
	}

	/// Removes member `id`. The smaller quorum may allow commits; aborts a
	/// leadership transfer targeting `id`.
	void removeNode(ulong id)
	{
		delProgress(id);
		_pendingConf = false;

		// Nothing to commit or abort in an empty cluster.
		if(_prs.length == 0)
		{
			return;
		}

		if(maybeCommit())
		{
			bcastAppend();
		}

		if(_state == StateType.StateLeader && _leadTransferee == id)
		{
			abortLeaderTransfer();
		}
	}

	/// Clears the pending-conf-change flag.
	void resetPendingConf() { _pendingConf = false; }

	/// Creates a fresh `Progress` for `id` with the given match and next indexes.
	void setProgress(ulong id, ulong match, ulong next)
	{
		auto prs = new Progress();
		prs._Next = next;
		prs._Match = match;
		prs._ins = new inflights(_maxInflight);
		_prs[id] = prs;
	}

	/// Drops the `Progress` entry for `id`.
	void delProgress(ulong id)
	{
		_prs.remove(id);
	}

	/// Loads persisted hard state; a commit index outside
	/// [committed, lastIndex] is reported via `logError` but still applied.
	void loadState(HardState state)
	{
		if(state.Commit < _raftLog._committed || state.Commit > _raftLog.lastIndex())
		{
			logError(format("%x state.commit %d is out of range [%d, %d]",
					_id, state.Commit, _raftLog._committed, _raftLog.lastIndex()));
		}
		_raftLog._committed = state.Commit;
		logWarning(_id, " loadState term ", _Term);
		_Term = state.Term;
		_Vote = state.Vote;
	}

	/// Returns: whether the randomized election timeout has elapsed.
	bool pastElectionTimeout()
	{
		return _electionElapsed >= _randomizedElectionTimeout;
	}

	/// Draws a new election timeout in [_electionTimeout, 2 * _electionTimeout)
	/// (assumes `_electionTimeout > 0`, which `Config.validate` checks).
	void resetRandomizedElectionTimeout()
	{
		_randomizedElectionTimeout = _electionTimeout + rand() % _electionTimeout;
		logInfo(_id, " resetRandomizedElectionTimeout ", _randomizedElectionTimeout);
	}

	/**
	 * Counts members marked recently active (self always counts) and clears
	 * their flags for the next period.
	 *
	 * Returns: whether the count reaches `quorum()`.
	 */
	bool checkQuorumActive()
	{
		int act;

		foreach(id; _prs.keys())
		{
			if(id == _id)
			{
				act++;
				continue;
			}

			if(_prs[id]._RecentActive)
			{
				act++;
			}

			_prs[id]._RecentActive = false;
		}

		return act >= quorum();
	}

	/// Tells `to` to start an election immediately (leadership transfer).
	void sendTimeoutNow(ulong to)
	{
		Message msg = {To: to, Type: MessageType.MsgTimeoutNow};
		send(msg);
	}

	/// Forgets any in-progress leadership transfer.
	void abortLeaderTransfer()
	{
		_leadTransferee = None;
	}

	/// Returns: how many of `ents` are configuration-change entries.
	int numOfPendingConf(Entry[] ents)
	{
		int n = 0;
		foreach(ref e; ents)
		{
			if(e.Type == EntryType.EntryConfChange)
				n++;
		}
		return n;
	}
}
1304