1 module tinyredis.connection;
2 
3 /**
4  * Authors: Adil Baig, adil.baig@aidezigns.com
5  */
6 
7 public:
8     import std.socket : TcpSocket;
9 	    
10 private:
11     import std.array : appender, back, popBack;
12     import tinyredis.parser;
13     import tinyredis.response;
14 
15 debug(tinyredis) {
16 	import std.stdio : writeln;
17 	import tinyredis.encoder : escape;
18 }    
19 
20 public:
21 
22 	/**
23      * Sends a pre-encoded string
24      *
25      * Params:
26      *   conn     	 = Connection to redis server.
27      *   encoded_cmd = The command to be sent.
28      *
29      * Throws: $(D ConnectionException) if sending fails.
30      */
31 	void send(TcpSocket conn, string encoded_cmd)
32     {
33         debug(tinyredis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); }
34         
35         auto sent = conn.send(encoded_cmd);
36         if (sent != (cast(byte[])encoded_cmd).length)
37             throw new ConnectionException("Error while sending request");
38     }
39 
40     /**
41      * Receive responses from redis server
42      *
43      * Params:
44      *   conn    	  = Connection to redis server.
45      *   minResponses = The number of multibulks you expect
46      *
47      * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
48      */
49     Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
50     {
51         byte[] buffer;
52         Response[] responses;
53         Response*[] MultiBulks; //Stack of pointers to multibulks
54         Response[]* stackPtr = &responses;
55         
56         while(true)
57         {
58             receive(conn, buffer);
59             
60             debug(tinyredis) { writeln("BUFFER : ", escape(cast(string)buffer)); } 
61             
62             while(buffer.length > 0)
63             {
64                 auto r = parseResponse(buffer);
65                 if(r.type == ResponseType.Invalid)
66                      break;
67                
68                 *stackPtr ~= r;
69                 if(r.type == ResponseType.MultiBulk)
70                 {
71                     auto mb = &((*stackPtr)[$-1]);
72                     if(mb.count > 0)
73                     {
74                         MultiBulks ~= mb;
75                         stackPtr = &((*mb).values);
76                     }
77                 }
78                 else
79                     while(MultiBulks.length > 0)
80                     {
81                         auto mb = *(MultiBulks.back);
82                         
83                         if(mb.count == mb.values.length)
84                         {
85                             MultiBulks.popBack();
86                             
87                             if(MultiBulks.length > 0)
88                                 stackPtr = &((*MultiBulks.back).values);
89                             else
90                                 stackPtr = &responses;
91                         }
92                         else
93                             break;
94                     }
95             }
96             
97             if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
98             {
99                 debug(tinyredis) {
100                     if(minResponses > 1 && responses.length < minResponses)
101                         writeln("WAITING FOR MORE RESPONSES ... ");
102                 }
103                     
104                 if(responses.length < minResponses)
105                     continue;
106                     
107                 break;
108             }
109                 
110         }
111         
112         return responses;
113     }
114     
115    /* -------- EXCEPTIONS ------------- */
116  
117     class ConnectionException : Exception {
118         this(string msg) { super(msg); }
119     }
120 
121 
122 private :
123     
124     void receive(TcpSocket conn, ref byte[] buffer)
125     {
126         byte[1024 * 16] buff;
127         size_t len = conn.receive(buff);
128         
129         if(len == 0)
130             throw new ConnectionException("Server closed the connection!");
131         else if(len == TcpSocket.ERROR)
132             throw new ConnectionException("A socket error occurred!");
133 
134         buffer ~= buff[0 .. len];
135         debug(tinyredis) { writeln("Response : ", "'" ~ escape(cast(string)buff) ~ "'", " Length : ", len); }
136     }