1 module tinyredis.connection;
2 
3 /**
4  * Authors: Adil Baig, adil.baig@aidezigns.com
5  */
6 
7 import std.socket : TcpSocket;
8 version(Windows) import core.sys.windows.winsock2: EWOULDBLOCK;
9 
10 import tinyredis.response;
11 
12 debug(tinyredis) {
13 	import std.stdio : writeln;
14 	import tinyredis.encoder : escape;
15 }
16 
17 /**
18  * Sends a pre-encoded string
19  *
20  * Params:
21  *   conn     	 = Connection to redis server.
22  *   encoded_cmd = The command to be sent.
23  *
24  * Throws: $(D ConnectionException) if sending fails.
25  */
26 void send(TcpSocket conn, string encoded_cmd)
27 {
28 	debug(tinyredis) writeln("Request : '", escape(encoded_cmd), "'");
29 
30 	if (conn.send(encoded_cmd) != encoded_cmd.length)
31 		throw new ConnectionException("Error while sending request");
32 }
33 
34 /**
35  * Receive responses from redis server
36  *
37  * Params:
38  *   conn    	  = Connection to redis server.
39  *   minResponses = The number of multibulks you expect
40  *
41  * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
42  */
43 Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
44 {
45 	import std.array : back, popBack;
46 
47 	char[] buffer;
48 	Response[] responses;
49 
50 	Response*[] MultiBulks; //Stack of pointers to multibulks
51 	Response[]* stackPtr = &responses; // This is the stack where new elements are pushed
52 
53 	for(;;)
54 	{
55 		receive(conn, buffer);
56 
57 		while(buffer.length)
58 		{
59 			auto r = Response.parse(buffer);
60 			if(r.type == ResponseType.Invalid) // This occurs when the buffer is incomplete. Pull more
61 				break;
62 
63 			*stackPtr ~= r;
64 			if(r.type == ResponseType.MultiBulk && r.count > 0)
65 			{
66 				auto mb = &(*stackPtr)[$-1];
67 				MultiBulks ~= mb;
68 				stackPtr = &(*mb).values;
69 			}
70 
71 			while(MultiBulks.length)
72 			{
73 				auto mb = *MultiBulks.back;
74 
75 				if(mb.count != mb.values.length)
76 					break;
77 
78 				MultiBulks.popBack();
79 				stackPtr = MultiBulks.length ? &(*MultiBulks.back).values : &responses;
80 			}
81 		}
82 
83 		if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
84 		{
85 			debug(tinyredis)
86 				if(minResponses > 1 && responses.length < minResponses)
87 					writeln("WAITING FOR MORE RESPONSES ... ");
88 
89 			if(responses.length >= minResponses)
90 				break;
91 		}
92 	}
93 
94 	return responses;
95 }
96 
97 /* -------- EXCEPTIONS ------------- */
98 
99 class ConnectionException : Exception {
100 	this(string msg) { super(msg); }
101 }
102 
103 private void receive(TcpSocket conn, ref char[] buffer)
104 {
105 	import core.stdc.errno;
106 	import std.string : format;
107 
108 	char[16 << 10] buff = void;
109 	size_t len = conn.receive(buff);
110 
111 	if (conn.blocking)
112 	{
113 		if(len == 0)
114 			throw new ConnectionException("Server closed the connection!");
115 		if(len == TcpSocket.ERROR)
116 			throw new ConnectionException("A socket error occurred!");
117 	}
118 	else if (len == -1)
119 	{
120 		if (errno != EWOULDBLOCK)
121 			throw new ConnectionException("A socket error occurred! errno: %s".format(errno));
122 
123 		len = 0;
124 		errno = 0;
125 	}
126 
127 	buffer ~= buff[0 .. len];
128 	debug(tinyredis) writeln("Response : '", escape(cast(string)buffer), "'", " Length : ", len);
129 }