17 #ifndef NMSG_PRIVATE_H
18 #define NMSG_PRIVATE_H
20 #include "nmsg_port_net.h"
25 # ifdef HAVE_SYS_ENDIAN_H
26 # include <sys/endian.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
56 #include <protobuf-c/protobuf-c.h>
63 #include <yajl/yajl_gen.h>
64 #include <yajl/yajl_tree.h>
68 #include "nmsg.pb-c.h"
71 #include "msgmod_plugin.h"
74 #include "libmy/crc32c.h"
75 #include "libmy/list.h"
76 #include "libmy/tree.h"
77 #include "libmy/ubuf.h"
78 #include "libmy/b64_decode.h"
79 #include "libmy/b64_encode.h"
80 #include "libmy/vector.h"
85 #define XSTR(x) STR(x)
87 #define NMSG_SEQSRC_GC_INTERVAL 120
88 #define NMSG_FRAG_GC_INTERVAL 30
89 #define NMSG_NSEC_PER_SEC 1000000000
91 #define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
92 #define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
94 #define NMSG_MODULE_SUFFIX ".so"
96 #define _nmsg_dprintf(level, format, ...) \
98 if (_nmsg_global_debug >= (level)) \
99 fprintf(stderr, format, ##__VA_ARGS__); \
102 #define _nmsg_dprintfv(var, level, format, ...) \
104 if ((var) >= (level)) \
105 fprintf(stderr, format, ##__VA_ARGS__); \
111 nmsg_stream_type_file,
112 nmsg_stream_type_sock,
113 nmsg_stream_type_zmq,
114 nmsg_stream_type_null,
121 struct nmsg_container;
141 extern bool _nmsg_global_autoclose;
142 extern int _nmsg_global_debug;
158 uint64_t sequence_id;
171 uint64_t sequence_id;
173 uint64_t count_dropped;
176 char addr_str[INET6_ADDRSTRLEN];
183 struct sockaddr_storage addr_ss;
192 ProtobufCBinaryData *frags;
213 struct _nmsg_ipreasm *reasm;
218 struct bpf_program userbpf;
226 pthread_mutex_t lock;
236 pthread_mutex_t lock;
244 nmsg_stream_type type;
255 struct timespec lastgc;
265 struct nmsg_brate *brate;
267 struct sockaddr_storage addr_ss;
271 nmsg_input_stream_read_fp stream_read_fp;
276 pthread_mutex_t lock;
277 nmsg_stream_type type;
284 nmsg_random_t random;
293 uint64_t sequence_id;
311 nmsg_msgmod_t msgmod;
320 nmsg_input_read_fp read_fp;
321 nmsg_input_read_loop_fp read_loop_fp;
325 unsigned filter_msgtype;
338 nmsg_output_write_fp write_fp;
339 nmsg_output_flush_fp flush_fp;
343 unsigned filter_msgtype;
350 ProtobufCMessage *message;
393 typedef enum nmsg_msgmod_clos_mode {
394 nmsg_msgmod_clos_m_keyval,
395 nmsg_msgmod_clos_m_multiline
396 } nmsg_msgmod_clos_mode;
401 nmsg_msgmod_clos_mode mode;
431 void _nmsg_alias_fini(
void);
435 ssize_t _nmsg_buf_avail(
struct nmsg_buf *buf);
436 ssize_t _nmsg_buf_used(
struct nmsg_buf *buf);
437 struct nmsg_buf * _nmsg_buf_new(
size_t sz);
438 void _nmsg_buf_destroy(
struct nmsg_buf **buf);
439 void _nmsg_buf_reset(
struct nmsg_buf *buf);
443 struct nmsg_dlmod * _nmsg_dlmod_init(
const char *path);
444 void _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
458 nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
479 nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
481 nmsg_res _input_nmsg_unpack_container(nmsg_input_t,
Nmsg__Nmsg **, uint8_t *,
size_t);
482 nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned,
Nmsg__Nmsg **);
488 nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
491 nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
494 nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
498 nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
499 nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
502 nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
505 nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
509 void _input_seqsrc_destroy(nmsg_input_t);
513 void _output_stop(nmsg_output_t);
516 nmsg_res _output_frag_write(nmsg_output_t);
519 nmsg_res _output_nmsg_flush(nmsg_output_t);
520 nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
521 nmsg_res _output_nmsg_write_container(nmsg_output_t);
522 nmsg_res _output_nmsg_write_sock(nmsg_output_t, uint8_t *buf,
size_t len);
523 nmsg_res _output_nmsg_write_file(nmsg_output_t, uint8_t *buf,
size_t len);
525 nmsg_res _output_nmsg_write_zmq(nmsg_output_t, uint8_t *buf,
size_t len);
529 nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
532 nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
535 struct nmsg_brate * _nmsg_brate_init(
size_t target_byte_rate);
536 void _nmsg_brate_destroy(
struct nmsg_brate **);
537 void _nmsg_brate_sleep(
struct nmsg_brate *,
size_t container_sz,
size_t n_payloads,
size_t n);
586 _nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
587 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
588 unsigned *new_len, u_char *new_pkt,
int *defrag,
Implementing message filter modules.
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
an nmsg_message MUST always have a non-NULL ->np member.
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
Structure exported by message modules to implement a new message type.