1 module app.groupraft;
2 
3 
4 import common.network;
5 import common.raft.node;
6 
7 import hunt.raft;
8 import hunt.logging;
9 import hunt.util.Serialize;
10 import hunt.util.timer;
11 import hunt.net;
12 
13 import std.string;
14 import std.conv;
15 import std.format;
16 
17 import core.thread;
18 import core.sync.mutex;
19 
20 
21 import common.wal.storage;
22 
23 alias Server = common.network.server.Server;
24 alias Client = common.network.client.Client;
25 alias Storage = common.wal.storage.Storage;
26 
27 
28 class ClusterClient
29 {
30 	ulong 				firstID;
31 	string				host;
32 	ushort				port;
33 	string				apihost;
34 	ushort				apiport;
35 }
36 
37 
38 class GroupRaft : MessageReceiver
39 {
40 	this()
41 	{
42 	
43 	}
44 
45 	//node1 [1,2,3]
46 	//node2 [1,2,4]
47 	//node3 [2,3,4]
48 	//node4 [1,2,3]
49 	void addPeer(ulong ID , string data)
50 	{
51 		if(ID in _clients)
52 			return ;
53 		
54 		auto client = new Client(_ID , ID);
55 		string[] hostport = split(data , ":");
56 		client.connect(hostport[0] , to!int(hostport[1]) , (Result!NetSocket result){
57 			if(result.failed()){
58 				
59                 new Thread((){
60                     Thread.sleep(dur!"seconds"(1));
61                     addPeer(ID , data);
62                 }).start();
63                 return;
64 			}
65 			_clients[ID] = client;
66 			logInfo(_ID , " client connected " , hostport[0] , " " , hostport[1]);
67 		});
68 		
69 	}
70 
71 	void start(ulong firstID , ulong[][ulong] regions , ClusterClient[] clients)
72 	{
73 		foreach(c ; clients)
74 		{
75 			//server
76 			if(firstID == c.firstID)
77 			{
78 				_server = new Server!(Base ,MessageReceiver)(firstID , this);
79 				_server.listen(c.host , c.port);
80 				logInfo(firstID ,  " server open " , c.host , " " , c.port);
81 			}
82 			//client
83 			else
84 			{
85 				addPeer(c.firstID , c.host ~ ":" ~ to!string(c.port));
86 			}
87 		}
88 
89 		_ID = firstID;
90 		_mutex = new Mutex();
91 
92 		foreach(k , v ; regions)
93 		{
94 			Config conf = new Config();
95 			auto kvs = new Storage();
96 			auto store = new MemoryStorage();
97 			auto ID = (firstID * 10 + k);
98 			Peer[] peers;
99 			foreach( id ; v)
100 			{
101 				Peer p = {ID: id * 10 +k};
102 				peers ~= p;
103 			}
104 			logInfo(ID ," " , peers , v);
105 
106 			
107 
108 			Snapshot *shot = null;
109 
110 			ConfState confState;
111 			ulong snapshotIndex;
112 			ulong appliedIndex;
113 			ulong lastIndex;
114 			RawNode	node;
115 
116 
117 			HardState hs;
118 			Entry[] ents;
119 			bool exist = kvs.load("snap.log" ~ to!string(ID) , "entry.log" ~ to!string(ID) ,"hs.log" ~ to!string(ID), shot , hs , ents);
120 			if(shot != null)
121 			{
122 				store.ApplySnapshot(*shot);
123 				confState = shot.Metadata.CS;
124 				snapshotIndex = shot.Metadata.Index;
125 				appliedIndex = shot.Metadata.Index;
126 			}
127 
128 			store.setHadrdState(hs);
129 			store.Append(ents);
130 
131 			if(ents.length > 0)
132 			{
133 				lastIndex = ents[$ - 1].Index;
134 			}
135 			conf._ID 		   	= ID;
136 			conf._ElectionTick 	= 10;
137 			conf._HeartbeatTick = 1;
138 			conf._storage		= store;
139 			conf._MaxSizePerMsg = 1024 * 1024;
140 			conf._MaxInflightMsgs = 256;
141 
142 			if(exist)
143 			{
144 				node = new RawNode(conf);
145 			}
146 			else
147 			{
148 				node = new RawNode(conf , peers);
149 			}
150 
151 			_rafts[ID] = new Node(ID , store , node , kvs , lastIndex ,confState , snapshotIndex , appliedIndex);
152 		
153 		}
154 
155 		logInfo(_clients , " " , _rafts);
156 
157 		
158 
159 	        /// ready
160         new Thread((){
161             while(1)
162             {
163                 ready();
164                 Thread.sleep(dur!"msecs"(10));
165             }
166 		}).start();
167 		
168         /// tick
169 		new Thread((){
170 			while(1){
171 				tick();
172 				Thread.sleep(dur!"msecs"(100));
173 			}
174 		}).start();
175 
176         NetUtil.startEventLoop(-1);
177 
178 	}
179 
180 
181 	void tick()
182 	{
183 		_mutex.lock();
184 		foreach( r ; _rafts)
185 		{
186 			r.node.Tick();
187 		}
188 		_mutex.unlock();
189 	}
190 
191 	void ready()
192 	{
193 		_mutex.lock();
194 		foreach(r ; _rafts)
195 		{
196 			Ready rd = r.node.ready();
197 			if(!rd.containsUpdates())
198 			{
199 				continue;
200 			}
201 
202 			r.store.save(rd.hs , rd.Entries);
203 			if(!IsEmptySnap(rd.snap))
204 			{
205 				r.store.saveSnap(rd.snap);
206 				r.storage.ApplySnapshot(rd.snap);
207 				r.publishSnapshot(rd.snap);
208 			}
209 
210 			r.storage.Append(rd.Entries);
211 			send(rd.Messages);
212 			r.publishEntries(r.entriesToApply(rd.CommittedEntries));
213 			r.maybeTriggerSnapshot();
214 			r.node.Advance(rd);
215 		}
216 		_mutex.unlock();
217 	}
218 
219 	void send(Message[] msg)
220 	{
221 		//logInfo(msg);
222 		foreach(m ; msg)
223 		{
224 			ulong ID = m.To / 10;
225 			if(ID in _clients)
226 				_clients[ID].write(m);
227 		}
228 	}
229 
230 	void step(Message msg)
231 	{
232 		_mutex.lock();
233 		_rafts[msg.To].node.Step(msg);
234 		_mutex.unlock();
235 	}
236 
237 
238 
239 	Server!(Base,MessageReceiver)			_server;
240 	MessageTransfer[ulong]					_clients;
241 	Node[ulong]								_rafts;
242     Storage                                 _storage;
243 	Mutex									_mutex;		
244 	ulong									_ID;
245 }
246