2024-05-14 16:11:08 +03:00

189 lines
5.7 KiB
C

/* MQTT (over TCP) Example
This example code is in the Public Domain (or CC0 licensed, at your option.)
Unless required by applicable law or agreed to in writing, this
software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.
*/
#include <stdio.h>
#include <inttypes.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/event_groups.h"
#include "freertos/message_buffer.h"
#include "esp_log.h"
#include "esp_event.h"
#include "lwip/dns.h"
#include "esp_mac.h"
#include "mqtt_client.h"
#include "mqtt.h"
#if CONFIG_SENDER
static const char *TAG = "SUB";
extern MessageBufferHandle_t xMessageBufferRecv;
extern size_t xItemSize;
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
#else
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
#endif
{
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
esp_mqtt_event_handle_t event = event_data;
MQTT_t *mqttBuf = handler_args;
#else
MQTT_t *mqttBuf = event->user_context;
#endif
ESP_LOGI(TAG, "taskHandle=0x%x", (unsigned int)mqttBuf->taskHandle);
mqttBuf->event_id = event->event_id;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
xTaskNotifyGive( mqttBuf->taskHandle );
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xTaskNotifyGive( mqttBuf->taskHandle );
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
ESP_LOGI(TAG, "TOPIC=[%.*s] DATA=[%.*s]", event->topic_len, event->topic, event->data_len, event->data);
mqttBuf->topic_len = event->topic_len;
if (mqttBuf->topic_len > sizeof(mqttBuf->topic)) {
ESP_LOGW(TAG, "topic length too big");
mqttBuf->topic_len = sizeof(mqttBuf->topic);
}
for(int i=0;i<mqttBuf->topic_len;i++) {
mqttBuf->topic[i] = event->topic[i];
}
mqttBuf->data_len = event->data_len;
if (mqttBuf->data_len > sizeof(mqttBuf->data)) {
ESP_LOGW(TAG, "payload length too big");
mqttBuf->data_len = sizeof(mqttBuf->data);
}
for(int i=0;i<mqttBuf->data_len;i++) {
mqttBuf->data[i] = event->data[i];
}
xTaskNotifyGive( mqttBuf->taskHandle );
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xTaskNotifyGive( mqttBuf->taskHandle );
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
#if ESP_IDF_VERSION < ESP_IDF_VERSION_VAL(5, 0, 0)
return ESP_OK;
#endif
}
esp_err_t query_mdns_host(const char * host_name, char *ip);
void convert_mdns_host(char * from, char * to);
void mqtt_sub(void *pvParameters)
{
ESP_LOGI(TAG, "Start");
ESP_LOGI(TAG, "CONFIG_MQTT_BROKER=[%s]", CONFIG_MQTT_BROKER);
// Set client id from mac
uint8_t mac[8];
ESP_ERROR_CHECK(esp_base_mac_addr_get(mac));
for(int i=0;i<8;i++) {
ESP_LOGD(TAG, "mac[%d]=%x", i, mac[i]);
}
char client_id[64];
sprintf(client_id, "esp32-%02x%02x%02x%02x%02x%02x", mac[0],mac[1],mac[2],mac[3],mac[4],mac[5]);
ESP_LOGI(TAG, "client_id=[%s]", client_id);
// Resolve mDNS host name
char ip[128];
ESP_LOGI(TAG, "CONFIG_MQTT_BROKER=[%s]", CONFIG_MQTT_BROKER);
convert_mdns_host(CONFIG_MQTT_BROKER, ip);
ESP_LOGI(TAG, "ip=[%s]", ip);
char uri[138];
sprintf(uri, "mqtt://%s", ip);
ESP_LOGI(TAG, "uri=[%s]", uri);
// Initialize user context
MQTT_t mqttBuf;
mqttBuf.taskHandle = xTaskGetCurrentTaskHandle();
ESP_LOGI(TAG, "taskHandle=0x%x", (unsigned int)mqttBuf.taskHandle);
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = uri,
.broker.address.port = 1883,
#if CONFIG_BROKER_AUTHENTICATION
.credentials.username = CONFIG_AUTHENTICATION_USERNAME,
.credentials.authentication.password = CONFIG_AUTHENTICATION_PASSWORD,
#endif
.credentials.client_id = client_id
};
#else
esp_mqtt_client_config_t mqtt_cfg = {
.user_context = &mqttBuf,
.uri = uri,
.port = 1883,
.event_handle = mqtt_event_handler,
#if CONFIG_BROKER_AUTHENTICATION
.username = CONFIG_AUTHENTICATION_USERNAME,
.password = CONFIG_AUTHENTICATION_PASSWORD,
#endif
.client_id = client_id
};
#endif
esp_mqtt_client_handle_t mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, &mqttBuf);
#endif
esp_mqtt_client_start(mqtt_client);
while (1) {
ulTaskNotifyTake( pdTRUE, portMAX_DELAY );
ESP_LOGI(TAG, "ulTaskNotifyTake");
ESP_LOGI(TAG, "event_id=%"PRIi32, mqttBuf.event_id);
if (mqttBuf.event_id == MQTT_EVENT_CONNECTED) {
esp_mqtt_client_subscribe(mqtt_client, CONFIG_MQTT_SUB_TOPIC, 0);
ESP_LOGI(TAG, "Subscribe to MQTT Server");
} else if (mqttBuf.event_id == MQTT_EVENT_DISCONNECTED) {
break;
} else if (mqttBuf.event_id == MQTT_EVENT_DATA) {
ESP_LOGI(TAG, "TOPIC=[%.*s] DATA=[%.*s]", mqttBuf.topic_len, mqttBuf.topic, mqttBuf.data_len, mqttBuf.data);
size_t sended = xMessageBufferSend(xMessageBufferRecv, mqttBuf.data, mqttBuf.data_len, portMAX_DELAY);
if (sended != mqttBuf.data_len) {
ESP_LOGE(TAG, "xMessageBufferSend fail");
}
} else if (mqttBuf.event_id == MQTT_EVENT_ERROR) {
break;
}
} // end while
ESP_LOGI(TAG, "Task Delete");
esp_mqtt_client_stop(mqtt_client);
vTaskDelete(NULL);
}
#endif