module app.raft;

import common.network;
import common.raft.node;

import hunt.raft;
import hunt.logging;
import hunt.util.Serialize;
import hunt.util.timer;
import hunt.net;

import std.string;
import std.conv;
import std.format;

import core.thread;
import core.sync.mutex;

import app.http;
import common.wal.storage;

alias Server = common.network.server.Server;
alias Client = common.network.client.Client;
alias Storage = common.wal.storage.Storage;

/**
 * Application-level raft node.
 *
 * Owns the peer-to-peer server, the HTTP API server, the outgoing peer
 * connections and the WAL-backed storage, and drives the raft state machine
 * from two background threads (one calling `ready()`, one calling `tick()`).
 *
 * All access to the raft node and to the `_request` / `_clients` tables is
 * serialized through `_mutex`. `core.sync.mutex.Mutex` is recursive, so
 * methods that lock it may be called while it is already held by the same
 * thread.
 *
 * NOTE(review): `node`, `storage`, `ID`, `appliedIndex`, `confstate`,
 * `entriesToApply`, `publishSnapshot` and `maybeTriggerSnapshot` are not
 * declared in this file; presumably they are inherited from `Node` — confirm.
 */
class Raft : Node, MessageReceiver
{
    /**
     * Proposes `command` to the raft log and remembers `h` so the HTTP
     * request can be answered once the entry is applied.
     *
     * The handler is only registered when the proposal was accepted;
     * a rejected proposal is logged and otherwise ignored.
     */
    void propose(RequestCommand command , HttpBase h)
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();   // released even if serialize/Propose throws

        auto err = node.Propose(cast(string)serialize(command));
        if (err != ErrNil)
        {
            logError(err);
            return;
        }
        _request[command.Hash] = h;
    }

    /**
     * Issues a ReadIndex request carrying the serialized `command` as its
     * context and registers `h` to be answered from `ready()`.
     */
    void readIndex(RequestCommand command , HttpBase h)
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        node.ReadIndex(cast(string)serialize(command));
        _request[command.Hash] = h;
    }

    /**
     * Forgets the pending request registered under `h.toHash`.
     */
    void delPropose(HttpBase h)
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        _request.remove(h.toHash);
    }

    /**
     * Proposes a cluster membership change. Failures are logged only.
     */
    void proposeConfChange(ConfChange cc)
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        auto err = node.ProposeConfChange(cc);
        if (err != ErrNil)
        {
            logError(err);
        }
    }

    /**
     * Restores persisted state, creates the raft node, opens the HTTP and
     * peer servers, connects to the other peers, starts the ready/tick
     * threads and finally enters the network event loop.
     *
     * Params:
     *   ID      = this node's raft ID (1-based position in `cluster`).
     *   apiport = TCP port for the HTTP API, as a decimal string.
     *   cluster = ";"-separated list of "host:port" peer addresses; the
     *             entry at position i belongs to node ID i + 1.
     *   join    = when no persisted state exists, start with an empty peer
     *             list (joining an existing cluster) instead of `cluster`.
     *
     * Throws: Exception if this node's own entry in `cluster` is not a
     *         valid "host:port" pair.
     */
    this(ulong ID , string apiport , string cluster , bool join)
    {
        auto conf = new Config();
        auto store = new MemoryStorage();

        _mutex = new Mutex();
        _storage = new Storage();
        Snapshot *shot = null;

        ConfState confState;
        ulong snapshotIndex;
        ulong appliedIndex;
        ulong lastIndex;
        RawNode rawNode;   // local only; handed to the base class via super()

        HardState hs;
        Entry[] ents;
        immutable suffix = to!string(ID);
        bool exist = _storage.load("snap.log" ~ suffix , "entry.log" ~ suffix , "hs.log" ~ suffix , shot , hs , ents);
        if (shot != null)
        {
            store.ApplySnapshot(*shot);
            confState = shot.Metadata.CS;
            snapshotIndex = shot.Metadata.Index;
            appliedIndex = shot.Metadata.Index;
        }

        store.setHadrdState(hs);
        store.Append(ents);

        if (ents.length > 0)
        {
            lastIndex = ents[$ - 1].Index;
        }

        conf._ID = ID;
        conf._ElectionTick = 10;
        conf._HeartbeatTick = 1;
        conf._storage = store;
        conf._MaxSizePerMsg = 1024 * 1024;
        conf._MaxInflightMsgs = 256;

        string[] peerstr = split(cluster , ";");
        Peer[] peers;
        foreach (i , str ; peerstr)
        {
            Peer p = {ID:i + 1};
            peers ~= p;
        }

        if (exist)
        {
            // Persisted state found: restart from it.
            rawNode = new RawNode(conf);
        }
        else if (join)
        {
            /// next solve.
            rawNode = new RawNode(conf , []);
        }
        else
        {
            rawNode = new RawNode(conf , peers);
        }

        super(ID , store , rawNode , _storage , lastIndex , confState , snapshotIndex , appliedIndex);
        _http = new Server!(HttpBase,Raft)(ID , this);
        _http.listen("0.0.0.0" , to!int(apiport));

        foreach (i , addr ; peerstr)
        {
            if (i + 1 == ID)
            {
                // Our own entry: open the peer-facing server on it.
                string host;
                int port;
                if (!parseHostPort(addr , host , port))
                {
                    throw new Exception(format("invalid listen address '%s' for node %s", addr, ID));
                }
                _server = new Server!(Base,MessageReceiver)(ID , this);
                _server.listen(host , port);
                logInfo(ID , " server open " , host , " " , port);
            }
            else
            {
                addPeer(i + 1 , addr);
            }
        }

        /// ready
        new Thread((){
            while (true)
            {
                ready();
                Thread.sleep(dur!"msecs"(10));
            }
        }).start();

        /// tick
        new Thread((){
            while (true)
            {
                tick();
                Thread.sleep(dur!"msecs"(100));
            }
        }).start();

        // NOTE(review): presumably blocks for the lifetime of the process — confirm.
        NetUtil.startEventLoop(-1);
    }

    /**
     * Processes one `Ready` batch from the raft node: persists hard state,
     * entries and snapshot, sends outgoing messages, applies committed
     * entries, answers ReadIndex requests, then advances the node.
     *
     * If applying entries reports that this node must stop, the method
     * returns without calling `Advance`.
     */
    void ready()
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        Ready rd = node.ready();
        if (!rd.containsUpdates())
        {
            return;
        }

        _storage.save(rd.hs, rd.Entries);
        if (!IsEmptySnap(rd.snap))
        {
            _storage.saveSnap(rd.snap);
            storage.ApplySnapshot(rd.snap);
            publishSnapshot(rd.snap);
        }
        storage.Append(rd.Entries);
        send(rd.Messages);
        if (!publishEntries(entriesToApply(rd.CommittedEntries)))
        {
            logError("will be stop!");
            return;
        }

        // A read may only be answered once the state machine has applied at
        // least up to the read index, so queue new read states and answer
        // every queued one that appliedIndex has caught up with.
        // NOTE(review): assumes hunt-raft follows etcd's ReadIndex contract.
        foreach (r ; rd.ReadStates)
        {
            _pendingReads ~= PendingRead(r.Index , unserialize!RequestCommand(cast(byte[])r.RequestCtx));
        }
        answerPendingReads();

        maybeTriggerSnapshot();
        node.Advance(rd);
    }

    /**
     * Applies committed entries to the key/value storage and to the cluster
     * membership, answering any HTTP request waiting on a normal entry.
     *
     * Returns: false when this node has been removed from the cluster and
     *          should stop; true otherwise.
     */
    bool publishEntries(Entry[] ents)
    {
        foreach (ref entry ; ents)
        {
            switch (entry.Type)
            {
                case EntryType.EntryNormal:
                    if (entry.Data.length == 0)
                        break;

                    logInfo(entry.Data.length);
                    RequestCommand command = unserialize!RequestCommand(cast(byte[])entry.Data);

                    string value;
                    if (command.Method == RequestMethod.METHOD_GET)
                        value = _storage.Lookup(command.Key);
                    else
                        _storage.SetValue(command.Key , command.Value);

                    // Only the node that received the HTTP request has a
                    // handler registered for this command hash.
                    auto http = (command.Hash in _request);
                    if (http !is null)
                    {
                        (*http).do_response(value ~ " action done");
                        (*http).close();
                    }
                    break;

                //next
                case EntryType.EntryConfChange:
                    ConfChange cc = unserialize!ConfChange(cast(byte[])entry.Data);
                    confstate = node.ApplyConfChange(cc);
                    switch (cc.Type)
                    {
                        case ConfChangeType.ConfChangeAddNode:
                            if (cc.NodeID == ID)
                            {
                                break;
                            }

                            // The context carries the new node's "host:port".
                            if (cc.Context.length > 0)
                            {
                                addPeer(cc.NodeID , cc.Context);
                                logInfo("add " , cc.NodeID);
                            }
                            break;
                        case ConfChangeType.ConfChangeRemoveNode:
                            if (cc.NodeID == ID)
                            {
                                logWarning(ID , " I've been removed from the cluster! Shutting down.");
                                return false;
                            }
                            logWarning(ID , " del node " , cc.NodeID);
                            delPeer(cc.NodeID);
                            break;
                        default:
                            break;
                    }
                    break;

                default:
                    break;
            }

            appliedIndex = entry.Index;
        }
        return true;
    }

    /**
     * Opens an outgoing connection to peer `ID` at `data` ("host:port")
     * unless one is already registered. A failed connect is retried after
     * one second on a fresh thread; a malformed address is logged and
     * dropped.
     */
    void addPeer(ulong ID , string data)
    {
        // _clients is also touched from the ready thread and from connect
        // callbacks, so guard the lookup.
        _mutex.lock();
        immutable known = (ID in _clients) !is null;
        _mutex.unlock();
        if (known)
            return;

        string host;
        int port;
        if (!parseHostPort(data , host , port))
        {
            logError(this.ID , " invalid peer address for node " , ID , ": " , data);
            return;
        }

        auto client = new Client(this.ID , ID);
        client.connect(host , port , (Result!NetSocket result){
            if (result.failed())
            {
                new Thread((){
                    Thread.sleep(dur!"seconds"(1));
                    addPeer(ID , data);
                }).start();
                return;
            }

            _mutex.lock();
            scope(exit) _mutex.unlock();
            _clients[ID] = client;
            logInfo(this.ID , " client connected " , host , " " , port);
        });
    }

    /**
     * Closes and forgets the connection to peer `ID`, if any.
     *
     * In this file it is only reached from `publishEntries`, i.e. while
     * `ready()` holds `_mutex`.
     */
    void delPeer(ulong ID)
    {
        auto client = ID in _clients;
        if (client is null)
            return;

        logInfo(this.ID , " client disconnect " , ID);
        (*client).close();
        _clients.remove(ID);
    }

    /**
     * Writes each message to the connection of its destination peer;
     * messages for peers without a registered connection are dropped.
     *
     * In this file it is only called from `ready()`, under `_mutex`.
     */
    void send(Message[] msg)
    {
        foreach (m ; msg)
        {
            auto client = m.To in _clients;
            if (client !is null)
                (*client).write(m);
        }
    }

    /// Advances the raft node's logical clock by one tick.
    void tick()
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        node.Tick();
    }

    /// Feeds a message received from a peer into the raft node.
    void step(Message msg)
    {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        node.Step(msg);
    }

    /// A ReadIndex request waiting for the state machine to reach `index`.
    private static struct PendingRead
    {
        ulong index;
        RequestCommand command;
    }

    /**
     * Answers every queued read whose index has been applied and keeps the
     * rest queued. Reads whose HTTP handler is no longer registered are
     * dropped. Must be called with `_mutex` held.
     */
    private void answerPendingReads()
    {
        PendingRead[] waiting;
        foreach (pending ; _pendingReads)
        {
            if (pending.index > appliedIndex)
            {
                waiting ~= pending;
                continue;
            }

            auto h = pending.command.Hash in _request;
            if (h is null)
                continue;

            if (pending.command.Method == RequestMethod.METHOD_GET)
            {
                string value = _storage.Lookup(pending.command.Key);
                (*h).do_response(value ~ " action done");
                (*h).close();
            }
        }
        _pendingReads = waiting;
    }

    /**
     * Splits "host:port" into its parts.
     *
     * Returns: false when the port is missing or not a valid int; `host`
     *          and `port` are only meaningful when true is returned.
     */
    private static bool parseHostPort(string addr , out string host , out int port)
    {
        string[] parts = split(addr , ":");
        if (parts.length < 2)
            return false;

        try
        {
            port = to!int(parts[1]);
        }
        catch (ConvException e)
        {
            return false;
        }
        host = parts[0];
        return true;
    }

    Server!(Base,MessageReceiver) _server;   /// peer-facing raft server
    Server!(HttpBase,Raft)        _http;     /// HTTP API server
    MessageTransfer[ulong]        _clients;  /// outgoing connections by peer ID

    Storage          _storage;               /// WAL-backed persistent storage
    Mutex            _mutex;                 /// guards node, _request, _clients
    HttpBase[ulong]  _request;               /// pending HTTP handlers by command hash

    private PendingRead[] _pendingReads;     /// reads waiting for appliedIndex
}