1 module tinyredis.connection; 2 3 /** 4 * Authors: Adil Baig, adil.baig@aidezigns.com 5 */ 6 7 public: 8 import std.socket : TcpSocket; 9 10 private: 11 import std.array : appender, back, popBack; 12 import tinyredis.parser; 13 import tinyredis.response; 14 15 debug(tinyredis) { 16 import std.stdio : writeln; 17 import tinyredis.encoder : escape; 18 } 19 20 public: 21 22 /** 23 * Sends a pre-encoded string 24 * 25 * Params: 26 * conn = Connection to redis server. 27 * encoded_cmd = The command to be sent. 28 * 29 * Throws: $(D ConnectionException) if sending fails. 30 */ 31 void send(TcpSocket conn, string encoded_cmd) 32 { 33 debug(tinyredis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); } 34 35 auto sent = conn.send(encoded_cmd); 36 if (sent != (cast(byte[])encoded_cmd).length) 37 throw new ConnectionException("Error while sending request"); 38 } 39 40 /** 41 * Receive responses from redis server 42 * 43 * Params: 44 * conn = Connection to redis server. 45 * minResponses = The number of multibulks you expect 46 * 47 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection. 48 */ 49 Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0) 50 { 51 byte[] buffer; 52 Response[] responses; 53 Response*[] MultiBulks; //Stack of pointers to multibulks 54 Response[]* stackPtr = &responses; 55 56 while(true) 57 { 58 receive(conn, buffer); 59 60 debug(tinyredis) { writeln("BUFFER : ", escape(cast(string)buffer)); } 61 62 while(buffer.length > 0) 63 { 64 auto r = parseResponse(buffer); 65 if(r.type == ResponseType.Invalid) 66 break; 67 68 *stackPtr ~= r; 69 if(r.type == ResponseType.MultiBulk) 70 { 71 auto mb = &((*stackPtr)[$-1]); 72 if(mb.count > 0) 73 { 74 MultiBulks ~= mb; 75 stackPtr = &((*mb).values); 76 } 77 } 78 else 79 while(MultiBulks.length > 0) 80 { 81 auto mb = *(MultiBulks.back); 82 83 if(mb.count == mb.values.length) 84 { 85 MultiBulks.popBack(); 86 87 if(MultiBulks.length > 0) 88 stackPtr = &((*MultiBulks.back).values); 89 else 90 stackPtr = &responses; 91 } 92 else 93 break; 94 } 95 } 96 97 if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data 98 { 99 debug(tinyredis) { 100 if(minResponses > 1 && responses.length < minResponses) 101 writeln("WAITING FOR MORE RESPONSES ... "); 102 } 103 104 if(responses.length < minResponses) 105 continue; 106 107 break; 108 } 109 110 } 111 112 return responses; 113 } 114 115 /* -------- EXCEPTIONS ------------- */ 116 117 class ConnectionException : Exception { 118 this(string msg) { super(msg); } 119 } 120 121 122 private : 123 124 void receive(TcpSocket conn, ref byte[] buffer) 125 { 126 byte[1024 * 16] buff; 127 size_t len = conn.receive(buff); 128 129 if(len == 0) 130 throw new ConnectionException("Server closed the connection!"); 131 else if(len == TcpSocket.ERROR) 132 throw new ConnectionException("A socket error occurred!"); 133 134 buffer ~= buff[0 .. len]; 135 debug(tinyredis) { writeln("Response : ", "'" ~ escape(cast(string)buff) ~ "'", " Length : ", len); } 136 }