module common.raft.node;


import common.wal.api;


import hunt.raft;
import hunt.logging;
import hunt.util.Serialize;
import hunt.util.timer;
import hunt.net;

import std.string;
import std.conv;
import std.format;


/// A snapshot is triggered once more than this many entries have been applied
/// since the last snapshot (see `Node.maybeTriggerSnapshot`).
enum defaultSnapCount = 10000;

/// Number of entries kept behind `appliedIndex` when the log is compacted
/// after a snapshot (see `Node.maybeTriggerSnapshot`).
enum snapshotCatchUpEntriesN = 10000;


/**
 * Bookkeeping for a single raft participant: tracks how far the log has been
 * applied and snapshotted, and drives snapshot creation / log compaction on
 * the in-memory raft storage and the persistent `DataStorage`.
 */
class Node
{
    /// Identifier of this node.
    ulong           ID;
    /// In-memory raft log storage used for snapshot creation and compaction.
    MemoryStorage   storage;
    /// Underlying raft state machine handle.
    RawNode         node;
    /// Not assigned by the constructor; presumably marks a node joining an
    /// existing cluster -- TODO confirm against callers.
    bool            join;
    /// Last log index supplied at construction time.
    ulong           lastIndex;
    /// Cluster configuration state; replaced when a snapshot is published.
    ConfState       confstate;
    /// Index of the most recent snapshot.
    ulong           snapshotIndex;
    /// Index of the most recently applied entry.
    ulong           appliedIndex;
    /// Persistent store used to fetch snapshot data and save snapshots.
    DataStorage     store;

    /// Stores every argument in the field of the same name.
    this(ulong ID , MemoryStorage storage , RawNode node , DataStorage store ,
        ulong lastIndex , ConfState confstate , ulong snapshotIndex , ulong appliedIndex)
    {
        this.ID             = ID;
        this.storage        = storage;
        this.node           = node;
        this.store          = store;
        this.lastIndex      = lastIndex;
        this.confstate      = confstate;
        this.snapshotIndex  = snapshotIndex;
        this.appliedIndex   = appliedIndex;
    }

    /**
     * Adopts the configuration state and index of a received snapshot.
     *
     * Empty snapshots are ignored. A snapshot whose index is not strictly
     * greater than `appliedIndex` is reported via `logError` and rejected, so
     * that `appliedIndex` / `snapshotIndex` never move backwards.
     *
     * Params:
     *   snap = snapshot to publish.
     */
    void publishSnapshot(Snapshot snap)
    {
        if (IsEmptySnap(snap))
            return;

        if (snap.Metadata.Index <= appliedIndex)
        {
            logError(format("snapshot index [%d] should > progress.appliedIndex [%d] + 1",
                snap.Metadata.Index, appliedIndex));
            // A stale snapshot must not rewind the applied/snapshot progress.
            return;
        }

        confstate       = snap.Metadata.CS;
        snapshotIndex   = snap.Metadata.Index;
        appliedIndex    = snap.Metadata.Index;
    }

    /**
     * Selects the suffix of `ents` that has not been applied yet.
     *
     * Params:
     *   ents = committed entries, in index order.
     *
     * Returns: the slice of `ents` whose indices are above `appliedIndex`, or
     *   `null` when `ents` is empty, when everything in it was already
     *   applied, or when there is a gap between `appliedIndex` and the first
     *   entry (the gap is also reported via `logError`).
     */
    Entry[] entriesToApply(Entry[] ents)
    {
        if (ents.length == 0)
            return null;

        const firstIdx = ents[0].Index;
        if (firstIdx > appliedIndex + 1)
        {
            logError(format("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1",
                firstIdx, appliedIndex));
            // Returning here makes the outcome explicit instead of relying on
            // unsigned wrap-around of the subtraction below.
            return null;
        }

        // Number of leading entries that were already applied.
        const alreadyApplied = appliedIndex - firstIdx + 1;
        if (alreadyApplied < ents.length)
            return ents[cast(size_t) alreadyApplied .. $];

        return null;
    }



    /**
     * Creates and persists a snapshot and compacts the in-memory log once more
     * than `defaultSnapCount` entries have been applied since the last
     * snapshot.
     *
     * If snapshot creation fails the error is logged and nothing is saved or
     * compacted; `snapshotIndex` stays unchanged so a later call retries.
     */
    void maybeTriggerSnapshot()
    {
        // The first test keeps the unsigned subtraction from wrapping around.
        if (appliedIndex <= snapshotIndex
            || appliedIndex - snapshotIndex <= defaultSnapCount)
            return;

        logInfo(format("start snapshot [applied index: %d | last snapshot index: %d]",
            appliedIndex, snapshotIndex));

        auto data = store.getSnapData();
        Snapshot snap;
        auto err = storage.CreateSnapshot(appliedIndex ,&confstate , data , snap);
        if (err != ErrNil)
        {
            logError(err);
            // Do not persist an unfilled snapshot or compact the log on failure.
            return;
        }

        store.saveSnap(snap);

        // Keep the last snapshotCatchUpEntriesN entries available in the log.
        ulong compactIndex = 1;
        if (appliedIndex > snapshotCatchUpEntriesN)
            compactIndex = appliedIndex - snapshotCatchUpEntriesN;

        // NOTE(review): any result of Compact is ignored here, as before --
        // confirm whether it reports errors that should be handled.
        storage.Compact(compactIndex);
        logInfo("compacted log at index " , compactIndex);
        snapshotIndex = appliedIndex;
    }

    /**
     * Marks every entry in `ents` as applied by advancing `appliedIndex` to
     * each entry's index in turn.
     *
     * Params:
     *   ents = entries to publish; may be empty or `null`.
     *
     * Returns: always `true`.
     */
    bool publishEntries(Entry[] ents)
    {
        foreach (ref entry; ents)
        {
            appliedIndex = entry.Index;
        }
        return true;
    }


    /*
    void send(Message[] msg)
    {

    }

    void addPeer(ulong ID , string data)
    {

    }

    void delPeer(ulong ID)
    {

    }


    void ready()
    {
        Ready rd = node.ready();
        if(!rd.containsUpdates())
        {
            return;
        }
        store.save(rd.hs, rd.Entries);
        if( !IsEmptySnap(rd.snap))
        {
            store.saveSnap(rd.snap);
            storage.ApplySnapshot(rd.snap);
            publishSnapshot(rd.snap);
        }
        storage.Append(rd.Entries);
        send(rd.Messages);
        if(!publishEntries(entriesToApply(rd.CommittedEntries)))
        {
            logWarning("will be stop!");
            return;
        }

        //for readindex
        foreach( r ; rd.ReadStates)
        {
            if( r.Index >= appliedIndex)
            {

            }
        }

        maybeTriggerSnapshot();
        node.Advance(rd);

    }*/
}