1 module hunt.raft.Readonly; 2 3 4 import hunt.raft.Raft; 5 import hunt.raft.Msg; 6 7 import hunt.logging; 8 9 import std.experimental.allocator; 10 11 struct ReadState{ 12 ulong Index; 13 string RequestCtx; 14 } 15 16 17 struct readIndexStatus 18 { 19 Message req; 20 ulong index; 21 Object[ulong] acks; 22 } 23 24 class readOnly 25 { 26 ReadOnlyOption _option; 27 readIndexStatus*[string] _pendingReadIndex; 28 string[] _readIndexQueue; 29 30 this(ReadOnlyOption option) 31 { 32 _option = option; 33 } 34 35 void addRequest(ulong index , Message m) 36 { 37 auto ctx = m.Entries[0].Data; 38 if(ctx in _pendingReadIndex) 39 return; 40 41 _pendingReadIndex[ctx] = theAllocator.make!readIndexStatus(m , index); 42 _readIndexQueue ~= ctx; 43 } 44 45 int recvAck(Message m) 46 { 47 readIndexStatus **rs = (m.Context in _pendingReadIndex); 48 if(rs == null) 49 return 0; 50 51 (*rs).acks[m.From] = new Object(); 52 return cast(int)(*rs).acks.length + 1; 53 } 54 55 readIndexStatus*[] advance(Message m) 56 { 57 int i = 0; 58 bool found = false; 59 60 auto ctx = m.Context; 61 readIndexStatus*[] rss; 62 63 foreach( v ; _readIndexQueue) 64 { 65 i++; 66 auto rs = v in _pendingReadIndex; 67 if( rs == null) 68 { 69 logError("cannot find corresponding read state from pending map"); 70 } 71 rss ~= *rs; 72 if( v == ctx) 73 { 74 found = true; 75 break; 76 } 77 } 78 79 if(found) 80 { 81 _readIndexQueue = _readIndexQueue[i .. $]; 82 foreach(rs ; rss) 83 { 84 _pendingReadIndex.remove(rs.req.Entries[0].Data); 85 } 86 87 return rss; 88 } 89 return null; 90 } 91 92 string lastPendingRequestCtx() 93 { 94 if(_readIndexQueue.length == 0) 95 return ""; 96 97 return _readIndexQueue[$ - 1]; 98 } 99 100 }