AsyncEventSource.cpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. /*
  2. Asynchronous WebServer library for Espressif MCUs
  3. Copyright (c) 2016 Hristo Gochkov. All rights reserved.
  4. This library is free software; you can redistribute it and/or
  5. modify it under the terms of the GNU Lesser General Public
  6. License as published by the Free Software Foundation; either
  7. version 2.1 of the License, or (at your option) any later version.
  8. This library is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public
  13. License along with this library; if not, write to the Free Software
  14. Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  15. */
  16. #include "Arduino.h"
  17. #include "AsyncEventSource.h"
  18. static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  19. String ev = "";
  20. if(reconnect){
  21. ev += "retry: ";
  22. ev += String(reconnect);
  23. ev += "\r\n";
  24. }
  25. if(id){
  26. ev += "id: ";
  27. ev += String(id);
  28. ev += "\r\n";
  29. }
  30. if(event != NULL){
  31. ev += "event: ";
  32. ev += String(event);
  33. ev += "\r\n";
  34. }
  35. if(message != NULL){
  36. size_t messageLen = strlen(message);
  37. char * lineStart = (char *)message;
  38. char * lineEnd;
  39. do {
  40. char * nextN = strchr(lineStart, '\n');
  41. char * nextR = strchr(lineStart, '\r');
  42. if(nextN == NULL && nextR == NULL){
  43. size_t llen = ((char *)message + messageLen) - lineStart;
  44. char * ldata = (char *)malloc(llen+1);
  45. if(ldata != NULL){
  46. memcpy(ldata, lineStart, llen);
  47. ldata[llen] = 0;
  48. ev += "data: ";
  49. ev += ldata;
  50. ev += "\r\n\r\n";
  51. free(ldata);
  52. }
  53. lineStart = (char *)message + messageLen;
  54. } else {
  55. char * nextLine = NULL;
  56. if(nextN != NULL && nextR != NULL){
  57. if(nextR < nextN){
  58. lineEnd = nextR;
  59. if(nextN == (nextR + 1))
  60. nextLine = nextN + 1;
  61. else
  62. nextLine = nextR + 1;
  63. } else {
  64. lineEnd = nextN;
  65. if(nextR == (nextN + 1))
  66. nextLine = nextR + 1;
  67. else
  68. nextLine = nextN + 1;
  69. }
  70. } else if(nextN != NULL){
  71. lineEnd = nextN;
  72. nextLine = nextN + 1;
  73. } else {
  74. lineEnd = nextR;
  75. nextLine = nextR + 1;
  76. }
  77. size_t llen = lineEnd - lineStart;
  78. char * ldata = (char *)malloc(llen+1);
  79. if(ldata != NULL){
  80. memcpy(ldata, lineStart, llen);
  81. ldata[llen] = 0;
  82. ev += "data: ";
  83. ev += ldata;
  84. ev += "\r\n";
  85. free(ldata);
  86. }
  87. lineStart = nextLine;
  88. if(lineStart == ((char *)message + messageLen))
  89. ev += "\r\n";
  90. }
  91. } while(lineStart < ((char *)message + messageLen));
  92. }
  93. return ev;
  94. }
  95. // Message
  96. AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
  97. : _data(nullptr), _len(len), _sent(0), _acked(0)
  98. {
  99. _data = (uint8_t*)malloc(_len+1);
  100. if(_data == nullptr){
  101. _len = 0;
  102. } else {
  103. memcpy(_data, data, len);
  104. _data[_len] = 0;
  105. }
  106. }
  107. AsyncEventSourceMessage::~AsyncEventSourceMessage() {
  108. if(_data != NULL)
  109. free(_data);
  110. }
  111. size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
  112. (void)time;
  113. // If the whole message is now acked...
  114. if(_acked + len > _len){
  115. // Return the number of extra bytes acked (they will be carried on to the next message)
  116. const size_t extra = _acked + len - _len;
  117. _acked = _len;
  118. return extra;
  119. }
  120. // Return that no extra bytes left.
  121. _acked += len;
  122. return 0;
  123. }
  124. size_t AsyncEventSourceMessage::send(AsyncClient *client) {
  125. const size_t len = _len - _sent;
  126. if(client->space() < len){
  127. return 0;
  128. }
  129. size_t sent = client->add((const char *)_data, len);
  130. if(client->canSend())
  131. client->send();
  132. _sent += sent;
  133. return sent;
  134. }
  135. // Client
  136. AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
  137. : _messageQueue(AlternativeLinkedList<AsyncEventSourceMessage *>([](AsyncEventSourceMessage *m){ delete m; }))
  138. {
  139. _client = request->client();
  140. _server = server;
  141. _lastId = 0;
  142. if(request->hasHeader("Last-Event-ID"))
  143. _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());
  144. _client->setRxTimeout(0);
  145. _client->onError(NULL, NULL);
  146. _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
  147. _client->onPoll([](void *r, AsyncClient* c){ (void)c; ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
  148. _client->onData(NULL, NULL);
  149. _client->onTimeout([this](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
  150. _client->onDisconnect([this](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);
  151. _server->_addClient(this);
  152. delete request;
  153. }
  154. AsyncEventSourceClient::~AsyncEventSourceClient(){
  155. _messageQueue.free();
  156. close();
  157. }
  158. void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
  159. if(dataMessage == NULL)
  160. return;
  161. if(!connected()){
  162. delete dataMessage;
  163. return;
  164. }
  165. if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
  166. ets_printf("ERROR: Too many messages queued\n");
  167. delete dataMessage;
  168. } else {
  169. _messageQueue.add(dataMessage);
  170. }
  171. if(_client->canSend())
  172. _runQueue();
  173. }
  174. void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
  175. while(len && !_messageQueue.isEmpty()){
  176. len = _messageQueue.front()->ack(len, time);
  177. if(_messageQueue.front()->finished())
  178. _messageQueue.remove(_messageQueue.front());
  179. }
  180. _runQueue();
  181. }
  182. void AsyncEventSourceClient::_onPoll(){
  183. if(!_messageQueue.isEmpty()){
  184. _runQueue();
  185. }
  186. }
  187. void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
  188. _client->close(true);
  189. }
  190. void AsyncEventSourceClient::_onDisconnect(){
  191. _client = NULL;
  192. _server->_handleDisconnect(this);
  193. }
  194. void AsyncEventSourceClient::close(){
  195. if(_client != NULL)
  196. _client->close();
  197. }
  198. void AsyncEventSourceClient::write(const char * message, size_t len){
  199. _queueMessage(new AsyncEventSourceMessage(message, len));
  200. }
  201. void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  202. String ev = generateEventMessage(message, event, id, reconnect);
  203. _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
  204. }
  205. void AsyncEventSourceClient::_runQueue(){
  206. while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
  207. _messageQueue.remove(_messageQueue.front());
  208. }
  209. for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
  210. {
  211. if(!(*i)->sent())
  212. (*i)->send(_client);
  213. }
  214. }
  215. // Handler
  216. AsyncEventSource::AsyncEventSource(const String& url)
  217. : _url(url)
  218. , _clients(AlternativeLinkedList<AsyncEventSourceClient *>([](AsyncEventSourceClient *c){ delete c; }))
  219. , _connectcb(NULL)
  220. {}
  221. AsyncEventSource::~AsyncEventSource(){
  222. close();
  223. }
  224. void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
  225. _connectcb = cb;
  226. }
  227. void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
  228. /*char * temp = (char *)malloc(2054);
  229. if(temp != NULL){
  230. memset(temp+1,' ',2048);
  231. temp[0] = ':';
  232. temp[2049] = '\r';
  233. temp[2050] = '\n';
  234. temp[2051] = '\r';
  235. temp[2052] = '\n';
  236. temp[2053] = 0;
  237. client->write((const char *)temp, 2053);
  238. free(temp);
  239. }*/
  240. _clients.add(client);
  241. if(_connectcb)
  242. _connectcb(client);
  243. }
  244. void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
  245. _clients.remove(client);
  246. }
  247. void AsyncEventSource::close(){
  248. for(const auto &c: _clients){
  249. if(c->connected())
  250. c->close();
  251. }
  252. }
  253. // pmb fix
  254. size_t AsyncEventSource::avgPacketsWaiting() const {
  255. if(_clients.isEmpty())
  256. return 0;
  257. size_t aql=0;
  258. uint32_t nConnectedClients=0;
  259. for(const auto &c: _clients){
  260. if(c->connected()) {
  261. aql+=c->packetsWaiting();
  262. ++nConnectedClients;
  263. }
  264. }
  265. // return aql / nConnectedClients;
  266. return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
  267. }
  268. void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  269. String ev = generateEventMessage(message, event, id, reconnect);
  270. for(const auto &c: _clients){
  271. if(c->connected()) {
  272. c->write(ev.c_str(), ev.length());
  273. }
  274. }
  275. }
  276. size_t AsyncEventSource::count() const {
  277. return _clients.count_if([](AsyncEventSourceClient *c){
  278. return c->connected();
  279. });
  280. }
  281. bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
  282. if(request->method() != HTTP_GET || !request->url().equals(_url)) {
  283. return false;
  284. }
  285. request->addInterestingHeader("Last-Event-ID");
  286. return true;
  287. }
  288. void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
  289. if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
  290. return request->requestAuthentication();
  291. request->send(new AsyncEventSourceResponse(this));
  292. }
  293. // Response
  294. AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
  295. _server = server;
  296. _code = 200;
  297. _contentType = "text/event-stream";
  298. _sendContentLength = false;
  299. addHeader("Cache-Control", "no-cache");
  300. addHeader("Connection","keep-alive");
  301. }
  302. void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
  303. String out = _assembleHead(request->version());
  304. request->client()->write(out.c_str(), _headLength);
  305. _state = RESPONSE_WAIT_ACK;
  306. }
  307. size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))){
  308. if(len){
  309. new AsyncEventSourceClient(request, _server);
  310. }
  311. return 0;
  312. }