1 module common.raft.node;
2 
3 
4 import common.wal.api;
5 
6 
7 import hunt.raft;
8 import hunt.logging;
9 import hunt.util.Serialize;
10 import hunt.util.timer;
11 import hunt.net;
12 
13 import std.string;
14 import std.conv;
15 import std.format;
16 
17 
18 enum defaultSnapCount = 10000;
19 enum snapshotCatchUpEntriesN = 10000;
20 
21 
22 class Node
23 {
24 	ulong 				ID;
25 	MemoryStorage 		storage;
26 	RawNode				node;
27 	bool				join;
28 	ulong				lastIndex;
29 	ConfState			confstate;
30 	ulong				snapshotIndex;
31 	ulong				appliedIndex;
32 	DataStorage			store;
33 
34 	this(ulong ID , MemoryStorage storage , RawNode node , DataStorage store ,
35 		ulong lastIndex , ConfState confstate , ulong snapshotIndex , ulong appliedIndex)
36 	{
37 		this.ID = ID;
38 		this.storage = storage;
39 		this.node = node;
40 		this.store = store;
41 		this.lastIndex = lastIndex;
42 		this.confstate = confstate;
43 		this.snapshotIndex = snapshotIndex;
44 		this.appliedIndex = appliedIndex;
45 	}
46 
47 	void publishSnapshot(Snapshot snap)
48 	{
49 		if(IsEmptySnap(snap))
50 			return;
51 		
52 		if(snap.Metadata.Index <= appliedIndex)
53 		{
54 			logError(format("snapshot index [%d] should > progress.appliedIndex [%d] + 1", 
55 					snap.Metadata.Index, appliedIndex));
56 		}
57 		
58 		confstate = snap.Metadata.CS;
59 		snapshotIndex = snap.Metadata.Index;
60 		appliedIndex = snap.Metadata.Index;
61 	}
62 
63 	Entry[] entriesToApply(Entry[] ents)
64 	{
65 		if(ents.length == 0)
66 			return null;
67 		
68 		auto firstIdx = ents[0].Index;
69 		if(firstIdx > appliedIndex + 1)
70 		{
71 			logError(format("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1",
72 					firstIdx, appliedIndex));
73 		}
74 		
75 		if(appliedIndex - firstIdx + 1 < ents.length)
76 			return ents[appliedIndex - firstIdx + 1 .. $];
77 		
78 		return null;
79 	}
80 
81 
82 
83 	void maybeTriggerSnapshot()
84 	{
85 		if(appliedIndex - snapshotIndex <= defaultSnapCount)
86 			return;
87 		
88 		logInfo(format("start snapshot [applied index: %d | last snapshot index: %d]",
89 				appliedIndex, snapshotIndex));
90 		
91 		auto data = store.getSnapData();
92 		Snapshot snap;
93 		auto err = storage.CreateSnapshot(appliedIndex ,&confstate , data , snap);
94 		if(err != ErrNil)
95 		{
96 			logError(err);
97 		}
98 		
99 		store.saveSnap(snap);
100 
101 		long compactIndex = 1;
102 		if(appliedIndex > snapshotCatchUpEntriesN)
103 			compactIndex = appliedIndex - snapshotCatchUpEntriesN;
104 		
105 		storage.Compact(compactIndex);
106 		logInfo("compacted log at index " , compactIndex);
107 		snapshotIndex = appliedIndex;
108 	}
109 
110 	bool publishEntries(Entry[] ents)
111 	{	
112 		for(auto i = 0 ; i < ents.length ;i++)
113 		{
114 			appliedIndex = ents[i].Index;
115 		}
116 		return true;
117 	}
118 
119 	
120 	/*
121 	void send(Message[] msg)
122 	{
123 
124 	}
125 
126 	void addPeer(ulong ID , string data)
127 	{
128 		
129 	}
130 
131 	void delPeer(ulong ID)
132 	{
133 
134 	}
135 
136 
137     void ready()
138 	{
139 		Ready rd = node.ready();
140 		if(!rd.containsUpdates())
141 		{
142 			return;
143 		}
144 		store.save(rd.hs, rd.Entries);
145 		if( !IsEmptySnap(rd.snap))
146 		{
147 			store.saveSnap(rd.snap);
148 			storage.ApplySnapshot(rd.snap);
149 			publishSnapshot(rd.snap);
150 		}
151 		storage.Append(rd.Entries);
152 		send(rd.Messages);
153 		if(!publishEntries(entriesToApply(rd.CommittedEntries)))
154 		{
155 			logWarning("will be stop!");
156 			return;
157 		}
158 
159 		//for readindex
160 		foreach( r ; rd.ReadStates)
161 		{
162 			if( r.Index >= appliedIndex)
163 			{
164 
165 			}
166 		}
167 		
168 		maybeTriggerSnapshot();
169 		node.Advance(rd);
170 		
171 	}*/
172 }
173