reactor-c
C Runtime for Lingua Franca
Loading...
Searching...
No Matches
influxdb.h
Go to the documentation of this file.
1#include <sys/types.h>
2#include <sys/socket.h>
3#include <netinet/in.h>
4#include <arpa/inet.h>
5#include <sys/uio.h>
6#include <netdb.h>
7#include <stdarg.h>
8#include <string.h>
9#include <stdio.h>
10#include <unistd.h>
11#include <curl/curl.h>
12
13/*
14 Usage:
15 send_udp/post_http(c,
16 INFLUX_MEAS("foo"),
17 INFLUX_TAG("k", "v"), INFLUX_TAG("k2", "v2"),
18 INFLUX_F_STR("s", "string"), INFLUX_F_FLT("f", 28.39, 2),
19
20 INFLUX_MEAS("bar"),
21 INFLUX_F_INT("i", 1048576), INFLUX_F_BOL("b", 1),
22 INFLUX_TS(1512722735522840439),
23
24 INFLUX_END);
25
26 **NOTICE**: For best performance you should sort tags by key before sending them to the database.
27 The sort should match the results from the [Go bytes.Compare
28 function](https://golang.org/pkg/bytes/#Compare).
29 */
30
31#define INFLUX_MEAS(m) IF_TYPE_MEAS, (m)
32#define INFLUX_TAG(k, v) IF_TYPE_TAG, (k), (v)
33#define INFLUX_F_STR(k, v) IF_TYPE_FIELD_STRING, (k), (v)
34#define INFLUX_F_FLT(k, v, p) IF_TYPE_FIELD_FLOAT, (k), (double)(v), (int)(p)
35#define INFLUX_F_INT(k, v) IF_TYPE_FIELD_INTEGER, (k), (long long)(v)
36#define INFLUX_F_BOL(k, v) IF_TYPE_FIELD_BOOLEAN, (k), ((v) ? 1 : 0)
37#define INFLUX_TS(ts) IF_TYPE_TIMESTAMP, (long long)(ts)
38#define INFLUX_END IF_TYPE_ARG_END
39
40typedef struct _influx_client_t {
41 char* host;
42 int port;
43 char* db; // http only
44 char* usr; // http only [optional for auth]
45 char* pwd; // http only [optional for auth]
46 char* token; // http only
48
49typedef struct _influx_v2_client_t {
50 char* host;
51 int port;
52 char* org;
53 char* bucket;
54 char* precision;
55 char* usr; // http only [optional for auth]
56 char* pwd; // http only [optional for auth]
57 char* token; // http only
59
60int format_line(char** buf, int* len, size_t used, ...);
61int post_http(influx_client_t* c, ...);
62int send_udp(influx_client_t* c, ...);
63int post_curl(influx_v2_client_t* c, ...);
64
65#define IF_TYPE_ARG_END 0
66#define IF_TYPE_MEAS 1
67#define IF_TYPE_TAG 2
68#define IF_TYPE_FIELD_STRING 3
69#define IF_TYPE_FIELD_FLOAT 4
70#define IF_TYPE_FIELD_INTEGER 5
71#define IF_TYPE_FIELD_BOOLEAN 6
72#define IF_TYPE_TIMESTAMP 7
73
74int _escaped_append(char** dest, size_t* len, size_t* used, const char* src, const char* escape_seq);
75int _begin_line(char** buf);
76int _format_line(char** buf, va_list ap);
77int _format_line2(char** buf, va_list ap, size_t*, size_t);
78int post_http_send_line(influx_client_t* c, char* buf, int len);
79int send_udp_line(influx_client_t* c, char* line, int len);
80
81int post_http_send_line(influx_client_t* c, char* buf, int len) {
82 int sock = -1, ret_code = 0, content_length = 0;
83 struct sockaddr_in addr;
84 struct iovec iv[2];
85 char ch;
86
87 iv[1].iov_base = buf;
88 iv[1].iov_len = len;
89
90 if (!(iv[0].iov_base = (char*)malloc(len = 0x800))) {
91 free(iv[1].iov_base);
92 return -2;
93 }
94
95 for (;;) {
96 iv[0].iov_len =
97 snprintf((char*)iv[0].iov_base, len,
98 "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\n"
99 "Host: %s\r\n"
100 "Accept: application/json\r\n"
101 "Content-type: text/plain\r\n"
102 "Authorization: Token %s\r\n"
103 "Content-Length: %zd\r\n"
104 "\r\n", // Final blank line is needed.
105 c->db, c->usr ? c->usr : "", c->pwd ? c->pwd : "", c->host, c->token ? c->token : "", iv[1].iov_len);
106 if ((int)iv[0].iov_len >= len && !(iv[0].iov_base = (char*)realloc(iv[0].iov_base, len *= 2))) {
107 free(iv[1].iov_base);
108 free(iv[0].iov_base);
109 return -3;
110 } else
111 break;
112 }
113
114 fprintf(stderr, "influxdb-c::post_http: iv[0] = '%s'\n", (char*)iv[0].iov_base);
115 fprintf(stderr, "influxdb-c::post_http: iv[1] = '%s'\n", (char*)iv[1].iov_base);
116
117 addr.sin_family = AF_INET;
118 addr.sin_port = htons(c->port);
119 // EAL: Rather than just an IP address, allow a hostname, like "localhost"
120 struct hostent* resolved_host = gethostbyname(c->host);
121 if (!resolved_host) {
122 free(iv[1].iov_base);
123 free(iv[0].iov_base);
124 return -4;
125 }
126 memcpy(&addr.sin_addr, resolved_host->h_addr_list[0], resolved_host->h_length);
127 /*
128 if((addr.sin_addr.s_addr = inet_addr(resolved_host->h_addr)) == INADDR_NONE) {
129 free(iv[1].iov_base);
130 free(iv[0].iov_base);
131 return -4;
132 }
133 */
134
135 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
136 free(iv[1].iov_base);
137 free(iv[0].iov_base);
138 return -5;
139 }
140
141 if (connect(sock, (struct sockaddr*)(&addr), sizeof(addr)) < 0) {
142 ret_code = -6;
143 goto END;
144 }
145
146 if (writev(sock, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) {
147 ret_code = -7;
148 goto END;
149 }
150 iv[0].iov_len = len;
151
152#define _GET_NEXT_CHAR() \
153 (ch = (len >= (int)iv[0].iov_len && \
154 (iv[0].iov_len = recv(sock, iv[0].iov_base, iv[0].iov_len, len = 0)) == (size_t)(-1) \
155 ? 0 \
156 : *((char*)iv[0].iov_base + len++)))
157#define _LOOP_NEXT(statement) \
158 for (;;) { \
159 if (!(_GET_NEXT_CHAR())) { \
160 ret_code = -8; \
161 goto END; \
162 } \
163 statement \
164 }
165#define _UNTIL(c) _LOOP_NEXT(if (ch == c) break;)
166#define _GET_NUMBER(n) _LOOP_NEXT(if (ch >= '0' && ch <= '9') n = n * 10 + (ch - '0'); else break;)
167#define _(c) \
168 if ((_GET_NEXT_CHAR()) != c) \
169 break;
170
171 _UNTIL(' ') _GET_NUMBER(ret_code) for (;;) {
172 _UNTIL('\n')
173 switch (_GET_NEXT_CHAR()) {
174 case 'C':
175 _('o')
176 _('n')
177 _('t')
178 _('e')
179 _('n') _('t') _('-') _('L') _('e') _('n') _('g') _('t') _('h') _(':') _(' ') _GET_NUMBER(content_length) break;
180 case '\r':
181 _('\n')
182 while (content_length-- > 0 && _GET_NEXT_CHAR())
183 ; // printf("%c", ch);
184 goto END;
185 }
186 if (!ch) {
187 ret_code = -10;
188 goto END;
189 }
190 }
191 ret_code = -11;
192END:
193 close(sock);
194 free(iv[0].iov_base);
195 free(iv[1].iov_base);
196 return ret_code / 100 == 2 ? 0 : ret_code;
197}
198#undef _GET_NEXT_CHAR
199#undef _LOOP_NEXT
200#undef _UNTIL
201#undef _GET_NUMBER
202#undef _
203
205 va_list ap;
206 char* line = NULL;
207 int ret_code = 0, len = 0;
208
209 va_start(ap, c);
210 len = _format_line((char**)&line, ap);
211 va_end(ap);
212 if (len < 0)
213 return -1;
214
215 ret_code = post_http_send_line(c, line, len);
216
217 return ret_code;
218}
219
220int send_udp_line(influx_client_t* c, char* line, int len) {
221 int sock = -1, ret = 0;
222 struct sockaddr_in addr;
223
224 addr.sin_family = AF_INET;
225 addr.sin_port = htons(c->port);
226 if ((addr.sin_addr.s_addr = inet_addr(c->host)) == INADDR_NONE) {
227 ret = -2;
228 goto END;
229 }
230
231 if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
232 ret = -3;
233 goto END;
234 }
235
236 if (sendto(sock, line, len, 0, (struct sockaddr*)&addr, sizeof(addr)) < len)
237 ret = -4;
238
239END:
240 if (sock >= 0) {
241 close(sock);
242 }
243 return ret;
244}
245
247 int ret = 0, len;
248 va_list ap;
249 char* line = NULL;
250
251 va_start(ap, c);
252 len = _format_line(&line, ap);
253 va_end(ap);
254 if (len < 0)
255 return -1;
256
257 ret = send_udp_line(c, line, len);
258
259 free(line);
260 return ret;
261}
262
264 va_list ap;
265 char* data = NULL;
266 int len = 0;
267 va_start(ap, c);
268 len = _format_line((char**)&data, ap);
269 va_end(ap);
270
271 CURL* curl;
272
273 /* In windows, this will init the winsock stuff */
274 curl_global_init(CURL_GLOBAL_ALL);
275 CURLcode res;
276
277 /* get a curl handle */
278 curl = curl_easy_init();
279 if (!curl) {
280 return CURLE_FAILED_INIT;
281 }
282
283 char* url_string = (char*)malloc(len);
284 snprintf(url_string, len, "http://%s:%d/api/v2/write?org=%s&bucket=%s&precision=%s", c->host ? c->host : "localhost",
285 c->port ? c->port : 8086, c->org, c->bucket, c->precision ? c->precision : "ns");
286
287 curl_easy_setopt(curl, CURLOPT_URL, url_string);
288 free(url_string);
289
290 char* token_string = (char*)malloc(120 * sizeof(char));
291 sprintf(token_string, "Authorization: Token %s", c->token);
292
293 struct curl_slist* list = NULL;
294 list = curl_slist_append(list, token_string);
295 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
296 free(token_string);
297
298 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
299 curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
300
301 /* Perform the request, res will get the return code */
302 res = curl_easy_perform(curl);
303 /* Check for errors */
304 if (res != CURLE_OK) {
305 fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
306 }
307
308 free(data);
309 curl_easy_cleanup(curl);
310 curl_global_cleanup();
311 return res;
312}
313
314int format_line(char** buf, int* len, size_t used, ...) {
315 va_list ap;
316 va_start(ap, used);
317 used = _format_line2(buf, ap, (size_t*)len, used);
318 va_end(ap);
319 if (*len < 0)
320 return -1;
321 else
322 return used;
323}
324
325int _begin_line(char** buf) {
326 int len = 0x100;
327 if (!(*buf = (char*)malloc(len)))
328 return -1;
329 return len;
330}
331
332int _format_line(char** buf, va_list ap) {
333 size_t len = 0;
334 *buf = NULL;
335 return _format_line2(buf, ap, &len, 0);
336}
337
338int _format_line2(char** buf, va_list ap, size_t* _len, size_t used) {
339#define _APPEND(fmter...) \
340 for (;;) { \
341 if ((written = snprintf(*buf + used, len - used, ##fmter)) < 0) \
342 goto FAIL; \
343 if (used + written >= len && !(*buf = (char*)realloc(*buf, len *= 2))) \
344 return -1; \
345 else { \
346 used += written; \
347 break; \
348 } \
349 }
350
351 size_t len = *_len;
352 int written = 0, type = 0, last_type = 0;
353 unsigned long long i = 0;
354 double d = 0.0;
355
356 if (*buf == NULL) {
357 len = _begin_line(buf);
358 used = 0;
359 }
360
361 type = va_arg(ap, int);
362 while (type != IF_TYPE_ARG_END) {
363 if (type >= IF_TYPE_TAG && type <= IF_TYPE_FIELD_BOOLEAN) {
364 if (last_type < IF_TYPE_MEAS || last_type > (type == IF_TYPE_TAG ? IF_TYPE_TAG : IF_TYPE_FIELD_BOOLEAN))
365 goto FAIL;
366 _APPEND("%c", (last_type <= IF_TYPE_TAG && type > IF_TYPE_TAG) ? ' ' : ',');
367 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ",= "))
368 return -2;
369 _APPEND("=");
370 }
371 switch (type) {
372 case IF_TYPE_MEAS:
373 if (last_type)
374 _APPEND("\n");
375 if (last_type && last_type <= IF_TYPE_TAG)
376 goto FAIL;
377 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ", "))
378 return -3;
379 break;
380 case IF_TYPE_TAG:
381 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ",= "))
382 return -4;
383 break;
385 _APPEND("\"");
386 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), "\""))
387 return -5;
388 _APPEND("\"");
389 break;
391 d = va_arg(ap, double);
392 i = va_arg(ap, int);
393 _APPEND("%.*lf", (int)i, d);
394 break;
396 i = va_arg(ap, long long);
397 _APPEND("%lldi", i);
398 break;
400 i = va_arg(ap, int);
401 _APPEND("%c", i ? 't' : 'f');
402 break;
404 if (last_type < IF_TYPE_FIELD_STRING || last_type > IF_TYPE_FIELD_BOOLEAN)
405 goto FAIL;
406 i = va_arg(ap, long long);
407 _APPEND(" %lld", i);
408 break;
409 default:
410 goto FAIL;
411 }
412 last_type = type;
413 type = va_arg(ap, int);
414 }
415 _APPEND("\n");
416 if (last_type <= IF_TYPE_TAG)
417 goto FAIL;
418 *_len = len;
419 return used;
420FAIL:
421 free(*buf);
422 *buf = NULL;
423 return -1;
424}
425#undef _APPEND
426
427int _escaped_append(char** dest, size_t* len, size_t* used, const char* src, const char* escape_seq) {
428 size_t i = 0;
429
430 for (;;) {
431 if ((i = strcspn(src, escape_seq)) > 0) {
432 if (*used + i > *len && !(*dest = (char*)realloc(*dest, (*len) *= 2)))
433 return -1;
434 strncpy(*dest + *used, src, i);
435 *used += i;
436 src += i;
437 }
438 if (*src) {
439 if (*used + 2 > *len && !(*dest = (char*)realloc(*dest, (*len) *= 2)))
440 return -2;
441 (*dest)[(*used)++] = '\\';
442 (*dest)[(*used)++] = *src++;
443 } else
444 return 0;
445 }
446 return 0;
447}
return ret
Definition hashmap.h:118
void HASHMAP free(HASHMAP(t) *hashmap)
Free all memory used by the given hashmap.
Definition hashmap.h:121
struct _influx_v2_client_t influx_v2_client_t
int format_line(char **buf, int *len, size_t used,...)
Definition influxdb.h:314
#define _APPEND(fmter...)
int _format_line2(char **buf, va_list ap, size_t *, size_t)
Definition influxdb.h:338
#define IF_TYPE_TAG
Definition influxdb.h:67
#define IF_TYPE_ARG_END
Definition influxdb.h:65
#define _GET_NEXT_CHAR()
#define IF_TYPE_FIELD_STRING
Definition influxdb.h:68
#define _UNTIL(c)
int send_udp(influx_client_t *c,...)
Definition influxdb.h:246
#define IF_TYPE_FIELD_BOOLEAN
Definition influxdb.h:71
int post_http_send_line(influx_client_t *c, char *buf, int len)
Definition influxdb.h:81
int _escaped_append(char **dest, size_t *len, size_t *used, const char *src, const char *escape_seq)
Definition influxdb.h:427
int _format_line(char **buf, va_list ap)
Definition influxdb.h:332
#define _GET_NUMBER(n)
#define IF_TYPE_FIELD_INTEGER
Definition influxdb.h:70
int send_udp_line(influx_client_t *c, char *line, int len)
Definition influxdb.h:220
int post_curl(influx_v2_client_t *c,...)
Definition influxdb.h:263
#define IF_TYPE_TIMESTAMP
Definition influxdb.h:72
int post_http(influx_client_t *c,...)
Definition influxdb.h:204
#define IF_TYPE_FIELD_FLOAT
Definition influxdb.h:69
struct _influx_client_t influx_client_t
#define IF_TYPE_MEAS
Definition influxdb.h:66
int _begin_line(char **buf)
Definition influxdb.h:325
#define _(c)
#define fprintf(stream, fmt,...)
Definition lf_flexpret_support.h:95
Definition influxdb.h:40
char * pwd
Definition influxdb.h:45
int port
Definition influxdb.h:42
char * usr
Definition influxdb.h:44
char * token
Definition influxdb.h:46
char * db
Definition influxdb.h:43
char * host
Definition influxdb.h:41
Definition influxdb.h:49
int port
Definition influxdb.h:51
char * bucket
Definition influxdb.h:53
char * usr
Definition influxdb.h:55
char * precision
Definition influxdb.h:54
char * token
Definition influxdb.h:57
char * org
Definition influxdb.h:52
char * host
Definition influxdb.h:50
char * pwd
Definition influxdb.h:56