1 module tinyredis.connection; 2 3 /** 4 * Authors: Adil Baig, adil.baig@aidezigns.com 5 */ 6 7 import std.socket : TcpSocket; 8 version(Windows) import core.sys.windows.winsock2: EWOULDBLOCK; 9 10 import tinyredis.response; 11 12 debug(tinyredis) { 13 import std.stdio : writeln; 14 import tinyredis.encoder : escape; 15 } 16 17 /** 18 * Sends a pre-encoded string 19 * 20 * Params: 21 * conn = Connection to redis server. 22 * encoded_cmd = The command to be sent. 23 * 24 * Throws: $(D ConnectionException) if sending fails. 25 */ 26 void send(TcpSocket conn, string encoded_cmd) 27 { 28 debug(tinyredis) writeln("Request : '", escape(encoded_cmd), "'"); 29 30 if (conn.send(encoded_cmd) != encoded_cmd.length) 31 throw new ConnectionException("Error while sending request"); 32 } 33 34 /** 35 * Receive responses from redis server 36 * 37 * Params: 38 * conn = Connection to redis server. 39 * minResponses = The number of multibulks you expect 40 * 41 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection. 42 */ 43 Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0) 44 { 45 import std.array : back, popBack; 46 47 char[] buffer; 48 Response[] responses; 49 50 Response*[] MultiBulks; //Stack of pointers to multibulks 51 Response[]* stackPtr = &responses; // This is the stack where new elements are pushed 52 53 for(;;) 54 { 55 receive(conn, buffer); 56 57 while(buffer.length) 58 { 59 auto r = Response.parse(buffer); 60 if(r.type == ResponseType.Invalid) // This occurs when the buffer is incomplete. Pull more 61 break; 62 63 *stackPtr ~= r; 64 if(r.type == ResponseType.MultiBulk && r.count > 0) 65 { 66 auto mb = &(*stackPtr)[$-1]; 67 MultiBulks ~= mb; 68 stackPtr = &(*mb).values; 69 } 70 71 while(MultiBulks.length) 72 { 73 auto mb = *MultiBulks.back; 74 75 if(mb.count != mb.values.length) 76 break; 77 78 MultiBulks.popBack(); 79 stackPtr = MultiBulks.length ? &(*MultiBulks.back).values : &responses; 80 } 81 } 82 83 if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data 84 { 85 debug(tinyredis) 86 if(minResponses > 1 && responses.length < minResponses) 87 writeln("WAITING FOR MORE RESPONSES ... "); 88 89 if(responses.length >= minResponses) 90 break; 91 } 92 } 93 94 return responses; 95 } 96 97 /* -------- EXCEPTIONS ------------- */ 98 99 class ConnectionException : Exception { 100 this(string msg) { super(msg); } 101 } 102 103 private void receive(TcpSocket conn, ref char[] buffer) 104 { 105 import core.stdc.errno; 106 import std.string : format; 107 108 char[16 << 10] buff = void; 109 size_t len = conn.receive(buff); 110 111 if (conn.blocking) 112 { 113 if(len == 0) 114 throw new ConnectionException("Server closed the connection!"); 115 if(len == TcpSocket.ERROR) 116 throw new ConnectionException("A socket error occurred!"); 117 } 118 else if (len == -1) 119 { 120 if (errno != EWOULDBLOCK) 121 throw new ConnectionException("A socket error occurred! errno: %s".format(errno)); 122 123 len = 0; 124 errno = 0; 125 } 126 127 buffer ~= buff[0 .. len]; 128 debug(tinyredis) writeln("Response : '", escape(cast(string)buffer), "'", " Length : ", len); 129 }