1 module hunt.raft.Log;
2 
3 import hunt.raft.Storage;
4 import hunt.raft.Logunstable;
5 import hunt.raft.Msg;
6 import hunt.raft.Util;
7 
8 import hunt.logging;
9 
10 import std.conv;
11 import std.format;
12 import std.algorithm;
13 
/**
 * Raft log view that answers queries from `_unstable` first and falls back
 * to `_storage` for everything `_unstable` does not hold.
 *
 * Invariant violations (out-of-range commit/apply indexes, bad slices, ...)
 * are reported through `logError` and execution then continues; the only
 * exception thrown by this class itself is for a null storage passed to the
 * constructor.
 */
class raftLog
{
	/// Store consulted whenever `_unstable` cannot answer a query.
	Storage 	_storage;
	/// Entries (and optional snapshot) starting at index `_unstable._offset`.
	unstable	_unstable;
	/// Highest index recorded by `commitTo` / `restore`.
	ulong		_committed;
	/// Index most recently recorded by `appliedTo`.
	ulong		_applied;

	/**
	 * Builds a log on top of `storage`.
	 *
	 * `_unstable._offset` starts right after the storage's last index, and
	 * both `_committed` and `_applied` start at the storage's first index
	 * minus one.
	 *
	 * Throws: Exception if `storage` is null (it would otherwise be
	 *         dereferenced immediately below).
	 */
	this(Storage storage)
	{
		if (storage is null)
		{
			logError("storage must not be nil");
			throw new Exception("storage must not be nil");
		}

		_storage = storage;
		_unstable = new unstable();

		ulong first;
		auto err = _storage.FirstIndex(first);
		if (err != ErrNil)
		{
			logError(err);
		}

		ulong last;
		err = _storage.LastIndex(last);
		if (err != ErrNil)
		{
			logError(err);
		}

		_unstable._offset = last + 1;
		_committed = first - 1;
		_applied = first - 1;
	}

	/// Human-readable summary of the commit/apply cursors and the unstable part.
	override string toString()
	{
		return format("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d",
			_committed, _applied, _unstable._offset, _unstable._entries.length);
	}

	/**
	 * Appends `ents` if the entry at `index` has term `logTerm`.
	 *
	 * Params:
	 *   index     = index of the entry that must match `logTerm`
	 *   logTerm   = expected term at `index`
	 *   committed = commit index to advance to (capped at `lastnewi`)
	 *   ents      = candidate entries
	 *   lastnewi  = set to `index + ents.length` on success, 0 otherwise
	 *
	 * Returns: true when the term at `index` matched, false otherwise.
	 */
	bool maybeAppend(ulong index , ulong logTerm , ulong committed , Entry[] ents ,
		out ulong lastnewi )
	{
		if (!matchTerm(index, logTerm))
		{
			return false;
		}

		lastnewi = index + ents.length;
		auto ci = findConflict(ents);
		if (ci == 0)
		{
			// No conflict and nothing new: nothing to append.
		}
		else if (ci <= _committed)
		{
			logError(format("entry %d conflict with committed entry [committed(%d)]", ci, _committed));
		}
		else
		{
			// Skip the leading entries that precede index `ci`.
			auto offset = index + 1;
			append(ents[cast(size_t)(ci - offset) .. $]);
		}
		commitTo(min(committed, lastnewi));
		return true;
	}

	/**
	 * Hands `ents` to the unstable log and returns the resulting last index.
	 *
	 * An error is logged when the first entry would land at or before
	 * `_committed`; the entries are still forwarded in that case.
	 */
	ulong append(Entry[] ents)
	{
		if (ents.length == 0)
		{
			return lastIndex();
		}

		auto after = ents[0].Index - 1;
		if (after < _committed)
		{
			logError(format("after(%d) is out of range [committed(%d)]", after, _committed));
		}

		_unstable.truncateAndAppend(ents);
		return lastIndex();
	}

	/**
	 * Returns the index of the first entry in `ents` whose term does not
	 * match this log (per `matchTerm`), or 0 when every entry matches.
	 *
	 * When the mismatching index is not past `lastIndex()` the mismatch is
	 * logged together with the term this log reports for that index.
	 */
	ulong findConflict(Entry[] ents)
	{
		foreach (ne; ents)
		{
			if (matchTerm(ne.Index, ne.Term))
			{
				continue;
			}

			if (ne.Index <= lastIndex())
			{
				ulong t;
				auto err = term(ne.Index, t);
				// Format explicitly, like every other log call in this class.
				logInfo(format("found conflict at index %d [existing term: %d, conflicting term: %d]",
						ne.Index, zeroTermOnErrCompacted(t, err), ne.Term));
			}

			return ne.Index;
		}
		return 0;
	}

	/// Returns the entries currently held by the unstable log.
	Entry[] unstableEntries()
	{
		return _unstable._entries;
	}

	/**
	 * Returns the entries in `[max(_applied + 1, firstIndex()), _committed + 1)`,
	 * or null when that range is empty. A slice error is logged.
	 */
	Entry[] nextEnts()
	{
		auto off = max(_applied + 1, firstIndex());
		if (_committed + 1 <= off)
		{
			return null;
		}

		Entry[] ents;
		auto err = slice(off, _committed + 1, noLimit, ents);
		if (err != ErrNil)
		{
			logError(format("unexpected error when getting unapplied entries (%s)", err));
		}
		return ents;
	}

	/// True when `nextEnts` would cover a non-empty index range.
	bool hasNextEnts()
	{
		auto off = max(_applied + 1, firstIndex());
		return _committed + 1 > off;
	}

	/**
	 * Copies the unstable snapshot into `ss` when one is present; otherwise
	 * delegates to the storage.
	 */
	ErrString GetSnap(out Snapshot ss)
	{
		if (_unstable._snap !is null)
		{
			ss = *_unstable._snap;
			return ErrNil;
		}
		return _storage.GetSnap(ss);
	}

	/**
	 * First index of the log: taken from the unstable log when it can supply
	 * one, otherwise from the storage (a storage error is logged).
	 */
	ulong firstIndex()
	{
		ulong index;
		if (_unstable.maybeFirstIndex(index))
		{
			return index;
		}

		auto err = _storage.FirstIndex(index);
		if (err != ErrNil)
		{
			logError(err);
		}
		return index;
	}

	/**
	 * Last index of the log: taken from the unstable log when it can supply
	 * one, otherwise from the storage (a storage error is logged).
	 */
	ulong lastIndex()
	{
		ulong index;
		if (_unstable.maybeLastIndex(index))
		{
			return index;
		}

		auto err = _storage.LastIndex(index);
		if (err != ErrNil)
		{
			logError(err);
		}
		return index;
	}

	/**
	 * Advances `_committed` to `tocommit`; never moves it backwards.
	 * An error is logged when `tocommit` is past `lastIndex()`, but the
	 * commit index is still updated.
	 */
	void commitTo(ulong tocommit)
	{
		if (_committed >= tocommit)
		{
			return;
		}

		auto last = lastIndex();
		if (last < tocommit)
		{
			logError(format("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, last));
		}
		_committed = tocommit;
	}

	/**
	 * Records `i` as the applied index. `i == 0` is ignored. An error is
	 * logged when `i` lies outside `[_applied, _committed]`, but the value
	 * is still stored.
	 */
	void appliedTo(ulong i)
	{
		if (i == 0)
		{
			return;
		}
		if (_committed < i || i < _applied)
		{
			logError(format("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, _applied, _committed));
		}
		_applied = i;
	}

	/// Forwards to `unstable.stableTo(i, t)`.
	void stableTo(ulong i , ulong t)
	{
		_unstable.stableTo(i, t);
	}

	/// Forwards to `unstable.stableSnapTo(i)`.
	void stableSnapTo(ulong i)
	{
		_unstable.stableSnapTo(i);
	}

	/// Term reported for `lastIndex()`; a lookup error is logged.
	ulong lastTerm()
	{
		ulong t;
		auto err = term(lastIndex(), t);
		if (err != ErrNil)
		{
			logError(format("unexpected error when getting the last term (%s)", err));
		}
		return t;
	}

	/**
	 * Looks up the term of the entry at index `i`.
	 *
	 * Params:
	 *   i    = index to look up
	 *   term = receives the term, or 0 when it cannot be determined
	 *
	 * Returns: `ErrNil` on success and also (with `term == 0`) when `i` lies
	 *          outside `[firstIndex() - 1, lastIndex()]`; otherwise the
	 *          storage error. Errors other than `ErrCompacted` and
	 *          `ErrUnavailable` are additionally logged.
	 */
	ErrString term(ulong i, out ulong term)
	{
		// The valid range is [index just before the first entry, last index].
		auto dummyIndex = firstIndex() - 1;
		if (i < dummyIndex || i > lastIndex())
		{
			term = 0;
			return ErrNil;
		}

		if (_unstable.maybeTerm(i, term))
		{
			return ErrNil;
		}

		auto err = _storage.Term(i, term);
		if (err == ErrNil)
		{
			return ErrNil;
		}

		term = 0;
		if (err != ErrCompacted && err != ErrUnavailable)
		{
			// Unexpected failure: report it instead of pretending success.
			logError(err);
		}
		return err;
	}

	/**
	 * Fetches the entries from index `i` through `lastIndex()`, limited by
	 * `maxsize`. Leaves `ents` empty and returns `ErrNil` when `i` is past
	 * the last index.
	 */
	ErrString entries(ulong i,ulong maxsize , out Entry[] ents)
	{
		auto last = lastIndex();
		if (i > last)
		{
			return ErrNil;
		}
		return slice(i, last + 1, maxsize, ents);
	}

	/**
	 * Returns every entry from `firstIndex()` onwards. The lookup is retried
	 * while it fails with `ErrCompacted`; any other error is logged and null
	 * is returned.
	 */
	Entry[] allEntries()
	{
		while (true)
		{
			Entry[] ents;
			auto err = entries(firstIndex(), noLimit, ents);
			if (err == ErrNil)
			{
				return ents;
			}
			if (err != ErrCompacted)
			{
				logError(err);
				return null;
			}
			// ErrCompacted: firstIndex() is re-read on the next iteration.
		}
	}

	/**
	 * True when (`term`, `lasti`) is at least as recent as this log: a higher
	 * term than `lastTerm()`, or the same term with `lasti >= lastIndex()`.
	 */
	bool isUpToDate(ulong lasti, ulong term)
	{
		auto mine = lastTerm();
		return term > mine || (term == mine && lasti >= lastIndex());
	}

	/// True when the term lookup for index `i` succeeds and yields `t`.
	bool matchTerm(ulong i , ulong t)
	{
		ulong t0;
		auto err = term(i, t0);
		if (err != ErrNil)
		{
			return false;
		}
		return t0 == t;
	}

	/**
	 * Commits up to `maxIndex` when it is beyond `_committed` and the entry
	 * at `maxIndex` has term `t`. Returns whether the commit happened.
	 */
	bool maybeCommit(ulong maxIndex , ulong t)
	{
		ulong t0;
		auto err = term(maxIndex, t0);
		if (maxIndex > _committed && zeroTermOnErrCompacted(t0, err) == t)
		{
			commitTo(maxIndex);
			return true;
		}

		return false;
	}

	/**
	 * Sets `_committed` to the snapshot's metadata index and hands the
	 * snapshot to the unstable log.
	 */
	void restore(Snapshot ss)
	{
		logInfo(format("log [%s] starts to restore snapshot [index: %d, term: %d]",
				to!string(this.toHash()), ss.Metadata.Index, ss.Metadata.Term));
		_committed = ss.Metadata.Index;
		_unstable.restore(ss);
	}

	/**
	 * Collects the entries in `[lo, hi)` into `ents`, reading indexes below
	 * `_unstable._offset` from the storage and the rest from the unstable
	 * log, then applies `limitSize(ents, maxSize)`.
	 *
	 * Returns: the error from `mustCheckOutOfBounds`, `ErrCompacted` from the
	 *          storage, or `ErrNil`. Other storage errors are logged only.
	 */
	ErrString slice(ulong lo , ulong hi , ulong maxSize , out Entry[] ents)
	{
		auto err = mustCheckOutOfBounds(lo, hi);
		if (err != ErrNil)
		{
			return err;
		}

		if (lo == hi)
		{
			return ErrNil;
		}

		auto offset = _unstable._offset;
		if (lo < offset)
		{
			auto storedHi = min(hi, offset);
			Entry[] store;
			err = _storage.Entries(lo, storedHi, maxSize, store);
			if (err == ErrCompacted)
			{
				return err;
			}
			else if (err == ErrUnavailable)
			{
				logError(format("entries[%d:%d) is unavailable from storage",
					lo, storedHi));
			}
			else if (err != ErrNil)
			{
				logError(err);
			}

			ents = store;
			// Fewer entries than requested: return what the storage gave us
			// without consulting the unstable log.
			if (store.length < storedHi - lo)
			{
				return ErrNil;
			}
		}

		if (hi > offset)
		{
			Entry[] uns = _unstable.slice(max(lo, offset), hi);
			if (ents.length > 0)
			{
				// `~` always allocates, so the array handed out by the
				// storage is never extended in place.
				ents = ents ~ uns;
			}
			else
			{
				ents = uns;
			}
		}

		ents = limitSize(ents, maxSize);
		return ErrNil;
	}

	/**
	 * Validates the range `[lo, hi)` against `[firstIndex(), lastIndex() + 1]`.
	 *
	 * Returns: `ErrCompacted` when `lo` is below `firstIndex()`, otherwise
	 *          `ErrNil`. `lo > hi` and `hi > lastIndex() + 1` are logged as
	 *          errors but do not change the return value.
	 */
	ErrString mustCheckOutOfBounds(ulong lo , ulong hi)
	{
		if (lo > hi)
		{
			logError(format("invalid slice %d > %d", lo, hi));
		}

		ulong fi = firstIndex();
		if (lo < fi)
		{
			return ErrCompacted;
		}

		// lo >= fi holds from here on, so only the upper bound is left to check.
		auto last = lastIndex();
		if (hi > last + 1)
		{
			logError(format("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, last));
		}

		return ErrNil;
	}

	/**
	 * Maps a (term, error) pair to a term: `t` for `ErrNil`, 0 for
	 * `ErrCompacted`, and 0 plus an error log for anything else.
	 */
	ulong zeroTermOnErrCompacted(ulong t, ErrString err)
	{
		if (err == ErrNil)
		{
			return t;
		}
		if (err == ErrCompacted)
		{
			return 0;
		}

		logError(format("unexpected error (%s)", err));
		return 0;
	}
}
395 
396 
397