module hunt.raft.Node;

import hunt.raft.Msg;
import hunt.raft.Storage;
import hunt.raft.Raft;
import hunt.raft.Readonly;
import hunt.raft.Status;
import hunt.raft.Util;

import hunt.util.Serialize;
import hunt.logging;

import std.container;
import std.format : format;


/// Opaque per-call context handed through the Node API; not inspected in this module.
alias Context = Object;
/// Outcome code passed to `Node.ReportSnapshot`.
alias SnapshotStatus = int;

immutable SnapshotStatus SnapshotFinish = 1;
immutable SnapshotStatus SnapshotFailure = 2;

/// Default-initialised hard state used as the "nothing to report" sentinel.
immutable HardState emptyState;
enum ErrStopped = "raft: stopped";

/// Leader id and role of a raft instance.
struct SoftState{
    ulong Lead;
    StateType RaftState;

    /// Field-wise comparison with `*b`; a null `b` never compares equal.
    bool equals(SoftState *b)
    {
        return b !is null && Lead == b.Lead && RaftState == b.RaftState;
    }
}

/// Snapshot of what a raft instance currently has to hand to the application.
struct Ready
{
    SoftState* softstate;
    HardState hs;

    ReadState[] ReadStates;
    Entry[] Entries;

    Snapshot snap;
    Entry[] CommittedEntries;

    Message[] Messages;

    bool mustSync;


    /// True when at least one field carries something to process.
    bool containsUpdates()
    {
        return softstate != null || !IsEmptyHardState(hs) ||
            !IsEmptySnap(snap) || Entries.length > 0 ||
            CommittedEntries.length > 0 || Messages.length > 0 ||
            ReadStates.length != 0 ;

    }


    /// True when there are entries (`entsnum != 0`) or when Vote or Term
    /// differ between the two hard states. The comparison is symmetric in
    /// `prevst` and `st`.
    static bool MustSync( HardState prevst , HardState st , int entsnum)
    {
        return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term;
    }

    /**
     * Builds a Ready from the current state of `r`.
     *
     * Params:
     *   r          = raft instance to read from
     *   prevSortSt = soft state of the previous Ready; when null, `softstate`
     *                is never populated
     *   prevHardST = hard state of the previous Ready
     *
     * `softstate` and `hs` are only filled in when they differ from the
     * previous values; `snap` and `ReadStates` only when `r` has any.
     */
    static Ready newReady(Raft r , SoftState* prevSortSt , HardState prevHardST)
    {
        Ready rd = {
            Entries: r._raftLog.unstableEntries() ,
            CommittedEntries: r._raftLog.nextEnts() ,
            Messages: r._msgs};

        auto softSt = r.softState();
        if(prevSortSt != null && !softSt.equals(prevSortSt))
            rd.softstate = softSt;

        auto hardSt = r.hardState();
        if(!isHardStateEqual(hardSt , prevHardST))
            rd.hs = hardSt;

        if(r._raftLog._unstable._snap != null)
            rd.snap = *r._raftLog._unstable._snap;

        if(r._readStates.length != 0)
            rd.ReadStates = r._readStates;

        // Compare against the *current* hard state. rd.hs stays empty when the
        // hard state did not change, and an empty state compared with a
        // non-empty previous one would spuriously request a sync.
        rd.mustSync = Ready.MustSync(prevHardST , hardSt , cast(int)rd.Entries.length);
        return rd;
    }
}

/// Compares Term, Vote and Commit of two hard states.
bool isHardStateEqual(HardState a ,HardState b)
{
    return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit;
}

/// True when `st` equals the default-initialised `emptyState`.
bool IsEmptyHardState(HardState st)
{
    return isHardStateEqual(st , emptyState);
}

/// True when the snapshot's metadata index is 0.
bool IsEmptySnap(Snapshot sp)
{
    return sp.Metadata.Index == 0;
}

/// A cluster member as passed to `StartNode`.
struct Peer
{
    ulong ID;
    string Context;
}


// Everything below is compiled only when the NO_ORIGIN version is set.
version(NO_ORIGIN):

/// Application-facing API of a raft node.
interface Node
{
    void Tick();
    ErrString Campaign(Context ctx);
    ErrString Propose(Context ctx , string data);
    ErrString ProposeConfChange(Context ctx , ConfChange cc);

    ErrString Step(Context ctx , Message msg);

    DList!Ready ready();

    void Advance();
    ConfState ApplyConfChange(ConfChange cc);
    void TransferLeadership(Context ctx , ulong lead , ulong transferee);
    ErrString ReadIndex(Context ctx , string rctx);
    Status status();
    void ReportUnreachable(ulong id);
    void ReportSnapshot(ulong id , SnapshotStatus status);

    void Stop();

}


/**
 * Creates a Raft instance from `c`, makes it a follower at term 1, appends one
 * committed ConfChangeAddNode entry (term 1) per peer, registers each peer and
 * returns a new `node`.
 *
 * NOTE(review): the Raft instance is not handed to the returned node and
 * `run` is not started here (see the "go run" marker) - confirm intent.
 */
Node StartNode(Config c , Peer[] peers)
{
    auto r = new Raft(c);

    r.becomeFollower(1 , None);

    foreach( peer ; peers)
    {
        ConfChange cc = {Type:ConfChangeType.ConfChangeAddNode , NodeID:peer.ID , Context:peer.Context};
        byte[] ser = serialize(cc);
        Entry[] ents;
        Entry e = {Type:EntryType.EntryConfChange , Term : 1 , Index : r._raftLog.lastIndex() + 1 , Data:cast(string)ser};
        ents ~= e;
        r._raftLog.append(ents);
    }

    // Mark every bootstrap entry as committed.
    r._raftLog._committed = r._raftLog.lastIndex();

    foreach(peer ; peers)
    {
        r.addNode(peer.ID);
    }

    auto n = new node();
    // go run.
    return n;
}

/**
 * Creates a Raft instance from `c` and returns a new `node`.
 *
 * NOTE(review): as in StartNode, `run` is not started and `r` is unused.
 */
Node RestartNode(Config c)
{
    auto r = new Raft(c);

    auto n = new node();

    //go run

    return n;
}


/// Queue-based implementation of `Node`; the DList fields act as mailboxes
/// between the public methods and `run`.
class node : Node
{
    DList!Message           _propc;
    DList!Message           _recvc;
    DList!ConfChange        _confc;
    DList!ConfState         _confstatec;
    DList!Ready             _readyc;
    DList!Object            _advancec;
    DList!Object            _tickc;
    DList!Object            _done;
    DList!Object            _stop;
    DList!(DList!Status)    _status;


    this()
    {

    }


    //next.
    /// Currently has no effect: the loop body breaks immediately.
    void Stop()
    {
        while(!_done.empty())
            break;
    }

    /**
     * Main processing loop: drains the mailbox queues in priority order
     * (proposals, received messages, conf changes, ticks, ready, advance,
     * status, stop) and applies them to `r`.
     *
     * NOTE(review): the inner loop only returns to the outer loop through the
     * `break` in the conf-change branch, `readyc` is never armed, and the stop
     * branch does nothing, so this method does not terminate.
     */
    void run(Raft r)
    {
        DList!Message* propc ;
        DList!Ready* readyc;
        DList!Object* advancec;
        ulong prevLastUnstablei, prevLastUnstablet;
        bool havePrevLastUnstablei;
        ulong prevSnapi;
        Ready rd;

        ulong lead = None;
        SoftState* prevSoftSt = r.softState();
        HardState prevHardSt = emptyState;

        while(1)
        {
            if( advancec !is null)
            {
                readyc = null;
            }
            else
            {
                //next.
            }

            if(lead != r._lead)
            {
                if(r.hasLeader())
                {
                    if( lead == None)
                    {
                        logInfo(format("raft.node: %x elected leader %x at term %d",
                            r._id, r._lead, r._Term));
                    }
                    else
                    {
                        logInfo(format("raft.node: %x changed leader from %x to %x at term %d",
                            r._id, lead, r._lead, r._Term));
                    }
                    propc = &_propc;
                }
                else
                {
                    // Formatted like the two messages above; previously the
                    // format string and its arguments were passed unformatted.
                    logInfo(format("raft.node: %x lost leader %x at term %d" , r._id , lead , r._Term));
                    propc = null;
                }

                lead = r._lead;
            }

            while(1)
            {
                // propc is null while there is no leader (or after this node
                // was removed), so it must be checked before dereferencing.
                if(propc !is null && !propc.empty())
                {
                    Message m = propc.front();
                    m.From = r._id;
                    r.Step(m);
                    propc.removeFront();
                }
                else if(!_recvc.empty())
                {
                    Message m = _recvc.front();
                    auto ok = m.From in r._prs;
                    // Response messages from senders not in r._prs are dropped.
                    if( ok || !IsResponseMsg(m.Type)){
                        r.Step(m);
                    }
                    _recvc.removeFront();
                }
                else if(!_confc.empty())
                {
                    ConfChange cc = _confc.front();
                    if( cc.NodeID == None)
                    {
                        r.resetPendingConf();
                        //next
                        ConfState cs = { Nodes:r.nodes()};
                        _confstatec.insertBack(cs);
                        _confc.removeFront();
                        break;
                    }
                    switch(cc.Type)
                    {
                        case ConfChangeType.ConfChangeAddNode:
                            r.addNode(cc.NodeID);
                            break;
                        case ConfChangeType.ConfChangeRemoveNode:
                            // Stop accepting proposals once this node is removed.
                            if(cc.NodeID == r._id)
                                propc = null;
                            r.removeNode(cc.NodeID);
                            break;
                        case ConfChangeType.ConfChangeUpdateNode:
                            r.resetPendingConf();
                            break;
                        default:
                            logError("unexpected conf type");
                            break;
                    }
                    //next
                    ConfState cs = {Nodes:r.nodes()};
                    _confstatec.insertBack(cs);
                    _confc.removeFront();
                }
                else if(!_tickc.empty())
                {
                    r._tick();
                    _tickc.removeFront();
                }
                else if(readyc !is null)
                {
                    //next
                    // Keep the previous soft state when this Ready carries none;
                    // a null prevSoftSt would stop newReady from ever reporting
                    // soft-state changes again.
                    if(rd.softstate !is null)
                        prevSoftSt = rd.softstate;

                    if(rd.Entries.length > 0 )
                    {
                        prevLastUnstablei = rd.Entries[$ - 1].Index;
                        prevLastUnstablet = rd.Entries[$ - 1].Term;
                        havePrevLastUnstablei = true;
                    }

                    if( !IsEmptyHardState(rd.hs))
                    {
                        prevHardSt = rd.hs;
                    }

                    // Remember the snapshot index handed out so the advance
                    // branch can pass it to stableSnapTo.
                    if( !IsEmptySnap(rd.snap))
                    {
                        prevSnapi = rd.snap.Metadata.Index;
                    }

                    r._msgs = null;
                    r._readStates = null;
                    advancec = &_advancec;
                    readyc.insertBack(rd);
                }
                // advancec is null until a Ready has been handed out.
                else if(advancec !is null && !advancec.empty())
                {
                    if(prevHardSt.Commit != 0)
                        r._raftLog.appliedTo(prevHardSt.Commit);

                    if(havePrevLastUnstablei)
                    {
                        r._raftLog.stableTo(prevLastUnstablei , prevLastUnstablet);
                        havePrevLastUnstablei = false;
                    }

                    r._raftLog.stableSnapTo(prevSnapi);
                    advancec.removeFront();
                    advancec = null;
                }
                else if(!_status.empty())
                {
                    // NOTE(review): `c` is a copy of the queued DList; verify the
                    // requester actually observes the inserted status.
                    auto c = _status.front();
                    c.insertBack(getStatus(r));
                    _status.removeFront();
                }
                else if(!_stop.empty())
                {
                    //next
                    //close(_done);
                }
            }


        }
    }


    /// Queues one tick for `run`.
    void Tick()
    {
        //next
        _tickc.insertBack(new Object);
    }

    /// Queues a MsgHup message via `step`.
    ErrString Campaign(Context ctx)
    {
        Message msg = {Type : MessageType.MsgHup};
        return step(ctx ,msg);
    }

    /// Queues a MsgProp message carrying `data` as a single entry.
    ErrString Propose(Context ctx , string data)
    {
        Message msg = {Type : MessageType.MsgProp , Entries :[{Data : data}]};
        return step(ctx , msg);
    }

    /// Not implemented yet: ignores `cc` and returns ErrNil.
    ErrString ProposeConfChange(Context ctx , ConfChange cc)
    {
        return ErrNil;
    }


    /// Queues `m` unless it is a local message, which is silently ignored.
    ErrString Step(Context ctx , Message m)
    {
        if( IsLocalMsg(m.Type))
            return ErrNil;

        return step(ctx , m);
    }

    /// Routes MsgProp to the proposal queue and everything else to the
    /// receive queue. Always returns ErrNil.
    ErrString step(Context ctx , Message m)
    {
        //next
        if(m.Type == MessageType.MsgProp)
        {
            _propc.insertBack(m);
        }
        else
        {
            _recvc.insertBack(m);
        }

        return ErrNil;
    }

    /// Returns the queue of pending Ready values.
    DList!Ready ready()
    {
        return _readyc;
    }

    /// Queues an advance notification for `run`.
    void Advance()
    {
        _advancec.insertBack(new Object);
    }

    /**
     * Queues `cc` for `run` and returns the resulting ConfState.
     *
     * Spins until a reply is present in `_confstatec`.
     * NOTE(review): this relies on `run` producing the reply concurrently and
     * uses no synchronisation - confirm the threading model.
     */
    ConfState ApplyConfChange(ConfChange cc)
    {
        //next
        _confc.insertBack(cc);

        // Wait until a reply arrives (the condition used to be inverted).
        while(_confstatec.empty()){}

        ConfState cs = _confstatec.front();

        // Remove the element that was just read, not the newest one.
        _confstatec.removeFront();


        return cs;
    }

    /// Not implemented yet: returns a default-initialised Status.
    Status status()
    {
        //next
        //_status.
        Status st;

        return st;
    }

    /// Not implemented yet.
    void ReportUnreachable(ulong id)
    {

    }

    /// Not implemented yet.
    void ReportSnapshot(ulong id , SnapshotStatus status)
    {

    }

    /// Not implemented yet.
    void TransferLeadership(Context ctx , ulong lead , ulong transferee)
    {

    }

    /// Queues a MsgReadIndex message carrying `rctx` as a single entry.
    ErrString ReadIndex(Context ctx , string rctx)
    {

        Entry[] ents = [{Data:rctx}];
        Message msg = {Type : MessageType.MsgReadIndex , Entries : ents};
        return step(ctx , msg);
    }

}