1 module hunt.raft.Readonly;
2 
3 
4 import hunt.raft.Raft;
5 import hunt.raft.Msg;
6 
7 import hunt.logging;
8 
9 import std.experimental.allocator;
10 
/// Plain value type pairing a log index with a request-context string.
/// NOTE(review): not referenced anywhere else in this module; presumably it is
/// what gets handed back to the caller once a read-only request is answered —
/// confirm against the users of this type.
struct ReadState{
	/// Index associated with the read request (exact meaning is defined by callers — TODO confirm).
	ulong 		Index;
	/// Opaque context string identifying the request this state belongs to.
	string		RequestCtx;
}
15 
16 
/// Bookkeeping record for one pending read-only request tracked by `readOnly`.
/// `readOnly.addRequest` constructs it positionally as `(req, index)`, so the
/// field order below must not change.
struct readIndexStatus
{
	/// The original request message; `req.Entries[0].Data` is used as the request's key.
	Message 		req;
	/// The index supplied by the caller of `readOnly.addRequest` together with the request.
	ulong 			index;
	/// Set of acknowledging senders, keyed by `Message.From`; the `Object` values are placeholders only.
	Object[ulong] 	acks;
}
23 
/**
 * Tracks pending read-only requests.
 *
 * Each request is identified by a context string (taken from
 * `m.Entries[0].Data` when the request is added). Requests are kept both in a
 * map (`_pendingReadIndex`, context -> status) and in an arrival-ordered queue
 * (`_readIndexQueue`).
 */
class readOnly
{
	/// Option value supplied at construction; stored but not consulted by this class.
	ReadOnlyOption 					_option;
	/// Pending requests, keyed by request context.
	readIndexStatus*[string]		_pendingReadIndex;
	/// Request contexts in the order they were added.
	string[]						_readIndexQueue;

	/// Creates an empty tracker remembering the given option.
	this(ReadOnlyOption option)
	{
		_option = option;
	}

	/**
	 * Registers a new read-only request.
	 *
	 * Params:
	 *   index = index to associate with the request
	 *   m     = the request message; its first entry's `Data` is the request
	 *           context (assumes `m.Entries` is non-empty — TODO confirm callers
	 *           guarantee this)
	 *
	 * A request whose context is already pending is ignored.
	 */
	void addRequest(ulong index , Message m)
	{
		auto ctx = m.Entries[0].Data;
		if(ctx in _pendingReadIndex)
			return;

		_pendingReadIndex[ctx] = theAllocator.make!readIndexStatus(m , index);
		_readIndexQueue ~= ctx;
	}

	/**
	 * Records an acknowledgement for the request identified by `m.Context`.
	 *
	 * Returns: the number of distinct senders that have acknowledged the
	 * request plus one, or 0 if no request with that context is pending.
	 * NOTE(review): the extra one presumably counts the local node — confirm.
	 */
	int recvAck(Message m)
	{
		readIndexStatus** rs = (m.Context in _pendingReadIndex);
		if(rs is null)
			return 0;

		(*rs).acks[m.From] = new Object();
		return cast(int)(*rs).acks.length + 1;
	}

	/**
	 * Dequeues every request from the head of the queue up to and including
	 * the one whose context equals `m.Context`.
	 *
	 * Returns: the statuses of the dequeued requests in queue order, or `null`
	 * (with no state modified) when `m.Context` is not found in the queue.
	 *
	 * A queued context that has no entry in the pending map is logged and
	 * skipped instead of being dereferenced.
	 */
	readIndexStatus*[] advance(Message m)
	{
		auto ctx = m.Context;
		readIndexStatus*[] rss;
		size_t consumed = 0;
		bool found = false;

		foreach(pos , v ; _readIndexQueue)
		{
			auto rs = v in _pendingReadIndex;
			if(rs is null)
			{
				// Queue and map are out of sync; report it but do not
				// dereference the null lookup result.
				logError("cannot find corresponding read state from pending map");
			}
			else
			{
				rss ~= *rs;
			}

			if(v == ctx)
			{
				consumed = pos + 1;
				found = true;
				break;
			}
		}

		if(!found)
			return null;

		// Drop the consumed prefix from the queue and the matching map entries.
		_readIndexQueue = _readIndexQueue[consumed .. $];
		foreach(rs ; rss)
		{
			_pendingReadIndex.remove(rs.req.Entries[0].Data);
		}

		return rss;
	}

	/// Returns the context of the most recently queued request, or "" when the queue is empty.
	string lastPendingRequestCtx()
	{
		if(_readIndexQueue.length == 0)
			return "";

		return _readIndexQueue[$ - 1];
	}

}