module app.groupraft;


import common.network;
import common.raft.node;

import hunt.raft;
import hunt.logging;
import hunt.util.Serialize;
import hunt.util.timer;
import hunt.net;

import std.string;
import std.conv;
import std.format;

import core.thread;
import core.sync.mutex;


import common.wal.storage;

alias Server = common.network.server.Server;
alias Client = common.network.client.Client;
alias Storage = common.wal.storage.Storage;


/// Address record for one cluster member, as consumed by `GroupRaft.start`.
class ClusterClient
{
    ulong firstID;   /// member id; compared against the local id in `GroupRaft.start`
    string host;     /// host used for the raft listener / outbound connection
    ushort port;     /// port used for the raft listener / outbound connection
    string apihost;  /// not read in this module
    ushort apiport;  /// not read in this module
}


/**
 * Hosts several raft nodes (one per region) in a single process and moves
 * their messages over one server socket plus one client per remote member.
 *
 * A raft node id is built as `memberID * regionStride + regionKey`; `send`
 * recovers the member id with an integer division by `regionStride`.
 */
class GroupRaft : MessageReceiver
{
    /// Multiplier used to pack a member id and a region key into one raft id.
    /// The division in `send` only recovers the member id when every region
    /// key is smaller than this value.
    private enum ulong regionStride = 10;

    this()
    {

    }

    //node1 [1,2,3]
    //node2 [1,2,4]
    //node3 [2,3,4]
    //node4 [1,2,3]
    // NOTE(review): the table above looks like an example of which regions each
    // member hosts -- confirm against the caller's configuration.

    /**
     * Opens an outbound connection to member `ID` unless one is already
     * registered in `_clients`.
     *
     * Params:
     *   ID   = member id of the remote peer
     *   data = peer address formatted as "host:port"
     *
     * On connection failure a helper thread sleeps one second and calls
     * `addPeer` again with the same arguments, so the attempt repeats until
     * it succeeds.
     */
    void addPeer(ulong ID , string data)
    {
        if(ID in _clients)
            return ;

        auto client = new Client(_ID , ID);
        string[] hostport = split(data , ":");
        client.connect(hostport[0] , to!int(hostport[1]) , (Result!NetSocket result){
            if(result.failed()){

                // Retry from a separate thread after a one second pause.
                new Thread((){
                    Thread.sleep(dur!"seconds"(1));
                    addPeer(ID , data);
                }).start();
                return;
            }
            // NOTE(review): this write is not guarded by _mutex while send()
            // reads _clients under it -- confirm which thread runs this callback.
            _clients[ID] = client;
            logInfo(_ID , " client connected " , hostport[0] , " " , hostport[1]);
        });

    }

    /**
     * Brings up networking and one raft node per region, starts the ready and
     * tick worker threads, then calls `NetUtil.startEventLoop(-1)`.
     *
     * Params:
     *   firstID = id of the local member
     *   regions = region key -> ids of the members participating in that region
     *   clients = address records of all members, including the local one
     */
    void start(ulong firstID , ulong[][ulong] regions , ClusterClient[] clients)
    {
        // Must be set before any networking is created: addPeer() passes _ID to
        // the Client constructor, and step() locks _mutex.
        _ID = firstID;
        _mutex = new Mutex();

        foreach(c ; clients)
        {
            //server
            if(firstID == c.firstID)
            {
                _server = new Server!(Base ,MessageReceiver)(firstID , this);
                _server.listen(c.host , c.port);
                logInfo(firstID , " server open " , c.host , " " , c.port);
            }
            //client
            else
            {
                addPeer(c.firstID , c.host ~ ":" ~ to!string(c.port));
            }
        }

        foreach(k , v ; regions)
        {
            Config conf = new Config();
            auto kvs = new Storage();
            auto store = new MemoryStorage();
            auto ID = (firstID * regionStride + k);
            Peer[] peers;
            foreach( id ; v)
            {
                Peer p = {ID: id * regionStride + k};
                peers ~= p;
            }
            logInfo(ID ," " , peers , v);

            Snapshot *shot = null;

            ConfState confState;
            ulong snapshotIndex;
            ulong appliedIndex;
            ulong lastIndex;
            RawNode node;

            HardState hs;
            Entry[] ents;

            // Load the state stored under the per-node file names; `exist`
            // selects below between the two RawNode constructors.
            bool exist = kvs.load("snap.log" ~ to!string(ID) , "entry.log" ~ to!string(ID) ,"hs.log" ~ to!string(ID), shot , hs , ents);
            if(shot != null)
            {
                store.ApplySnapshot(*shot);
                confState = shot.Metadata.CS;
                snapshotIndex = shot.Metadata.Index;
                appliedIndex = shot.Metadata.Index;
            }

            store.setHadrdState(hs);
            store.Append(ents);

            if(ents.length > 0)
            {
                lastIndex = ents[$ - 1].Index;
            }
            conf._ID = ID;
            conf._ElectionTick = 10;
            conf._HeartbeatTick = 1;
            conf._storage = store;
            conf._MaxSizePerMsg = 1024 * 1024;
            conf._MaxInflightMsgs = 256;

            if(exist)
            {
                node = new RawNode(conf);
            }
            else
            {
                // No stored state was found: construct the node with the peer list.
                node = new RawNode(conf , peers);
            }

            _rafts[ID] = new Node(ID , store , node , kvs , lastIndex ,confState , snapshotIndex , appliedIndex);

        }

        logInfo(_clients , " " , _rafts);

        /// ready
        new Thread((){
            while(true)
            {
                ready();
                Thread.sleep(dur!"msecs"(10));
            }
        }).start();

        /// tick
        new Thread((){
            while(true)
            {
                tick();
                Thread.sleep(dur!"msecs"(100));
            }
        }).start();

        NetUtil.startEventLoop(-1);

    }


    /// Calls `Tick()` on every hosted raft node while holding `_mutex`.
    void tick()
    {
        _mutex.lock();
        // Release the lock even when a node throws.
        scope(exit) _mutex.unlock();

        foreach( r ; _rafts)
        {
            r.node.Tick();
        }
    }

    /**
     * Processes the pending `Ready` of every hosted raft node while holding
     * `_mutex`: saves the hard state, entries and any snapshot, appends the
     * entries, sends the outgoing messages, publishes the committed entries,
     * calls `maybeTriggerSnapshot` and finally `Advance`.
     */
    void ready()
    {
        _mutex.lock();
        // Release the lock even when storage or raft code throws.
        scope(exit) _mutex.unlock();

        foreach(r ; _rafts)
        {
            Ready rd = r.node.ready();
            if(!rd.containsUpdates())
            {
                continue;
            }

            // NOTE(review): `store` and `storage` are Node members declared in
            // common.raft.node; presumably the WAL and the in-memory raft
            // storage respectively -- confirm there.
            r.store.save(rd.hs , rd.Entries);
            if(!IsEmptySnap(rd.snap))
            {
                r.store.saveSnap(rd.snap);
                r.storage.ApplySnapshot(rd.snap);
                r.publishSnapshot(rd.snap);
            }

            r.storage.Append(rd.Entries);
            send(rd.Messages);
            r.publishEntries(r.entriesToApply(rd.CommittedEntries));
            r.maybeTriggerSnapshot();
            r.node.Advance(rd);
        }
    }

    /**
     * Writes each message to the client registered for its destination member.
     * Messages whose destination has no entry in `_clients` are skipped.
     *
     * Params:
     *   msg = outgoing raft messages; `To` holds the packed raft id
     */
    void send(Message[] msg)
    {
        //logInfo(msg);
        foreach(m ; msg)
        {
            // Undo the packing done in start() to get the member id.
            ulong ID = m.To / regionStride;
            if(auto peer = ID in _clients)
                (*peer).write(m);
        }
    }

    /**
     * Hands a message to the raft node whose id equals `msg.To`, while holding
     * `_mutex`. A message addressed to an id that is not hosted here is logged
     * and dropped.
     *
     * Params:
     *   msg = raft message to step into the target node
     */
    void step(Message msg)
    {
        _mutex.lock();
        // Release the lock even when Step() throws.
        scope(exit) _mutex.unlock();

        auto slot = msg.To in _rafts;
        if(slot is null)
        {
            logInfo(_ID , " dropping message for unknown raft " , msg.To);
            return;
        }
        (*slot).node.Step(msg);
    }



    Server!(Base,MessageReceiver) _server;  /// listener created in start()
    MessageTransfer[ulong] _clients;         /// connected peers keyed by member id
    Node[ulong] _rafts;                      /// hosted raft nodes keyed by packed raft id
    Storage _storage;                        /// not used in this module
    Mutex _mutex;                            /// guards tick(), ready() and step()
    ulong _ID;                               /// local member id, set in start()
}