/* MQTTClient.c */
  1. /*******************************************************************************
  2. * Copyright (c) 2014, 2015 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
  15. *******************************************************************************/
  16. #include "MQTTClient.h"
  17. static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
  18. md->topicName = aTopicName;
  19. md->message = aMessage;
  20. }
  21. static int getNextPacketId(MQTTClient *c) {
  22. return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
  23. }
  24. static int sendPacket(MQTTClient* c, int length, Timer* timer)
  25. {
  26. int rc = FAILURE,
  27. sent = 0;
  28. while (sent < length && !TimerIsExpired(timer))
  29. {
  30. rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
  31. if (rc < 0) // there was an error writing the data
  32. break;
  33. sent += rc;
  34. }
  35. if (sent == length)
  36. {
  37. TimerCountdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have successfully sent the packet
  38. rc = SUCCESSS;
  39. }
  40. else
  41. rc = FAILURE;
  42. return rc;
  43. }
  44. void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
  45. unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
  46. {
  47. int i;
  48. c->ipstack = network;
  49. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  50. c->messageHandlers[i].topicFilter = 0;
  51. c->command_timeout_ms = command_timeout_ms;
  52. c->buf = sendbuf;
  53. c->buf_size = sendbuf_size;
  54. c->readbuf = readbuf;
  55. c->readbuf_size = readbuf_size;
  56. c->isconnected = 0;
  57. c->ping_outstanding = 0;
  58. c->defaultMessageHandler = NULL;
  59. c->next_packetid = 1;
  60. TimerInit(&c->ping_timer);
  61. #if defined(MQTT_TASK)
  62. MutexInit(&c->mutex);
  63. #endif
  64. }
  65. static int decodePacket(MQTTClient* c, int* value, int timeout)
  66. {
  67. unsigned char i;
  68. int multiplier = 1;
  69. int len = 0;
  70. const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
  71. *value = 0;
  72. do
  73. {
  74. int rc = MQTTPACKET_READ_ERROR;
  75. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  76. {
  77. rc = MQTTPACKET_READ_ERROR; /* bad data */
  78. goto exit;
  79. }
  80. rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
  81. if (rc != 1)
  82. goto exit;
  83. *value += (i & 127) * multiplier;
  84. multiplier *= 128;
  85. } while ((i & 128) != 0);
  86. exit:
  87. return len;
  88. }
  89. static int readPacket(MQTTClient* c, Timer* timer)
  90. {
  91. int rc = FAILURE;
  92. MQTTHeader header = {0};
  93. int len = 0;
  94. int rem_len = 0;
  95. /* 1. read the header byte. This has the packet type in it */
  96. if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)) != 1)
  97. goto exit;
  98. len = 1;
  99. /* 2. read the remaining length. This is variable in itself */
  100. decodePacket(c, &rem_len, TimerLeftMS(timer));
  101. len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
  102. /* 3. read the rest of the buffer using a callback to supply the rest of the data */
  103. if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len))
  104. goto exit;
  105. header.byte = c->readbuf[0];
  106. rc = header.bits.type;
  107. exit:
  108. return rc;
  109. }
  110. // assume topic filter and name is in correct format
  111. // # can only be at end
  112. // + and # can only be next to separator
  113. static char isTopicMatched(char* topicFilter, MQTTString* topicName)
  114. {
  115. char* curf = topicFilter;
  116. char* curn = topicName->lenstring.data;
  117. char* curn_end = curn + topicName->lenstring.len;
  118. while (*curf && curn < curn_end)
  119. {
  120. if (*curn == '/' && *curf != '/')
  121. break;
  122. if (*curf != '+' && *curf != '#' && *curf != *curn)
  123. break;
  124. if (*curf == '+')
  125. { // skip until we meet the next separator, or end of string
  126. char* nextpos = curn + 1;
  127. while (nextpos < curn_end && *nextpos != '/')
  128. nextpos = ++curn + 1;
  129. }
  130. else if (*curf == '#')
  131. curn = curn_end - 1; // skip until end of string
  132. curf++;
  133. curn++;
  134. };
  135. return (curn == curn_end) && (*curf == '\0');
  136. }
  137. int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
  138. {
  139. int i;
  140. int rc = FAILURE;
  141. // we have to find the right message handler - indexed by topic
  142. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  143. {
  144. if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
  145. isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
  146. {
  147. if (c->messageHandlers[i].fp != NULL)
  148. {
  149. MessageData md;
  150. NewMessageData(&md, topicName, message);
  151. c->messageHandlers[i].fp(&md);
  152. rc = SUCCESSS;
  153. }
  154. }
  155. }
  156. if (rc == FAILURE && c->defaultMessageHandler != NULL)
  157. {
  158. MessageData md;
  159. NewMessageData(&md, topicName, message);
  160. c->defaultMessageHandler(&md);
  161. rc = SUCCESSS;
  162. }
  163. return rc;
  164. }
  165. int keepalive(MQTTClient* c)
  166. {
  167. int rc = FAILURE;
  168. if (c->keepAliveInterval == 0)
  169. {
  170. rc = SUCCESSS;
  171. goto exit;
  172. }
  173. if (TimerIsExpired(&c->ping_timer))
  174. {
  175. if (!c->ping_outstanding)
  176. {
  177. Timer timer;
  178. TimerInit(&timer);
  179. TimerCountdownMS(&timer, 1000);
  180. int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
  181. if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESSS) // send the ping packet
  182. c->ping_outstanding = 1;
  183. }
  184. }
  185. exit:
  186. return rc;
  187. }
  188. int cycle(MQTTClient* c, Timer* timer)
  189. {
  190. // read the socket, see what work is due
  191. unsigned short packet_type = readPacket(c, timer);
  192. int len = 0,
  193. rc = SUCCESSS;
  194. switch (packet_type)
  195. {
  196. case CONNACK:
  197. case PUBACK:
  198. case SUBACK:
  199. break;
  200. case PUBLISH:
  201. {
  202. MQTTString topicName;
  203. MQTTMessage msg;
  204. int intQoS;
  205. if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
  206. (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
  207. goto exit;
  208. msg.qos = (enum QoS)intQoS;
  209. deliverMessage(c, &topicName, &msg);
  210. if (msg.qos != QOS0)
  211. {
  212. if (msg.qos == QOS1)
  213. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
  214. else if (msg.qos == QOS2)
  215. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
  216. if (len <= 0)
  217. rc = FAILURE;
  218. else
  219. rc = sendPacket(c, len, timer);
  220. if (rc == FAILURE)
  221. goto exit; // there was a problem
  222. }
  223. break;
  224. }
  225. case PUBREC:
  226. {
  227. unsigned short mypacketid;
  228. unsigned char dup, type;
  229. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  230. rc = FAILURE;
  231. else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0)
  232. rc = FAILURE;
  233. else if ((rc = sendPacket(c, len, timer)) != SUCCESSS) // send the PUBREL packet
  234. rc = FAILURE; // there was a problem
  235. if (rc == FAILURE)
  236. goto exit; // there was a problem
  237. break;
  238. }
  239. case PUBCOMP:
  240. break;
  241. case PINGRESP:
  242. c->ping_outstanding = 0;
  243. break;
  244. }
  245. keepalive(c);
  246. exit:
  247. if (rc == SUCCESSS)
  248. rc = packet_type;
  249. return rc;
  250. }
  251. int MQTTYield(MQTTClient* c, int timeout_ms)
  252. {
  253. int rc = SUCCESSS;
  254. Timer timer;
  255. TimerInit(&timer);
  256. TimerCountdownMS(&timer, timeout_ms);
  257. if (cycle(c, &timer) == FAILURE)
  258. {
  259. rc = FAILURE;
  260. }
  261. return rc;
  262. }
  263. void MQTTRun(void* parm)
  264. {
  265. Timer timer;
  266. MQTTClient* c = (MQTTClient*)parm;
  267. TimerInit(&timer);
  268. while (1)
  269. {
  270. #if defined(MQTT_TASK)
  271. MutexLock(&c->mutex);
  272. #endif
  273. TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
  274. cycle(c, &timer);
  275. #if defined(MQTT_TASK)
  276. MutexUnlock(&c->mutex);
  277. #endif
  278. }
  279. }
  #if defined(MQTT_TASK)
  /**
   * Start MQTTRun() for this client via ThreadStart(), using the thread
   * object stored in the client.  Only compiled when MQTT_TASK is defined.
   *
   * @param client  client passed to MQTTRun() as its argument
   * @return        whatever ThreadStart() returns
   */
  int MQTTStartTask(MQTTClient* client)
  {
      int started = ThreadStart(&client->thread, &MQTTRun, client);

      return started;
  }
  #endif
  286. int waitfor(MQTTClient* c, int packet_type, Timer* timer)
  287. {
  288. int rc = FAILURE;
  289. do
  290. {
  291. if (TimerIsExpired(timer))
  292. break; // we timed out
  293. }
  294. while ((rc = cycle(c, timer)) != packet_type);
  295. return rc;
  296. }
  297. int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
  298. {
  299. Timer connect_timer;
  300. int rc = FAILURE;
  301. MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
  302. int len = 0;
  303. #if defined(MQTT_TASK)
  304. MutexLock(&c->mutex);
  305. #endif
  306. if (c->isconnected) /* don't send connect packet again if we are already connected */
  307. goto exit;
  308. TimerInit(&connect_timer);
  309. TimerCountdownMS(&connect_timer, c->command_timeout_ms);
  310. if (options == 0)
  311. options = &default_options; /* set default options if none were supplied */
  312. c->keepAliveInterval = options->keepAliveInterval;
  313. TimerCountdown(&c->ping_timer, c->keepAliveInterval);
  314. if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
  315. goto exit;
  316. if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESSS) // send the connect packet
  317. goto exit; // there was a problem
  318. // this will be a blocking call, wait for the connack
  319. if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
  320. {
  321. unsigned char connack_rc = 255;
  322. unsigned char sessionPresent = 0;
  323. if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1)
  324. rc = connack_rc;
  325. else
  326. rc = FAILURE;
  327. }
  328. else
  329. rc = FAILURE;
  330. exit:
  331. if (rc == SUCCESSS)
  332. c->isconnected = 1;
  333. #if defined(MQTT_TASK)
  334. MutexUnlock(&c->mutex);
  335. #endif
  336. return rc;
  337. }
  338. int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
  339. {
  340. int rc = FAILURE;
  341. Timer timer;
  342. int len = 0;
  343. MQTTString topic = MQTTString_initializer;
  344. topic.cstring = (char *)topicFilter;
  345. // This was added because enum QoS was previously typed to *int which resulted in HardFault and unaligned integer read.
  346. // This coping below makes sure the parameter for MQTTSerialize_subscribe is always char no matter what compiler is using for enums
  347. char charQos = (char)qos;
  348. #if defined(MQTT_TASK)
  349. MutexLock(&c->mutex);
  350. #endif
  351. if (!c->isconnected)
  352. goto exit;
  353. TimerInit(&timer);
  354. TimerCountdownMS(&timer, c->command_timeout_ms);
  355. len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, &charQos);
  356. if (len <= 0)
  357. goto exit;
  358. if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
  359. goto exit; // there was a problem
  360. if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
  361. {
  362. int count = 0, grantedQoS = -1;
  363. unsigned short mypacketid;
  364. if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
  365. rc = grantedQoS; // 0, 1, 2 or 0x80
  366. if (rc != 0x80)
  367. {
  368. int i;
  369. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  370. {
  371. if (c->messageHandlers[i].topicFilter == 0)
  372. {
  373. c->messageHandlers[i].topicFilter = topicFilter;
  374. c->messageHandlers[i].fp = messageHandler;
  375. rc = 0;
  376. break;
  377. }
  378. }
  379. }
  380. }
  381. else
  382. rc = FAILURE;
  383. exit:
  384. #if defined(MQTT_TASK)
  385. MutexUnlock(&c->mutex);
  386. #endif
  387. return rc;
  388. }
  389. int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
  390. {
  391. int rc = FAILURE;
  392. Timer timer;
  393. MQTTString topic = MQTTString_initializer;
  394. topic.cstring = (char *)topicFilter;
  395. int len = 0;
  396. #if defined(MQTT_TASK)
  397. MutexLock(&c->mutex);
  398. #endif
  399. if (!c->isconnected)
  400. goto exit;
  401. TimerInit(&timer);
  402. TimerCountdownMS(&timer, c->command_timeout_ms);
  403. if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
  404. goto exit;
  405. if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
  406. goto exit; // there was a problem
  407. if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
  408. {
  409. unsigned short mypacketid; // should be the same as the packetid above
  410. if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
  411. rc = 0;
  412. }
  413. else
  414. rc = FAILURE;
  415. exit:
  416. #if defined(MQTT_TASK)
  417. MutexUnlock(&c->mutex);
  418. #endif
  419. return rc;
  420. }
  421. int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
  422. {
  423. int rc = FAILURE;
  424. Timer timer;
  425. MQTTString topic = MQTTString_initializer;
  426. topic.cstring = (char *)topicName;
  427. int len = 0;
  428. #if defined(MQTT_TASK)
  429. MutexLock(&c->mutex);
  430. #endif
  431. if (!c->isconnected)
  432. goto exit;
  433. TimerInit(&timer);
  434. TimerCountdownMS(&timer, c->command_timeout_ms);
  435. if (message->qos == QOS1 || message->qos == QOS2)
  436. message->id = getNextPacketId(c);
  437. len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
  438. topic, (unsigned char*)message->payload, message->payloadlen);
  439. if (len <= 0)
  440. goto exit;
  441. if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
  442. goto exit; // there was a problem
  443. if (message->qos == QOS1)
  444. {
  445. if (waitfor(c, PUBACK, &timer) == PUBACK)
  446. {
  447. unsigned short mypacketid;
  448. unsigned char dup, type;
  449. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  450. rc = FAILURE;
  451. }
  452. else
  453. rc = FAILURE;
  454. }
  455. else if (message->qos == QOS2)
  456. {
  457. if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
  458. {
  459. unsigned short mypacketid;
  460. unsigned char dup, type;
  461. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  462. rc = FAILURE;
  463. }
  464. else
  465. rc = FAILURE;
  466. }
  467. exit:
  468. #if defined(MQTT_TASK)
  469. MutexUnlock(&c->mutex);
  470. #endif
  471. return rc;
  472. }
  473. int MQTTDisconnect(MQTTClient* c)
  474. {
  475. int rc = FAILURE;
  476. Timer timer; // we might wait for incomplete incoming publishes to complete
  477. int len = 0;
  478. #if defined(MQTT_TASK)
  479. MutexLock(&c->mutex);
  480. #endif
  481. TimerInit(&timer);
  482. TimerCountdownMS(&timer, c->command_timeout_ms);
  483. len = MQTTSerialize_disconnect(c->buf, c->buf_size);
  484. if (len > 0)
  485. rc = sendPacket(c, len, &timer); // send the disconnect packet
  486. c->isconnected = 0;
  487. #if defined(MQTT_TASK)
  488. MutexUnlock(&c->mutex);
  489. #endif
  490. return rc;
  491. }