module app.raft;

import common.network;
import common.raft.node;

import hunt.raft;
import hunt.logging;
import hunt.util.Serialize;
import hunt.util.timer;
import hunt.net;

import std.string;
import std.conv;
import std.format;

import core.thread;
import core.sync.mutex;

import app.http;
import common.wal.storage;

alias Server = common.network.server.Server;
alias Client = common.network.client.Client;
alias Storage = common.wal.storage.Storage;

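/// A single raft node: glues the hunt-raft state machine to persistent
/// storage, the peer-to-peer message transport, and the HTTP API front end.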
class Raft : Node, MessageReceiver
{
	/// Replicate a client command through raft; the request handler is kept
	/// so the client can be answered once the entry commits.
	void propose(RequestCommand command, HttpBase h)
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		auto err = node.Propose(cast(string)serialize(command));
		if (err != ErrNil)
		{
			logError(err);
		}
		else
		{
			_request[command.Hash] = h;
		}
	}

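	/// Issue a linearizable read: the result is served from ready() once the
	/// read index has been confirmed by a quorum.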
	void readIndex(RequestCommand command, HttpBase h)
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		node.ReadIndex(cast(string)serialize(command));
		_request[command.Hash] = h;
	}

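	/// Forget a pending request, e.g. when the client connection closes
	/// before the command commits.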
	void delPropose(HttpBase h)
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		_request.remove(h.toHash);
	}

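	/// Propose a cluster membership change (add or remove a node).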
	void proposeConfChange(ConfChange cc)
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		auto err = node.ProposeConfChange(cc);
		if (err != ErrNil)
		{
			logError(err);
		}
	}

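	/// ID is this node's 1-based raft ID, apiport the HTTP listen port,
	/// cluster a semicolon-separated list of peer host:port addresses, and
	/// join is true when joining an already running cluster.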
	this(ulong ID, string apiport, string cluster, bool join)
	{
		auto conf = new Config();
		auto store = new MemoryStorage();

		_mutex = new Mutex();
		_storage = new Storage();
		Snapshot* shot = null;

		ConfState confState;
		ulong snapshotIndex;
		ulong appliedIndex;
		ulong lastIndex;
		RawNode node;

		HardState hs;
		Entry[] ents;
		// Recover any persisted snapshot, log entries and hard state from disk.
		bool exist = _storage.load("snap.log" ~ to!string(ID), "entry.log" ~ to!string(ID),
				"hs.log" ~ to!string(ID), shot, hs, ents);
		if (shot != null)
		{
			store.ApplySnapshot(*shot);
			confState = shot.Metadata.CS;
			snapshotIndex = shot.Metadata.Index;
			appliedIndex = shot.Metadata.Index;
		}

		store.setHadrdState(hs);
		store.Append(ents);

		if (ents.length > 0)
		{
			lastIndex = ents[$ - 1].Index;
		}

		conf._ID = ID;
		conf._ElectionTick = 10;
		conf._HeartbeatTick = 1;
		conf._storage = store;
		conf._MaxSizePerMsg = 1024 * 1024;
		conf._MaxInflightMsgs = 256;

		string[] peerstr = split(cluster, ";");
		Peer[] peers;
		foreach (i, str; peerstr)
		{
			Peer p = {ID: i + 1};
			peers ~= p;
		}

		if (exist)
		{
			// Restarting with an existing log: the membership is already recorded in it.
			node = new RawNode(conf);
		}
		else if (join)
		{
			// Joining a running cluster: start with no peers and wait for the
			// leader to replicate the membership.
			node = new RawNode(conf, []);
		}
		else
		{
			node = new RawNode(conf, peers);
		}

		super(ID, store, node, _storage, lastIndex, confState, snapshotIndex, appliedIndex);
		_http = new Server!(HttpBase, Raft)(ID, this);
		_http.listen("0.0.0.0", to!int(apiport));

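		// Start the raft transport: listen on this node's own address and
		// connect out to every other peer.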
		foreach (i, peer; peerstr)
		{
			if (i + 1 == ID)
			{
				_server = new Server!(Base, MessageReceiver)(ID, this);
				string[] hostport = split(peer, ":");
				_server.listen(hostport[0], to!int(hostport[1]));
				logInfo(ID, " server open ", hostport[0], " ", hostport[1]);
			}
			else
			{
				addPeer(i + 1, peer);
			}
		}

		// Background loop draining Ready batches from the state machine.
		new Thread((){
			while (1)
			{
				ready();
				Thread.sleep(dur!"msecs"(10));
			}
		}).start();

		// Background loop driving raft's logical clock.
		new Thread((){
			while (1)
			{
				tick();
				Thread.sleep(dur!"msecs"(100));
			}
		}).start();

		NetUtil.startEventLoop(-1);
	}

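	/// Drain one Ready batch: persist state, forward messages, apply
	/// committed entries, answer pending reads, then Advance the node.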
	void ready()
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		Ready rd = node.ready();
		if (!rd.containsUpdates())
		{
			return;
		}

		// Persist hard state and new entries before doing anything else.
		_storage.save(rd.hs, rd.Entries);
		if (!IsEmptySnap(rd.snap))
		{
			_storage.saveSnap(rd.snap);
			storage.ApplySnapshot(rd.snap);
			publishSnapshot(rd.snap);
		}
		storage.Append(rd.Entries);
		send(rd.Messages);
		if (!publishEntries(entriesToApply(rd.CommittedEntries)))
		{
			logError("node will stop!");
			return;
		}

		// Answer pending reads whose read index has already been applied.
		foreach (r; rd.ReadStates)
		{
			if (r.Index >= appliedIndex)
			{
				RequestCommand command = unserialize!RequestCommand(cast(byte[])r.RequestCtx);
				auto h = command.Hash in _request;
				if (h == null)
				{
					continue;
				}
				string value;
				if (command.Method == RequestMethod.METHOD_GET)
				{
					value = _storage.Lookup(command.Key);
					h.do_response(value ~ " action done");
					h.close();
				}
			}
		}

		maybeTriggerSnapshot();
		node.Advance(rd);
	}

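	/// Apply committed entries to the key/value store and answer any local
	/// requests waiting on them. Returns false once this node has been
	/// removed from the cluster and should shut down.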
	bool publishEntries(Entry[] ents)
	{
		foreach (ent; ents)
		{
			switch (ent.Type)
			{
				case EntryType.EntryNormal:
				{
					if (ent.Data.length == 0)
						break;

					logDebug("entry data length: ", ent.Data.length);
					RequestCommand command = unserialize!RequestCommand(cast(byte[])ent.Data);

					// Apply the command to the key/value store.
					string value;
					if (command.Method == RequestMethod.METHOD_GET)
						value = _storage.Lookup(command.Key);
					else
						_storage.SetValue(command.Key, command.Value);

					// Answer the client if the request originated on this node.
					auto http = (command.Hash in _request);
					if (http != null)
					{
						http.do_response(value ~ " action done");
						http.close();
					}
					break;
				}
				case EntryType.EntryConfChange:
				{
					ConfChange cc = unserialize!ConfChange(cast(byte[])ent.Data);
					confstate = node.ApplyConfChange(cc);
					switch (cc.Type)
					{
						case ConfChangeType.ConfChangeAddNode:
							// No transport connection is made to ourselves.
							if (cc.NodeID == ID)
							{
								break;
							}
							if (cc.Context.length > 0)
							{
								addPeer(cc.NodeID, cc.Context);
								logInfo("add ", cc.NodeID);
							}
							break;
						case ConfChangeType.ConfChangeRemoveNode:
							if (cc.NodeID == ID)
							{
								logWarning(ID, " I've been removed from the cluster! Shutting down.");
								return false;
							}
							logWarning(ID, " del node ", cc.NodeID);
							delPeer(cc.NodeID);
							break;
						default:
							break;
					}
					break;
				}
				default:
					break;
			}

			appliedIndex = ent.Index;
		}
		return true;
	}

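	/// Open a client connection to a peer, retrying once a second until it
	/// becomes reachable.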
	void addPeer(ulong ID, string data)
	{
		if (ID in _clients)
			return;

		auto client = new Client(this.ID, ID);
		string[] hostport = split(data, ":");
		client.connect(hostport[0], to!int(hostport[1]), (Result!NetSocket result){
			if (result.failed())
			{
				// Retry in the background until the peer becomes reachable.
				new Thread((){
					Thread.sleep(dur!"seconds"(1));
					addPeer(ID, data);
				}).start();
				return;
			}
			_clients[ID] = client;
			logInfo(this.ID, " client connected ", hostport[0], " ", hostport[1]);
		});
	}

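	/// Close and forget the connection to a removed peer.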
	void delPeer(ulong ID)
	{
		if (ID !in _clients)
			return;

		logInfo(this.ID, " client disconnect ", ID);
		_clients[ID].close();
		_clients.remove(ID);
	}

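	/// Forward outbound raft messages; messages to peers without an
	/// established connection are dropped (raft will retransmit).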
	void send(Message[] msg)
	{
		foreach (m; msg)
		{
			if (m.To in _clients)
				_clients[m.To].write(m);
		}
	}

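	/// Advance raft's logical clock; election and heartbeat timeouts are
	/// measured in these ticks.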
	void tick()
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		node.Tick();
	}

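	/// Feed a message received from a peer into the raft state machine.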
	void step(Message msg)
	{
		_mutex.lock();
		scope(exit) _mutex.unlock();

		node.Step(msg);
	}

	Server!(Base, MessageReceiver)	_server;	// raft peer transport (this node's listener)
	Server!(HttpBase, Raft)			_http;		// client-facing HTTP API
	MessageTransfer[ulong]			_clients;	// outbound peer connections, keyed by node ID
	Storage							_storage;	// persistent log, snapshots and key/value state
	Mutex							_mutex;		// guards node and _request
	HttpBase[ulong]					_request;	// pending client requests, keyed by command hash
}