1 module hunt.raft.Rawnode;
2 
3 import hunt.raft.Raft;
4 import hunt.raft.Node;
5 import hunt.raft.Msg;
6 import hunt.raft.Log;
7 import hunt.raft.Status;
8 import hunt.raft.Storage;
9 import hunt.raft.Util;
10 
11 import hunt.logging;
12 import hunt.util.Serialize;
13 
/// Error string returned by RawNode.Step when the message type is a local one (IsLocalMsg).
enum ErrStepLocalMsg = "raft: cannot step raft local message";
/// Error string returned by RawNode.Step when a response message comes from a sender
/// that has no entry in the raft peer table.
enum ErrStepPeerNotFound = "raft: cannot step as peer not found";
16 
/**
 * Wrapper around a Raft instance that exposes ticking, proposing, message
 * stepping and the Ready / Advance cycle without any threading of its own.
 *
 * The wrapper remembers the soft and hard state that were last handed back
 * through commitReady so that newReady / HasReady can compare against them.
 */
class RawNode
{
	/// The wrapped raft state machine.
	Raft		_raft;
	/// Soft state recorded at construction (peer constructor) or by the last commitReady.
	SoftState*	_preSoftSt;
	/// Hard state recorded at construction (peer constructor) or by the last commitReady.
	HardState	_prevHardSt;


	/// Builds a Ready from the raft instance and the previously recorded soft/hard state.
	Ready newReady()
	{
		return Ready.newReady(_raft , _preSoftSt , _prevHardSt);
	}


	/// Returns: true when the raft instance's current leader id is its own id.
	bool isLeader()
	{
		return _raft._lead == _raft._id;
	}

	/**
	 * Records the outcome of a processed Ready.
	 *
	 * Params:
	 *   rd = the Ready that the caller has finished handling.
	 */
	void commitReady(Ready rd)
	{
		// Remember the soft state only when the Ready actually carried one.
		 if(rd.softstate != null)
		 {
			_preSoftSt = rd.softstate;
		 }

		// Remember the hard state only when the Ready carried a non-empty one.
		if (!IsEmptyHardState(rd.hs))
		{
			_prevHardSt = rd.hs;
		}

		// Mark everything up to the recorded commit index as applied.
		if(_prevHardSt.Commit != 0)
		{
			_raft._raftLog.appliedTo(_prevHardSt.Commit);
		}

		// The last entry of the Ready marks how far the log is now stable.
		if( rd.Entries.length > 0)
		{
			auto e = rd.Entries[$ - 1];
			_raft._raftLog.stableTo(e.Index , e.Term);
		}

		if(!IsEmptySnap(rd.snap))
		{
			_raft._raftLog.stableSnapTo(rd.snap.Metadata.Index);
		}

		// Read states delivered in this Ready are dropped from the raft instance.
		if(rd.ReadStates != null)
		{
			_raft._readStates = null;
		}
	}


	/**
	 * Creates a RawNode around a new Raft built from `c`.
	 * No peers are bootstrapped and the previous soft/hard state keep their
	 * default values.
	 */
	this(Config c)
	{
		_raft = new Raft(c);
	}



	/**
	 * Creates a RawNode around a new Raft built from `c` and, when the storage
	 * reports a last index of 0, bootstraps the log with the given peers.
	 *
	 * Params:
	 *   c     = raft configuration; a zero `_ID` or a storage error is only
	 *           logged, construction continues afterwards.
	 *   peers = initial members; used only when the storage's last index is 0.
	 */
	this(Config c , Peer[] peers)
	{
		if( c._ID == 0)
		{
			logError("config.ID must not be zero");
		}
		auto r = new Raft(c);

		ulong lastIndex;
		auto err = c._storage.LastIndex(lastIndex);
		if( err != ErrNil)
		{
			logError(err);
		}

		if( lastIndex == 0)
		{
			r.becomeFollower(1 , None);
			Entry[] ents;
			// One ConfChangeAddNode entry per peer, at term 1 and indexes 1..peers.length.
			foreach( i , peer ; peers )
			{
				ConfChange cc = { Type:ConfChangeType.ConfChangeAddNode ,
				NodeID : peer.ID , Context: peer.Context};
				auto data = cast(string)serialize(cc);
				Entry ent = {Type:EntryType.EntryConfChange , Term:1 , Index:i + 1 ,
				Data:data};
				ents ~= ent;
			}

			// The bootstrap entries are appended and marked committed right away.
			r._raftLog.append(ents);
			r._raftLog._committed = ents.length;
			foreach( peer ; peers)
			{
				r.addNode(peer.ID);
			}
		}

		_preSoftSt = r.softState();
		// With an empty storage the previous hard state starts out empty, so the
		// hard state produced by the bootstrap differs from it.
		if( lastIndex == 0)
			_prevHardSt = emptyState;
		else
			_prevHardSt = r.hardState();

		_raft = r;
	}

	/// Invokes the raft instance's tick handler once.
	void Tick()
	{
		_raft._tick();
	}

	/// Increments the raft instance's election-elapsed counter without running the tick handler.
	void TickQuiesced()
	{
		_raft._electionElapsed++;
	}

	/// Steps a MsgHup message into the raft instance.
	/// Returns: the result of Raft.Step.
	ErrString Campaign()
	{
		Message msg = { Type : MessageType.MsgHup  };
		return _raft.Step(msg);

	}

	/**
	 * Steps a MsgProp message carrying one entry whose data is `data`;
	 * the message's From is this node's id.
	 *
	 * Returns: the result of Raft.Step.
	 */
	ErrString Propose(string data)
	{
		Message msg = {Type : MessageType.MsgProp , From : _raft._id ,
		Entries :[{Data : data}]};

		return _raft.Step(msg);
	}

	/**
	 * Serializes `cc` and steps it as a MsgProp message with a single
	 * EntryConfChange entry.
	 *
	 * Returns: the result of Raft.Step.
	 */
	ErrString ProposeConfChange(ConfChange cc)
	{
		auto data = cast(string)serialize(cc);

		Message msg = {Type : MessageType.MsgProp , Entries:
		[{Type : EntryType.EntryConfChange , Data : data}]};

		return _raft.Step(msg);
	}

	/**
	 * Applies a configuration change to the raft instance.
	 *
	 * A change whose NodeID is `None` only resets the pending-conf marker.
	 * An unknown change type is logged and otherwise ignored.
	 *
	 * Returns: a ConfState holding the node list reported by the raft instance
	 *          after the change.
	 */
	ConfState ApplyConfChange(ConfChange cc)
	{
		if(cc.NodeID == None)
		{
			_raft.resetPendingConf();
			ConfState cs = {Nodes:_raft.nodes()};
			return cs;
		}

		switch(cc.Type)
		{
			case ConfChangeType.ConfChangeAddNode:
				_raft.addNode(cc.NodeID);
				break;
			case ConfChangeType.ConfChangeRemoveNode:
				_raft.removeNode(cc.NodeID);
				break;
			case ConfChangeType.ConfChangeUpdateNode:
				_raft.resetPendingConf();
				break;
			default:
				logError("unexpected conf type");
		}

		ConfState cs = {Nodes : _raft.nodes()};
		return cs;
	}

	/**
	 * Feeds a message into the raft instance.
	 *
	 * Returns: ErrStepLocalMsg for local message types, ErrStepPeerNotFound for
	 *          a response message whose sender is not in the peer table,
	 *          otherwise the result of Raft.Step.
	 */
	ErrString Step(Message m)
	{
		if(  IsLocalMsg(m.Type))
			return ErrStepLocalMsg;

		auto prs = m.From in _raft._prs;

		// Responses are only accepted from known peers; everything else is stepped.
		if(prs != null || !IsResponseMsg(m.Type))
		{
			return _raft.Step(m);
		}

		return ErrStepPeerNotFound;
	}

	/// Builds the current Ready and clears the raft instance's queued messages.
	Ready ready()
	{
		auto rd = newReady();
		_raft._msgs = null;
		return rd;
	}

	/**
	 * Reports whether there is anything for the caller to pick up via ready().
	 *
	 * Returns: true when the soft state differs from the recorded one, the
	 *          hard state is non-empty and differs from the recorded one, an
	 *          unstable snapshot is pending, outbound messages are queued, or
	 *          read states are pending; false otherwise.
	 */
	bool HasReady()
	{
		// Only a soft state that CHANGED since the last commitReady counts
		// (assumes equals() yields true for identical states).
		if(!_raft.softState().equals(_preSoftSt))
			return true;

		auto hardSt = _raft.hardState();

		if(!IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt , _prevHardSt))
			return true;

		if(_raft._raftLog._unstable._snap != null && !IsEmptySnap(*_raft._raftLog._unstable._snap))
			return true;

		// Messages queued for sending are delivered (and cleared) by ready().
		if(_raft._msgs.length > 0)
			return true;

		// NOTE(review): the etcd/raft implementation this class appears to mirror
		// also reports ready for unstable log entries and for committed entries
		// that are not yet applied; the raft-log accessors for those are not
		// visible from this file - confirm and add them here.

		if(_raft._readStates.length != 0)
			return true;

		return false;
	}


	/// Notifies the node that `rd` has been processed; forwards to commitReady.
	void Advance(Ready rd)
	{
		commitReady(rd);
	}

	/// Returns: the Status produced by getStatus for the wrapped raft instance.
	Status status()
	{
		return getStatus(_raft);
	}

	/// Steps a MsgUnreachable message whose From is `id`; the step result is discarded.
	void ReportUnreachable(ulong id)
	{
		Message msg = {Type : MessageType.MsgUnreachable , From : id};
		_raft.Step(msg);
	}

	/**
	 * Steps a MsgSnapStatus message whose From is `id`; Reject is set when
	 * `status` is SnapshotFailure. The step result is discarded.
	 */
	void ReportSnapshot(ulong id , SnapshotStatus status)
	{
		auto rej = (status == SnapshotFailure);

		Message msg = {Type : MessageType.MsgSnapStatus , From : id , Reject : rej};

		_raft.Step(msg);
	}

	/// Steps a MsgTransferLeader message whose From is `transferee`; the step result is discarded.
	void TransferLeader(ulong transferee)
	{
		Message msg = {Type:MessageType.MsgTransferLeader , From : transferee};
		_raft.Step(msg);
	}

	/// Steps a MsgReadIndex message carrying `rctx` as the data of its single entry;
	/// the step result is discarded.
	void ReadIndex(string rctx)
	{
		Message msg = {Type : MessageType.MsgReadIndex ,  Entries :[{Data : rctx}]};
		_raft.Step(msg);
	}
}
265 
266 
267