1 module hunt.raft.Node;
2 
3 import hunt.raft.Msg;
4 import hunt.raft.Storage;
5 import hunt.raft.Raft;
6 import hunt.raft.Readonly;
7 import hunt.raft.Status;
8 import hunt.raft.Util;
9 
10 import hunt.util.Serialize;
11 import hunt.logging;
12 
13 import std.container;
14 
15 
16 
17 
18 alias Context = Object;
19 alias SnapshotStatus = int;
20 
21 immutable SnapshotStatus SnapshotFinish = 1;
22 immutable SnapshotStatus SnapshotFailure = 2;
23 
24 immutable HardState emptyState;
25 enum ErrStopped = "raft: stopped";
26 
27 struct SoftState{
28 	ulong 		Lead;
29 	StateType 	RaftState;
30 
31 	bool equals(SoftState *b)
32 	{
33 		return Lead == b.Lead && RaftState == b.RaftState;
34 	}
35 }
36 
37 struct Ready
38 {
39 	SoftState* 		softstate;
40 	HardState		hs;
41 
42 	ReadState[]		ReadStates;
43 	Entry[]			Entries;
44 
45 	Snapshot		snap;
46 	Entry[]			CommittedEntries;
47 
48 	Message[]		Messages;
49 
50 	bool			mustSync;
51 
52 
53 	bool containsUpdates()
54 	{
55 		return  softstate != null || !IsEmptyHardState(hs) ||
56 			!IsEmptySnap(snap) || Entries.length > 0 ||
57 			CommittedEntries.length > 0 || Messages.length > 0 ||
58 				ReadStates.length != 0 ;
59 
60 	}
61 
62 
63 	static bool MustSync( HardState prevst , HardState st , int entsnum)
64 	{
65 		return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term;
66 	}
67 	
68 	static Ready newReady(Raft r , SoftState* prevSortSt , HardState prevHardST)
69 	{
70 		Ready rd = {
71 		Entries:			r._raftLog.unstableEntries() ,
72 		CommittedEntries:	r._raftLog.nextEnts() ,
73 		Messages: 			r._msgs};
74 		
75 		auto softSt = r.softState();
76 		if(prevSortSt != null && !softSt.equals(prevSortSt))
77 			rd.softstate = softSt;
78 		
79 		auto hardSt = r.hardState();
80 		if(!isHardStateEqual(hardSt , prevHardST))
81 			rd.hs = hardSt;
82 
83 		if(r._raftLog._unstable._snap != null)
84 			rd.snap = *r._raftLog._unstable._snap;
85 
86 		if(r._readStates.length != 0)
87 			rd.ReadStates = r._readStates;
88 		
89 		rd.mustSync = Ready.MustSync(rd.hs , prevHardST , cast(int)rd.Entries.length);
90 		return rd;
91 	}
92 }
93 
94 bool isHardStateEqual(HardState a ,HardState b)
95 {
96 	return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit;
97 }
98 
99 bool IsEmptyHardState(HardState st)
100 {
101 	return isHardStateEqual(st , emptyState);
102 }
103 
104 bool IsEmptySnap(Snapshot sp)
105 {
106 	return sp.Metadata.Index == 0;
107 }
108 
109 struct Peer
110 {
111 	ulong 		ID;
112 	string 		Context;
113 }
114 
115 
116 
117 
118 version(NO_ORIGIN):
119 
120 interface Node
121 {
122 	void Tick();
123 	ErrString Campaign(Context ctx);
124 	ErrString Propose(Context ctx , string data);
125 	ErrString ProposeConfChange(Context ctx , ConfChange cc);
126 
127 	ErrString Step(Context ctx , Message msg);
128 
129 	DList!Ready ready();
130 
131 	void Advance();
132 	ConfState  ApplyConfChange(ConfChange cc);
133 	void TransferLeadership(Context ctx , ulong lead , ulong transferee);
134 	ErrString ReadIndex(Context ctx , string rctx);
135 	Status status();
136 	void ReportUnreachable(ulong id);
137 	void ReportSnapshot(ulong id , SnapshotStatus status);
138 
139 	void Stop();
140 
141 }
142 
143 
144 
145 
146 Node StartNode(Config c , Peer[] peers)
147 {
148 	auto r = new Raft(c);
149 
150 	r.becomeFollower(1 , None);
151 
152 	foreach( peer ; peers)
153 	{
154 		ConfChange cc = {Type:ConfChangeType.ConfChangeAddNode , NodeID:peer.ID , Context:peer.Context};
155 		byte[] ser = serialize(cc);
156 		Entry[] ents;
157 		Entry e =  {Type:EntryType.EntryConfChange , Term : 1 , Index : r._raftLog.lastIndex() + 1 , Data:cast(string)ser};
158 		ents ~= e;
159 		r._raftLog.append(ents);
160 	}
161 
162 	r._raftLog._committed = r._raftLog.lastIndex();
163 
164 	foreach(peer ; peers)
165 	{
166 		r.addNode(peer.ID);
167 	}
168 
169 	auto n = new node();
170 	// go run.
171 	return n;
172 }
173 
174 Node RestartNode(Config c)
175 {
176 	auto r = new Raft(c);
177 
178 	auto n = new node();
179 
180 	//go run
181 
182 	return n;
183 }
184 
185 
186 class node : Node
187 {
188 	DList!Message 				_propc;
189 	DList!Message 				_recvc;
190 	DList!ConfChange			_confc;
191 	DList!ConfState				_confstatec;
192 	DList!Ready					_readyc;
193 	DList!Object				_advancec;
194 	DList!Object				_tickc;
195 	DList!Object				_done;
196 	DList!Object				_stop;
197 	DList!(DList!Status)		_status;
198 
199 
200 	this()
201 	{
202 
203 	}
204 
205 
206 
207 
208 	//next.
209 	void Stop()
210 	{
211 		while(!_done.empty())
212 			break;
213 	}
214 
215 	void run(Raft r)
216 	{
217 		DList!Message* 	propc ;
218 		DList!Ready*   	readyc;
219 		DList!Object*	advancec;
220 		ulong			prevLastUnstablei, prevLastUnstablet;
221 		bool			havePrevLastUnstablei;
222 		ulong 			prevSnapi;
223 		Ready			rd;
224 
225 		ulong lead = None;
226 		SoftState* prevSoftSt = r.softState();
227 		HardState prevHardSt = emptyState;
228 
229 		while(1)
230 		{
231 			if( advancec != null)
232 			{
233 				readyc = null;
234 			}
235 			else
236 			{
237 				//next.
238 			}
239 
240 			if(lead != r._lead)
241 			{
242 				if(r.hasLeader())
243 				{
244 					if( lead == None)
245 					{
246 						logInfo(format("raft.node: %x elected leader %x at term %d",
247 								r._id, r._lead, r._Term));
248 					}
249 					else
250 					{
251 						logInfo(format("raft.node: %x changed leader from %x to %x at term %d",
252 								r._id, lead, r._lead, r._Term));
253 					}
254 					propc = &_propc;
255 				}
256 				else
257 				{
258 					logInfo("raft.node: %x lost leader %x at term %d" , r._id , lead , r._Term);
259 					propc = null;
260 				}
261 
262 				lead = r._lead;
263 			}
264 
265 			while(1)
266 			{
267 				if(!propc.empty())
268 				{
269 					Message m = propc.front();
270 					m.From = r._id;
271 					r.Step(m);
272 					propc.removeFront();
273 				}
274 				else if(!_recvc.empty())
275 				{
276 					Message m = _recvc.front();
277 					auto ok = m.From in r._prs;
278 					if( ok || !IsResponseMsg(m.Type)){
279 						r.Step(m);
280 					}
281 					_recvc.removeFront();
282 				}
283 				else if(!_confc.empty())
284 				{
285 					ConfChange cc = _confc.front();
286 					if( cc.NodeID == None)
287 					{
288 						r.resetPendingConf();
289 						//next
290 						ConfState cs = { Nodes:r.nodes()};
291 						_confstatec.insertBack(cs);
292 						_confc.removeFront();
293 						break;
294 					}
295 					switch(cc.Type)
296 					{
297 						case ConfChangeType.ConfChangeAddNode:
298 							r.addNode(cc.NodeID);
299 							break;
300 						case ConfChangeType.ConfChangeRemoveNode:
301 							if(cc.NodeID == r._id)
302 								propc = null;
303 							r.removeNode(cc.NodeID);
304 							break;
305 						case ConfChangeType.ConfChangeUpdateNode:
306 							r.resetPendingConf();
307 							break;
308 						default:
309 							logError("unexpected conf type");
310 					}
311 					//next
312 					ConfState cs = {Nodes:r.nodes()};
313 					_confstatec.insertBack(cs);
314 					_confc.removeFront();
315 				}
316 				else if(!_tickc.empty())
317 				{
318 					r._tick();
319 					_tickc.removeFront();
320 				}
321 				else if(readyc !is null)
322 				{
323 					//next
324 					//if(rd.softstate != null)
325 						prevSoftSt = rd.softstate;
326 
327 					if(rd.Entries.length > 0 )
328 					{
329 						prevLastUnstablei = rd.Entries[$ - 1].Index;
330 						prevLastUnstablet = rd.Entries[$ - 1].Term;
331 						havePrevLastUnstablei = true;
332 					}
333 
334 					if( !IsEmptyHardState(rd.hs))
335 					{
336 						prevHardSt = rd.hs;
337 					}
338 
339 					r._msgs = null;
340 					r._readStates = null;
341 					advancec = &_advancec;
342 					readyc.insertBack(rd);
343 				}
344 				else if(!advancec.empty())
345 				{
346 					if(prevHardSt.Commit != 0)
347 						r._raftLog.appliedTo(prevHardSt.Commit);
348 
349 					if(havePrevLastUnstablei)
350 					{
351 						r._raftLog.stableTo(prevLastUnstablei , prevLastUnstablet);
352 						havePrevLastUnstablei = false;
353 					}
354 
355 					r._raftLog.stableSnapTo(prevSnapi);
356 					advancec.removeFront();
357 					advancec = null;
358 				}
359 				else if(!_status.empty())
360 				{
361 					auto c = _status.front();
362 					c.insertBack(getStatus(r));
363 					_status.removeFront();
364 				}
365 				else if(!_stop.empty())
366 				{
367 					//next
368 					//close(_done);
369 				}
370 			}
371 
372 
373 		}
374 	}
375 
376 
377 	void Tick()
378 	{
379 		//next
380 		_tickc.insertBack(new Object);
381 	}
382 
383 	ErrString Campaign(Context ctx)
384 	{
385 		Message msg = {Type : MessageType.MsgHup};
386 		return step(ctx ,msg);
387 	}
388 
389 	ErrString Propose(Context ctx , string data)
390 	{
391 		Message msg = {Type : MessageType.MsgProp , Entries :[{Data : data}]};
392 		return step(ctx , msg);
393 	}
394 
395 	string ProposeConfChange(Context ctx , ConfChange cc)
396 	{
397 		return ErrNil;
398 	}
399 
400 
401 	ErrString Step(Context ctx , Message m)
402 	{
403 		if( IsLocalMsg(m.Type))
404 			return ErrNil;
405 
406 		return step(ctx , m);
407 	}
408 
409 	ErrString step(Context ctx , Message m)
410 	{
411 		//next
412 		if(m.Type == MessageType.MsgProp)
413 		{
414 			_propc.insertBack(m);
415 		}
416 		else
417 		{
418 			_recvc.insertBack(m);
419 		}
420 
421 		return ErrNil;
422 	}
423 
424 	DList!Ready ready()
425 	{
426 		return _readyc;
427 	}
428 
429 	void Advance()
430 	{
431 		_advancec.insertBack(new Object);
432 	}
433 
434 	ConfState ApplyConfChange(ConfChange cc)
435 	{
436 		//next
437 		_confc.insertBack(cc);
438 
439 		while(!_confstatec.empty()){}
440 
441 		ConfState cs = _confstatec.front();
442 
443 		_confstatec.removeBack();
444 
445 
446 		return cs;
447 	}
448 
449 	Status status()
450 	{
451 		//next
452 		//_status.
453 		Status st;
454 
455 		return st;
456 	}
457 
458 	void ReportUnreachable(ulong id)
459 	{
460 
461 	}
462 
463 	void ReportSnapshot(ulong id , SnapshotStatus status)
464 	{
465 
466 	}
467 
468 	void TransferLeadership(Context ctx , ulong lead , ulong transferee)
469 	{
470 
471 	}
472 
473 	ErrString ReadIndex(Context ctx , string rctx)
474 	{
475 
476 		Entry[] ents = [{Data:rctx}];
477 		Message msg = {Type : MessageType.MsgReadIndex , Entries : ents};
478 		step(ctx , msg);
479 		return ErrNil;
480 	}
481 
482 }