| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- /*
- Asynchronous WebServer library for Espressif MCUs
- Copyright (c) 2016 Hristo Gochkov. All rights reserved.
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- */
- #include "Arduino.h"
- #include "AsyncEventSource.h"
- static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
- String ev = "";
- if(reconnect){
- ev += "retry: ";
- ev += String(reconnect);
- ev += "\r\n";
- }
- if(id){
- ev += "id: ";
- ev += String(id);
- ev += "\r\n";
- }
- if(event != NULL){
- ev += "event: ";
- ev += String(event);
- ev += "\r\n";
- }
- if(message != NULL){
- size_t messageLen = strlen(message);
- char * lineStart = (char *)message;
- char * lineEnd;
- do {
- char * nextN = strchr(lineStart, '\n');
- char * nextR = strchr(lineStart, '\r');
- if(nextN == NULL && nextR == NULL){
- size_t llen = ((char *)message + messageLen) - lineStart;
- char * ldata = (char *)malloc(llen+1);
- if(ldata != NULL){
- memcpy(ldata, lineStart, llen);
- ldata[llen] = 0;
- ev += "data: ";
- ev += ldata;
- ev += "\r\n\r\n";
- free(ldata);
- }
- lineStart = (char *)message + messageLen;
- } else {
- char * nextLine = NULL;
- if(nextN != NULL && nextR != NULL){
- if(nextR < nextN){
- lineEnd = nextR;
- if(nextN == (nextR + 1))
- nextLine = nextN + 1;
- else
- nextLine = nextR + 1;
- } else {
- lineEnd = nextN;
- if(nextR == (nextN + 1))
- nextLine = nextR + 1;
- else
- nextLine = nextN + 1;
- }
- } else if(nextN != NULL){
- lineEnd = nextN;
- nextLine = nextN + 1;
- } else {
- lineEnd = nextR;
- nextLine = nextR + 1;
- }
- size_t llen = lineEnd - lineStart;
- char * ldata = (char *)malloc(llen+1);
- if(ldata != NULL){
- memcpy(ldata, lineStart, llen);
- ldata[llen] = 0;
- ev += "data: ";
- ev += ldata;
- ev += "\r\n";
- free(ldata);
- }
- lineStart = nextLine;
- if(lineStart == ((char *)message + messageLen))
- ev += "\r\n";
- }
- } while(lineStart < ((char *)message + messageLen));
- }
- return ev;
- }
- // Message
- AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
- : _data(nullptr), _len(len), _sent(0), _acked(0)
- {
- _data = (uint8_t*)malloc(_len+1);
- if(_data == nullptr){
- _len = 0;
- } else {
- memcpy(_data, data, len);
- _data[_len] = 0;
- }
- }
- AsyncEventSourceMessage::~AsyncEventSourceMessage() {
- if(_data != NULL)
- free(_data);
- }
- size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
- (void)time;
- // If the whole message is now acked...
- if(_acked + len > _len){
- // Return the number of extra bytes acked (they will be carried on to the next message)
- const size_t extra = _acked + len - _len;
- _acked = _len;
- return extra;
- }
- // Return that no extra bytes left.
- _acked += len;
- return 0;
- }
- size_t AsyncEventSourceMessage::send(AsyncClient *client) {
- const size_t len = _len - _sent;
- if(client->space() < len){
- return 0;
- }
- size_t sent = client->add((const char *)_data, len);
- if(client->canSend())
- client->send();
- _sent += sent;
- return sent;
- }
- // Client
- AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
- : _messageQueue(AlternativeLinkedList<AsyncEventSourceMessage *>([](AsyncEventSourceMessage *m){ delete m; }))
- {
- _client = request->client();
- _server = server;
- _lastId = 0;
- if(request->hasHeader("Last-Event-ID"))
- _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());
-
- _client->setRxTimeout(0);
- _client->onError(NULL, NULL);
- _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
- _client->onPoll([](void *r, AsyncClient* c){ (void)c; ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
- _client->onData(NULL, NULL);
- _client->onTimeout([this](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
- _client->onDisconnect([this](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);
- _server->_addClient(this);
- delete request;
- }
- AsyncEventSourceClient::~AsyncEventSourceClient(){
- _messageQueue.free();
- close();
- }
- void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
- if(dataMessage == NULL)
- return;
- if(!connected()){
- delete dataMessage;
- return;
- }
- if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
- ets_printf("ERROR: Too many messages queued\n");
- delete dataMessage;
- } else {
- _messageQueue.add(dataMessage);
- }
- if(_client->canSend())
- _runQueue();
- }
- void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
- while(len && !_messageQueue.isEmpty()){
- len = _messageQueue.front()->ack(len, time);
- if(_messageQueue.front()->finished())
- _messageQueue.remove(_messageQueue.front());
- }
- _runQueue();
- }
- void AsyncEventSourceClient::_onPoll(){
- if(!_messageQueue.isEmpty()){
- _runQueue();
- }
- }
- void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
- _client->close(true);
- }
- void AsyncEventSourceClient::_onDisconnect(){
- _client = NULL;
- _server->_handleDisconnect(this);
- }
- void AsyncEventSourceClient::close(){
- if(_client != NULL)
- _client->close();
- }
- void AsyncEventSourceClient::write(const char * message, size_t len){
- _queueMessage(new AsyncEventSourceMessage(message, len));
- }
- void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
- String ev = generateEventMessage(message, event, id, reconnect);
- _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
- }
- void AsyncEventSourceClient::_runQueue(){
- while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
- _messageQueue.remove(_messageQueue.front());
- }
- for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
- {
- if(!(*i)->sent())
- (*i)->send(_client);
- }
- }
- // Handler
- AsyncEventSource::AsyncEventSource(const String& url)
- : _url(url)
- , _clients(AlternativeLinkedList<AsyncEventSourceClient *>([](AsyncEventSourceClient *c){ delete c; }))
- , _connectcb(NULL)
- {}
- AsyncEventSource::~AsyncEventSource(){
- close();
- }
- void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
- _connectcb = cb;
- }
- void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
- /*char * temp = (char *)malloc(2054);
- if(temp != NULL){
- memset(temp+1,' ',2048);
- temp[0] = ':';
- temp[2049] = '\r';
- temp[2050] = '\n';
- temp[2051] = '\r';
- temp[2052] = '\n';
- temp[2053] = 0;
- client->write((const char *)temp, 2053);
- free(temp);
- }*/
-
- _clients.add(client);
- if(_connectcb)
- _connectcb(client);
- }
- void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
- _clients.remove(client);
- }
- void AsyncEventSource::close(){
- for(const auto &c: _clients){
- if(c->connected())
- c->close();
- }
- }
- // pmb fix
- size_t AsyncEventSource::avgPacketsWaiting() const {
- if(_clients.isEmpty())
- return 0;
-
- size_t aql=0;
- uint32_t nConnectedClients=0;
-
- for(const auto &c: _clients){
- if(c->connected()) {
- aql+=c->packetsWaiting();
- ++nConnectedClients;
- }
- }
- // return aql / nConnectedClients;
- return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
- }
- void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
- String ev = generateEventMessage(message, event, id, reconnect);
- for(const auto &c: _clients){
- if(c->connected()) {
- c->write(ev.c_str(), ev.length());
- }
- }
- }
- size_t AsyncEventSource::count() const {
- return _clients.count_if([](AsyncEventSourceClient *c){
- return c->connected();
- });
- }
- bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
- if(request->method() != HTTP_GET || !request->url().equals(_url)) {
- return false;
- }
- request->addInterestingHeader("Last-Event-ID");
- return true;
- }
- void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
- if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
- return request->requestAuthentication();
- request->send(new AsyncEventSourceResponse(this));
- }
- // Response
- AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
- _server = server;
- _code = 200;
- _contentType = "text/event-stream";
- _sendContentLength = false;
- addHeader("Cache-Control", "no-cache");
- addHeader("Connection","keep-alive");
- }
- void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
- String out = _assembleHead(request->version());
- request->client()->write(out.c_str(), _headLength);
- _state = RESPONSE_WAIT_ACK;
- }
- size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))){
- if(len){
- new AsyncEventSourceClient(request, _server);
- }
- return 0;
- }
|