1 module common.network.base; 2 3 import hunt.logging; 4 import hunt.util.Serialize; 5 6 import hunt.net; 7 import hunt.raft; 8 9 import core.stdc.string; 10 11 import common.network.api; 12 13 import std.bitmanip; 14 import std.stdint; 15 16 class Base 17 { 18 19 enum PACK_HEAD_LEN = 4; 20 21 this( NetSocket sock , MessageReceiver receiver) 22 { 23 this.sock = sock; 24 this.receiver = receiver; 25 sock.handler((in ubyte[] data){ 26 onRead(data); 27 }); 28 sock.closeHandler((){ 29 onClose(); 30 }); 31 } 32 33 34 void onRead(in ubyte[] data) 35 { 36 Message[] msgs; 37 int index = 0; 38 int length = cast(int)data.length; 39 int used; 40 while(index < length) 41 { 42 int left = length - index; 43 if( headLen < PACK_HEAD_LEN) 44 { 45 if(left >= PACK_HEAD_LEN - headLen) 46 { 47 used = PACK_HEAD_LEN - headLen; 48 header[headLen .. headLen + used] = data[index .. index + used]; 49 index += used; 50 headLen += used; 51 msgLen = bigEndianToNative!int32_t(header); 52 buffer.length = 0; 53 if(msgLen == 0) 54 { 55 msgs ~= unserialize!Message(cast(byte[])buffer); 56 headLen = 0; 57 } 58 } 59 else 60 { 61 header[headLen .. headLen + left] = data[index .. index + left]; 62 index += left; 63 headLen += left; 64 } 65 } 66 else 67 { 68 if(left >= msgLen - cast(int)buffer.length) 69 { 70 used = msgLen - cast(int)buffer.length; 71 buffer ~= data[index .. index + used]; 72 index += used; 73 msgs ~= unserialize!Message(cast(byte[])buffer); 74 headLen = 0; 75 } 76 else 77 { 78 buffer ~= data[index .. index + left]; 79 index += left; 80 } 81 } 82 83 } 84 85 foreach(m ; msgs) 86 { 87 //logDebug("recv a message " , m); 88 receiver.step(m); 89 } 90 91 92 } 93 94 void onClose() 95 { 96 97 } 98 99 100 NetSocket sock; 101 102 int msgLen; 103 ubyte[] buffer; 104 int headLen = 0; 105 ubyte[PACK_HEAD_LEN] header; 106 MessageReceiver receiver; 107 }