module hunt.raft.Rawnode;

import hunt.raft.Raft;
import hunt.raft.Node;
import hunt.raft.Msg;
import hunt.raft.Log;
import hunt.raft.Status;
import hunt.raft.Storage;
import hunt.raft.Util;

import hunt.logging;
import hunt.util.Serialize;

/// Returned by `RawNode.Step` when the message type is a raft-local message.
enum ErrStepLocalMsg = "raft: cannot step raft local message";
/// Returned by `RawNode.Step` when a response message comes from a sender
/// that is not present in the raft progress table.
enum ErrStepPeerNotFound = "raft: cannot step as peer not found";

/**
 * Thin wrapper around a `Raft` state machine.
 *
 * It remembers the soft and hard state that were last handed out through a
 * `Ready`, so that `HasReady` can tell whether anything new is pending, and
 * it forwards ticks, proposals and incoming messages to the wrapped `Raft`.
 */
class RawNode
{
    /// The wrapped raft state machine.
    Raft _raft;
    /// Soft state most recently acknowledged through `commitReady`.
    SoftState* _preSoftSt;
    /// Hard state most recently acknowledged through `commitReady`.
    HardState _prevHardSt;

    /// Builds a `Ready` from the current raft state and the previously
    /// acknowledged soft/hard state.
    Ready newReady()
    {
        return Ready.newReady(_raft, _preSoftSt, _prevHardSt);
    }

    /// Returns: true when the raft's recorded leader id equals its own id.
    bool isLeader()
    {
        return _raft._lead == _raft._id;
    }

    /**
     * Records that the application has finished processing `rd`.
     *
     * Updates the remembered soft/hard state and moves the raft log's
     * applied / stable markers forward according to what `rd` carried.
     */
    void commitReady(Ready rd)
    {
        if (rd.softstate !is null)
        {
            _preSoftSt = rd.softstate;
        }

        if (!IsEmptyHardState(rd.hs))
        {
            _prevHardSt = rd.hs;
        }

        // Uses the remembered hard state rather than rd.hs, so the applied
        // marker still advances when this Ready carried an empty hard state.
        if (_prevHardSt.Commit != 0)
        {
            _raft._raftLog.appliedTo(_prevHardSt.Commit);
        }

        if (rd.Entries.length > 0)
        {
            // The last entry of the batch marks how far the log is stable.
            auto last = rd.Entries[$ - 1];
            _raft._raftLog.stableTo(last.Index, last.Term);
        }

        if (!IsEmptySnap(rd.snap))
        {
            _raft._raftLog.stableSnapTo(rd.snap.Metadata.Index);
        }

        if (rd.ReadStates != null)
        {
            _raft._readStates = null;
        }
    }

    /// Creates a node around a fresh `Raft` built from `c`.
    /// The remembered soft/hard state is left at its default value.
    this(Config c)
    {
        _raft = new Raft(c);
    }

    /**
     * Creates a node from `c`, bootstrapping the log from `peers` when the
     * storage reports a last index of zero.
     *
     * In the bootstrap case one `EntryConfChange` entry (term 1, index
     * 1..peers.length) is appended per peer, all of them are marked as
     * committed, and every peer is added to the raft.
     *
     * A zero `c._ID` and a failing `LastIndex` call are only logged;
     * construction continues in both cases.
     */
    this(Config c, Peer[] peers)
    {
        if (c._ID == 0)
        {
            logError("config.ID must not be zero");
        }
        auto r = new Raft(c);

        ulong lastIndex;
        auto err = c._storage.LastIndex(lastIndex);
        if (err != ErrNil)
        {
            logError(err);
        }

        if (lastIndex == 0)
        {
            r.becomeFollower(1, None);
            Entry[] ents;
            foreach (i, peer; peers)
            {
                ConfChange cc = {
                    Type : ConfChangeType.ConfChangeAddNode,
                    NodeID : peer.ID,
                    Context : peer.Context
                };
                auto data = cast(string) serialize(cc);
                Entry ent = {
                    Type : EntryType.EntryConfChange,
                    Term : 1,
                    Index : i + 1,
                    Data : data
                };
                ents ~= ent;
            }

            r._raftLog.append(ents);
            r._raftLog._committed = ents.length;
            foreach (peer; peers)
            {
                r.addNode(peer.ID);
            }
        }

        // Capture the initial states only after all initialization is done.
        _preSoftSt = r.softState();
        if (lastIndex == 0)
            _prevHardSt = emptyState;
        else
            _prevHardSt = r.hardState();

        _raft = r;
    }

    /// Advances the raft's internal clock by invoking its tick handler.
    void Tick()
    {
        _raft._tick();
    }

    /// Increments the raft's election-elapsed counter without invoking the
    /// tick handler.
    void TickQuiesced()
    {
        _raft._electionElapsed++;
    }

    /// Steps a `MsgHup` message into the raft and returns the step result.
    ErrString Campaign()
    {
        Message msg = {Type : MessageType.MsgHup};
        return _raft.Step(msg);
    }

    /// Steps a `MsgProp` message carrying `data` as a single entry, with
    /// this node's id as sender, and returns the step result.
    ErrString Propose(string data)
    {
        Message msg = {
            Type : MessageType.MsgProp,
            From : _raft._id,
            Entries : [{Data : data}]
        };

        return _raft.Step(msg);
    }

    /// Serializes `cc` and steps it as a `MsgProp` message holding a single
    /// `EntryConfChange` entry. Returns the step result.
    ErrString ProposeConfChange(ConfChange cc)
    {
        auto data = cast(string) serialize(cc);

        Message msg = {
            Type : MessageType.MsgProp,
            Entries : [{Type : EntryType.EntryConfChange, Data : data}]
        };

        return _raft.Step(msg);
    }

    /**
     * Applies a configuration change to the raft.
     *
     * A change whose `NodeID` is `None` only resets the pending-conf flag.
     * An unknown change type is logged and otherwise ignored.
     *
     * Returns: a `ConfState` whose `Nodes` is the raft's node list after the
     * change.
     */
    ConfState ApplyConfChange(ConfChange cc)
    {
        if (cc.NodeID == None)
        {
            _raft.resetPendingConf();
        }
        else
        {
            switch (cc.Type)
            {
            case ConfChangeType.ConfChangeAddNode:
                _raft.addNode(cc.NodeID);
                break;
            case ConfChangeType.ConfChangeRemoveNode:
                _raft.removeNode(cc.NodeID);
                break;
            case ConfChangeType.ConfChangeUpdateNode:
                _raft.resetPendingConf();
                break;
            default:
                logError("unexpected conf type");
                break;
            }
        }

        ConfState cs = {Nodes : _raft.nodes()};
        return cs;
    }

    /**
     * Steps an externally received message into the raft.
     *
     * Returns: `ErrStepLocalMsg` for local message types,
     * `ErrStepPeerNotFound` for a response message whose sender is not in
     * the raft progress table, otherwise the result of `Raft.Step`.
     */
    ErrString Step(Message m)
    {
        // Local messages must never be accepted from outside.
        if (IsLocalMsg(m.Type))
            return ErrStepLocalMsg;

        auto prs = m.From in _raft._prs;

        if (prs != null || !IsResponseMsg(m.Type))
        {
            return _raft.Step(m);
        }

        return ErrStepPeerNotFound;
    }

    /// Builds a `Ready` from the current state and clears the raft's
    /// outbound message queue.
    Ready ready()
    {
        auto rd = newReady();
        _raft._msgs = null;
        return rd;
    }

    /**
     * Reports whether there is anything new for the application to process.
     *
     * Returns: true when the soft state differs from the last acknowledged
     * one, when a non-empty hard state differs from the last acknowledged
     * one, when an unstable non-empty snapshot exists, when outbound
     * messages are queued, or when read states are pending.
     */
    bool HasReady()
    {
        // Ready only when the soft state has CHANGED; the previous version
        // was missing the negation and reported ready on an unchanged state.
        if (!_raft.softState().equals(_preSoftSt))
            return true;

        auto hardSt = _raft.hardState();

        if (!IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, _prevHardSt))
            return true;

        if (_raft._raftLog._unstable._snap != null
                && !IsEmptySnap(*_raft._raftLog._unstable._snap))
            return true;

        // Queued outbound messages are handed out by ready(), so they must
        // make the node ready as well.
        // NOTE(review): the reference etcd implementation additionally checks
        // for unstable log entries and committed-but-unapplied entries here;
        // the log accessors for that are not visible in this file - confirm
        // and add.
        if (_raft._msgs.length != 0)
            return true;

        if (_raft._readStates.length != 0)
            return true;

        return false;
    }

    /// Acknowledges that `rd` has been fully processed; see `commitReady`.
    void Advance(Ready rd)
    {
        commitReady(rd);
    }

    /// Returns the status snapshot produced by `getStatus` for the raft.
    Status status()
    {
        return getStatus(_raft);
    }

    /// Steps a `MsgUnreachable` message from `id` into the raft.
    /// The step result is intentionally discarded.
    void ReportUnreachable(ulong id)
    {
        Message msg = {Type : MessageType.MsgUnreachable, From : id};
        _raft.Step(msg);
    }

    /// Steps a `MsgSnapStatus` message from `id` into the raft; `Reject` is
    /// set when `status` equals `SnapshotFailure`.
    /// The step result is intentionally discarded.
    void ReportSnapshot(ulong id, SnapshotStatus status)
    {
        auto rej = (status == SnapshotFailure);

        Message msg = {
            Type : MessageType.MsgSnapStatus,
            From : id,
            Reject : rej
        };

        _raft.Step(msg);
    }

    /// Steps a `MsgTransferLeader` message whose sender is `transferee`.
    /// The step result is intentionally discarded.
    void TransferLeader(ulong transferee)
    {
        Message msg = {Type : MessageType.MsgTransferLeader, From : transferee};
        _raft.Step(msg);
    }

    /// Steps a `MsgReadIndex` message carrying `rctx` as a single entry.
    /// The step result is intentionally discarded.
    void ReadIndex(string rctx)
    {
        Message msg = {
            Type : MessageType.MsgReadIndex,
            Entries : [{Data : rctx}]
        };
        _raft.Step(msg);
    }
}